1use crate::constants::{
7 DAEMON_AUTO_START_INITIAL_BACKOFF_MS, DAEMON_AUTO_START_MAX_BACKOFF_MS,
8 DAEMON_AUTO_START_MAX_WAIT_MS, DAEMON_IDLE_SHUTDOWN_SECS, DAEMON_PING_TIMEOUT_MS,
9 DAEMON_SPAWN_BACKOFF_BASE_MS, DAEMON_SPAWN_LOCK_WAIT_MS, DAEMON_VERSION_RESTART_WAIT_MS,
10 SQLITE_GRAPHRAG_VERSION,
11};
12use crate::errors::AppError;
13use crate::{embedder, shutdown_requested};
14use fs4::fs_std::FileExt;
15use interprocess::local_socket::{
16 prelude::LocalSocketStream,
17 traits::{Listener as _, Stream as _},
18 GenericFilePath, GenericNamespaced, ListenerNonblockingMode, ListenerOptions, ToFsName,
19 ToNsName,
20};
21use serde::{Deserialize, Serialize};
22use std::fs::{File, OpenOptions};
23use std::io::{BufRead, BufReader, Write};
24use std::path::{Path, PathBuf};
25use std::process::Stdio;
26use std::sync::atomic::{AtomicU64, AtomicU8, Ordering};
27use std::sync::Arc;
28use std::thread;
29use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
30
// States of the once-per-process daemon version check held in `DAEMON_VERSION_STATE`.

/// No version check has been started in this process yet.
const VERSION_NOT_CHECKED: u8 = 0;
/// The check has been claimed; this value is stored by the compare-exchange that starts the
/// check, before the daemon is actually pinged.
const VERSION_COMPATIBLE: u8 = 1;
/// A version mismatch was detected and a daemon restart was attempted.
const VERSION_RESTART_ATTEMPTED: u8 = 2;

/// Process-wide version-check state; starts as `VERSION_NOT_CHECKED`.
static DAEMON_VERSION_STATE: AtomicU8 = AtomicU8::new(VERSION_NOT_CHECKED);
37
38#[derive(Debug, Serialize, Deserialize)]
39#[serde(tag = "request", rename_all = "snake_case")]
40pub enum DaemonRequest {
41 Ping,
42 Shutdown,
43 EmbedPassage {
44 text: String,
45 },
46 EmbedQuery {
47 text: String,
48 },
49 EmbedPassages {
50 texts: Vec<String>,
51 token_counts: Vec<usize>,
52 },
53}
54
55#[derive(Debug, Serialize, Deserialize)]
56#[serde(tag = "status", rename_all = "snake_case")]
57pub enum DaemonResponse {
58 Listening {
59 pid: u32,
60 socket: String,
61 idle_shutdown_secs: u64,
62 },
63 Ok {
64 pid: u32,
65 version: String,
66 handled_embed_requests: u64,
67 model_name: String,
68 model_variant: String,
69 },
70 PassageEmbedding {
71 embedding: Vec<f32>,
72 handled_embed_requests: u64,
73 },
74 QueryEmbedding {
75 embedding: Vec<f32>,
76 handled_embed_requests: u64,
77 },
78 PassageEmbeddings {
79 embeddings: Vec<Vec<f32>>,
80 handled_embed_requests: u64,
81 },
82 ShuttingDown {
83 handled_embed_requests: u64,
84 },
85 Error {
86 message: String,
87 },
88}
89
/// Autostart backoff bookkeeping persisted as JSON in the daemon control directory.
#[derive(Debug, Default, Serialize, Deserialize)]
struct DaemonSpawnState {
    /// Number of spawn failures recorded since the state was last cleared.
    consecutive_failures: u32,
    /// Epoch milliseconds before which autostart is suppressed.
    not_before_epoch_ms: u64,
    /// Message of the most recently recorded spawn failure.
    last_error: Option<String>,
}
96
97pub fn daemon_label(models_dir: &Path) -> String {
98 let hash = blake3::hash(models_dir.to_string_lossy().as_bytes())
99 .to_hex()
100 .to_string();
101 format!("sqlite-graphrag-daemon-{}", &hash[..16])
102}
103
104pub fn try_ping(models_dir: &Path) -> Result<Option<DaemonResponse>, AppError> {
105 request_if_available(models_dir, &DaemonRequest::Ping)
106}
107
108pub fn try_shutdown(models_dir: &Path) -> Result<Option<DaemonResponse>, AppError> {
109 request_if_available(models_dir, &DaemonRequest::Shutdown)
110}
111
112pub fn embed_passage_or_local(models_dir: &Path, text: &str) -> Result<Vec<f32>, AppError> {
113 match request_or_autostart(
114 models_dir,
115 &DaemonRequest::EmbedPassage {
116 text: text.to_string(),
117 },
118 true,
119 )? {
120 Some(DaemonResponse::PassageEmbedding { embedding, .. }) => Ok(embedding),
121 Some(DaemonResponse::Error { message }) => Err(AppError::Embedding(message)),
122 Some(other) => Err(AppError::Internal(anyhow::anyhow!(
123 "unexpected daemon response for passage embedding: {other:?}"
124 ))),
125 None => {
126 let embedder = embedder::get_embedder(models_dir)?;
127 embedder::embed_passage(embedder, text)
128 }
129 }
130}
131
132pub fn embed_query_or_local(
133 models_dir: &Path,
134 text: &str,
135 cli_autostart: bool,
136) -> Result<Vec<f32>, AppError> {
137 match request_or_autostart(
138 models_dir,
139 &DaemonRequest::EmbedQuery {
140 text: text.to_string(),
141 },
142 cli_autostart,
143 )? {
144 Some(DaemonResponse::QueryEmbedding { embedding, .. }) => Ok(embedding),
145 Some(DaemonResponse::Error { message }) => Err(AppError::Embedding(message)),
146 Some(other) => Err(AppError::Internal(anyhow::anyhow!(
147 "unexpected daemon response for query embedding: {other:?}"
148 ))),
149 None => {
150 let embedder = embedder::get_embedder(models_dir)?;
151 embedder::embed_query(embedder, text)
152 }
153 }
154}
155
156pub fn embed_passages_controlled_or_local(
157 models_dir: &Path,
158 texts: &[&str],
159 token_counts: &[usize],
160) -> Result<Vec<Vec<f32>>, AppError> {
161 let request = DaemonRequest::EmbedPassages {
162 texts: texts.iter().map(|t| (*t).to_string()).collect(),
163 token_counts: token_counts.to_vec(),
164 };
165
166 match request_or_autostart(models_dir, &request, true)? {
167 Some(DaemonResponse::PassageEmbeddings { embeddings, .. }) => Ok(embeddings),
168 Some(DaemonResponse::Error { message }) => Err(AppError::Embedding(message)),
169 Some(other) => Err(AppError::Internal(anyhow::anyhow!(
170 "unexpected daemon response for passage embedding batch: {other:?}"
171 ))),
172 None => {
173 let embedder = embedder::get_embedder(models_dir)?;
174 embedder::embed_passages_controlled(embedder, texts, token_counts)
175 }
176 }
177}
178
179struct DaemonSpawnGuard {
180 models_dir: PathBuf,
181}
182
183impl DaemonSpawnGuard {
184 fn new(models_dir: &Path) -> Self {
185 Self {
186 models_dir: models_dir.to_path_buf(),
187 }
188 }
189}
190
191impl Drop for DaemonSpawnGuard {
192 fn drop(&mut self) {
193 let lock_path = spawn_lock_path(&self.models_dir);
194 if lock_path.exists() {
195 match std::fs::remove_file(&lock_path) {
196 Ok(()) => {
197 tracing::debug!(
198 target: "daemon",
199 path = %lock_path.display(),
200 "spawn lock file removed during graceful daemon shutdown"
201 );
202 }
203 Err(err) if err.kind() == std::io::ErrorKind::NotFound => {}
204 Err(err) => {
205 tracing::warn!(
206 target: "daemon",
207 error = %err,
208 path = %lock_path.display(),
209 "failed to remove spawn lock file while shutting down daemon"
210 );
211 }
212 }
213 }
214 let pid_path = pid_file_path(&self.models_dir);
215 let _ = std::fs::remove_file(&pid_path);
216
217 tracing::info!(
218 target: "daemon",
219 "daemon shut down gracefully; socket will be cleaned up by OS or by the next daemon via try_overwrite"
220 );
221 }
222}
223
224pub fn run(
225 models_dir: &Path,
226 idle_shutdown_secs: u64,
227 shutdown_timeout_secs: u64,
228) -> Result<(), AppError> {
229 let permits = std::thread::available_parallelism()
232 .map(|n| n.get())
233 .unwrap_or(2)
234 .clamp(2, 8);
235 let rt = tokio::runtime::Builder::new_multi_thread()
236 .worker_threads(permits)
237 .thread_name("daemon-worker")
238 .enable_all()
239 .build()
240 .map_err(AppError::Io)?;
241
242 let result = rt.block_on(run_async(models_dir, idle_shutdown_secs, permits));
243 rt.shutdown_timeout(std::time::Duration::from_secs(shutdown_timeout_secs));
244 result
245}
246
247#[tracing::instrument(skip_all, fields(idle_secs = idle_shutdown_secs, permits))]
248async fn run_async(
249 models_dir: &Path,
250 idle_shutdown_secs: u64,
251 permits: usize,
252) -> Result<(), AppError> {
253 let socket = daemon_label(models_dir);
254 let name = to_local_socket_name(&socket)?;
255 let listener = ListenerOptions::new()
256 .name(name)
257 .nonblocking(ListenerNonblockingMode::Accept)
258 .try_overwrite(true)
259 .create_sync()
260 .map_err(AppError::Io)?;
261
262 let _spawn_guard = DaemonSpawnGuard::new(models_dir);
265
266 let models_dir_warm = models_dir.to_path_buf();
270 tokio::task::spawn_blocking(move || embedder::get_embedder(&models_dir_warm).map(|_| ()))
271 .await
272 .map_err(|e| AppError::Internal(anyhow::anyhow!("model warm-up panicked: {e}")))??;
273
274 let pid_path = pid_file_path(models_dir);
275 let _ = std::fs::write(&pid_path, std::process::id().to_string());
276
277 crate::output::emit_json(&DaemonResponse::Listening {
278 pid: std::process::id(),
279 socket,
280 idle_shutdown_secs,
281 })?;
282
283 let handled_embed_requests = Arc::new(AtomicU64::new(0));
284 let mut last_activity = Instant::now();
285 let models_dir = models_dir.to_path_buf();
286 let permit_pool = Arc::new(tokio::sync::Semaphore::new(permits));
288
289 let token = crate::cancel_token();
290 loop {
291 if shutdown_requested() || token.is_cancelled() {
292 break;
293 }
294
295 if !daemon_control_dir(&models_dir).exists() {
296 tracing::info!(target: "daemon", "daemon control directory disappeared; shutting down");
297 break;
298 }
299
300 match listener.accept() {
301 Ok(stream) => {
302 last_activity = Instant::now();
303 let models_dir_clone = models_dir.clone();
304 let counter = Arc::clone(&handled_embed_requests);
305 let permit =
306 permit_pool.clone().acquire_owned().await.map_err(|e| {
307 AppError::Internal(anyhow::anyhow!("semaphore closed: {e}"))
308 })?;
309 let should_exit = tokio::task::spawn_blocking(move || {
310 let _permit = permit; handle_client(stream, &models_dir_clone, &counter)
312 })
313 .await
314 .map_err(|e| {
315 AppError::Internal(anyhow::anyhow!("spawn_blocking panicked: {e}"))
316 })??;
317
318 if should_exit {
319 break;
320 }
321 }
322 Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => {
323 if last_activity.elapsed() >= Duration::from_secs(idle_shutdown_secs) {
324 tracing::info!(
325 target: "daemon",
326 idle_shutdown_secs,
327 handled_embed_requests = handled_embed_requests.load(Ordering::Relaxed),
328 "daemon idle timeout reached"
329 );
330 break;
331 }
332 tokio::select! {
333 () = tokio::time::sleep(Duration::from_millis(50)) => {}
334 () = token.cancelled() => { break; }
335 }
336 }
337 Err(err) => return Err(AppError::Io(err)),
338 }
339 }
340
341 Ok(())
342}
343
344fn handle_client(
345 stream: LocalSocketStream,
346 models_dir: &Path,
347 handled_embed_requests: &AtomicU64,
348) -> Result<bool, AppError> {
349 let mut reader = BufReader::new(stream);
350 let mut line = String::new();
351 reader.read_line(&mut line).map_err(AppError::Io)?;
352
353 if line.trim().is_empty() {
354 write_response(
355 reader.get_mut(),
356 &DaemonResponse::Error {
357 message: "empty request to daemon".to_string(),
358 },
359 )?;
360 return Ok(false);
361 }
362
363 let request: DaemonRequest = serde_json::from_str(line.trim()).map_err(AppError::Json)?;
364 let (response, should_exit) = match request {
365 DaemonRequest::Ping => (
366 DaemonResponse::Ok {
367 pid: std::process::id(),
368 version: SQLITE_GRAPHRAG_VERSION.to_string(),
369 handled_embed_requests: handled_embed_requests.load(Ordering::Relaxed),
370 model_name: crate::constants::FASTEMBED_MODEL_DEFAULT.to_string(),
371 model_variant: gliner_variant_from_env(),
372 },
373 false,
374 ),
375 DaemonRequest::Shutdown => (
376 DaemonResponse::ShuttingDown {
377 handled_embed_requests: handled_embed_requests.load(Ordering::Relaxed),
378 },
379 true,
380 ),
381 DaemonRequest::EmbedPassage { text } => {
382 let embedder = embedder::get_embedder(models_dir)?;
383 let embedding = embedder::embed_passage(embedder, &text)?;
384 let count = handled_embed_requests.fetch_add(1, Ordering::Relaxed) + 1;
385 (
386 DaemonResponse::PassageEmbedding {
387 embedding,
388 handled_embed_requests: count,
389 },
390 false,
391 )
392 }
393 DaemonRequest::EmbedQuery { text } => {
394 let embedder = embedder::get_embedder(models_dir)?;
395 let embedding = embedder::embed_query(embedder, &text)?;
396 let count = handled_embed_requests.fetch_add(1, Ordering::Relaxed) + 1;
397 (
398 DaemonResponse::QueryEmbedding {
399 embedding,
400 handled_embed_requests: count,
401 },
402 false,
403 )
404 }
405 DaemonRequest::EmbedPassages {
406 texts,
407 token_counts,
408 } => {
409 let embedder = embedder::get_embedder(models_dir)?;
410 let text_refs: Vec<&str> = texts.iter().map(String::as_str).collect();
411 let embeddings =
412 embedder::embed_passages_controlled(embedder, &text_refs, &token_counts)?;
413 let count = handled_embed_requests.fetch_add(1, Ordering::Relaxed) + 1;
414 (
415 DaemonResponse::PassageEmbeddings {
416 embeddings,
417 handled_embed_requests: count,
418 },
419 false,
420 )
421 }
422 };
423
424 write_response(reader.get_mut(), &response)?;
425 Ok(should_exit)
426}
427
428fn write_response(
429 stream: &mut LocalSocketStream,
430 response: &DaemonResponse,
431) -> Result<(), AppError> {
432 serde_json::to_writer(&mut *stream, response).map_err(AppError::Json)?;
433 stream.write_all(b"\n").map_err(AppError::Io)?;
434 stream.flush().map_err(AppError::Io)?;
435 Ok(())
436}
437
438fn request_if_available(
439 models_dir: &Path,
440 request: &DaemonRequest,
441) -> Result<Option<DaemonResponse>, AppError> {
442 let socket = daemon_label(models_dir);
443 let name = match to_local_socket_name(&socket) {
444 Ok(name) => name,
445 Err(err) => return Err(AppError::Io(err)),
446 };
447
448 let mut stream = match LocalSocketStream::connect(name) {
449 Ok(stream) => stream,
450 Err(err)
451 if matches!(
452 err.kind(),
453 std::io::ErrorKind::NotFound
454 | std::io::ErrorKind::ConnectionRefused
455 | std::io::ErrorKind::AddrNotAvailable
456 | std::io::ErrorKind::TimedOut
457 ) =>
458 {
459 return Ok(None);
460 }
461 Err(err) => return Err(AppError::Io(err)),
462 };
463
464 serde_json::to_writer(&mut stream, request).map_err(AppError::Json)?;
465 stream.write_all(b"\n").map_err(AppError::Io)?;
466 stream.flush().map_err(AppError::Io)?;
467
468 let mut reader = BufReader::new(stream);
469 let mut line = String::new();
470 reader.read_line(&mut line).map_err(AppError::Io)?;
471 if line.trim().is_empty() {
472 return Err(AppError::Embedding(
473 "daemon returned an empty response".into(),
474 ));
475 }
476
477 let response = serde_json::from_str(line.trim()).map_err(AppError::Json)?;
478 Ok(Some(response))
479}
480
481fn should_autostart(cli_flag: bool) -> bool {
482 if !cli_flag {
483 return false; }
485 !autostart_disabled_by_env()
486}
487
488fn maybe_restart_for_version_mismatch(models_dir: &Path) -> Result<(), AppError> {
493 if DAEMON_VERSION_STATE
496 .compare_exchange(
497 VERSION_NOT_CHECKED,
498 VERSION_COMPATIBLE,
499 Ordering::Acquire,
500 Ordering::Relaxed,
501 )
502 .is_err()
503 {
504 return Ok(());
506 }
507
508 let response = match try_ping(models_dir)? {
509 Some(r) => r,
510 None => return Ok(()), };
512
513 let daemon_version = match &response {
514 DaemonResponse::Ok { version, .. } => version.as_str(),
515 _ => return Ok(()), };
517
518 if daemon_version == SQLITE_GRAPHRAG_VERSION {
519 return Ok(()); }
521
522 DAEMON_VERSION_STATE.store(VERSION_RESTART_ATTEMPTED, Ordering::Release);
525
526 tracing::warn!(
527 target: "daemon",
528 daemon_version = %daemon_version,
529 cli_version = SQLITE_GRAPHRAG_VERSION,
530 "daemon version mismatch detected; auto-restarting daemon"
531 );
532
533 try_shutdown(models_dir)?;
535
536 wait_for_daemon_exit(models_dir)?;
538
539 ensure_daemon_running(models_dir)?;
541
542 Ok(())
543}
544
545#[cold]
549#[inline(never)]
550fn wait_for_daemon_exit(models_dir: &Path) -> Result<(), AppError> {
551 let deadline = Instant::now() + Duration::from_millis(DAEMON_VERSION_RESTART_WAIT_MS);
552 let mut sleep_ms: u64 = 50;
553
554 while Instant::now() < deadline {
555 if try_ping(models_dir)?.is_none() {
556 tracing::debug!(target: "daemon", "stale daemon exited after version-mismatch shutdown");
557 return Ok(());
558 }
559 thread::sleep(Duration::from_millis(sleep_ms));
560 sleep_ms = (sleep_ms * 2).min(500);
561 }
562
563 tracing::warn!(
564 target: "daemon",
565 timeout_ms = DAEMON_VERSION_RESTART_WAIT_MS,
566 "timed out waiting for stale daemon to exit after version-mismatch shutdown"
567 );
568 Ok(())
569}
570
571fn request_or_autostart(
572 models_dir: &Path,
573 request: &DaemonRequest,
574 cli_autostart: bool,
575) -> Result<Option<DaemonResponse>, AppError> {
576 if DAEMON_VERSION_STATE.load(Ordering::Acquire) == VERSION_NOT_CHECKED {
578 maybe_restart_for_version_mismatch(models_dir)?;
579 }
580
581 if let Some(response) = request_if_available(models_dir, request)? {
582 clear_spawn_backoff_state(models_dir).ok();
583 return Ok(Some(response));
584 }
585
586 if !should_autostart(cli_autostart) {
587 return Ok(None);
588 }
589
590 if !ensure_daemon_running(models_dir)? {
591 return Ok(None);
592 }
593
594 request_if_available(models_dir, request)
595}
596
597fn ensure_daemon_running(models_dir: &Path) -> Result<bool, AppError> {
598 if (try_ping(models_dir)?).is_some() {
599 clear_spawn_backoff_state(models_dir).ok();
600 return Ok(true);
601 }
602
603 if spawn_backoff_active(models_dir)? {
604 tracing::warn!(target: "daemon", "daemon autostart suppressed by backoff window");
605 return Ok(false);
606 }
607
608 let spawn_lock = match try_acquire_spawn_lock(models_dir)? {
609 Some(lock) => lock,
610 None => return wait_for_daemon_ready(models_dir),
611 };
612
613 if (try_ping(models_dir)?).is_some() {
614 clear_spawn_backoff_state(models_dir).ok();
615 drop(spawn_lock);
616 return Ok(true);
617 }
618
619 let exe = match std::env::current_exe() {
620 Ok(path) => path,
621 Err(err) => {
622 record_spawn_failure(models_dir, &format!("current_exe failed: {err}"))?;
623 drop(spawn_lock);
624 return Ok(false);
625 }
626 };
627
628 let mut child = std::process::Command::new(exe);
629 child
630 .arg("daemon")
631 .arg("--idle-shutdown-secs")
632 .arg(DAEMON_IDLE_SHUTDOWN_SECS.to_string())
633 .env("SQLITE_GRAPHRAG_DAEMON_CHILD", "1")
634 .env_remove("LD_PRELOAD")
635 .env_remove("LD_LIBRARY_PATH")
636 .env_remove("LD_AUDIT")
637 .env_remove("DYLD_INSERT_LIBRARIES")
638 .env_remove("DYLD_LIBRARY_PATH")
639 .stdin(Stdio::null())
640 .stdout(Stdio::null())
641 .stderr(Stdio::null());
642
643 match crate::commands::claude_runner::spawn_with_memory_limit(&mut child) {
644 Ok(child_handle) => {
645 let pid = child_handle.id();
657 drop(child_handle);
658 tracing::debug!(
659 target: "daemon",
660 pid,
661 "daemon detached; lifecycle managed via spawn lock + readiness file"
662 );
663 let ready = wait_for_daemon_ready(models_dir)?;
664 if ready {
665 clear_spawn_backoff_state(models_dir).ok();
666 } else {
667 record_spawn_failure(models_dir, "daemon did not become healthy after autostart")?;
668 }
669 drop(spawn_lock);
670 Ok(ready)
671 }
672 Err(err) => {
673 record_spawn_failure(models_dir, &format!("daemon spawn failed: {err}"))?;
674 drop(spawn_lock);
675 Ok(false)
676 }
677 }
678}
679
680fn wait_for_daemon_ready(models_dir: &Path) -> Result<bool, AppError> {
681 let deadline = Instant::now() + Duration::from_millis(DAEMON_AUTO_START_MAX_WAIT_MS);
682 let mut sleep_ms = DAEMON_AUTO_START_INITIAL_BACKOFF_MS.max(DAEMON_PING_TIMEOUT_MS);
683
684 while Instant::now() < deadline {
685 if (try_ping(models_dir)?).is_some() {
686 return Ok(true);
687 }
688 thread::sleep(Duration::from_millis(sleep_ms));
689 sleep_ms = (sleep_ms * 2).min(DAEMON_AUTO_START_MAX_BACKOFF_MS);
690 }
691
692 Ok(false)
693}
694
/// Reports whether the environment forbids spawning a daemon.
///
/// Autostart is disabled when `SQLITE_GRAPHRAG_DAEMON_CHILD` is `"1"`, or when
/// `SQLITE_GRAPHRAG_DAEMON_DISABLE_AUTOSTART` is `"1"` and
/// `SQLITE_GRAPHRAG_DAEMON_FORCE_AUTOSTART` is not `"1"`.
fn autostart_disabled_by_env() -> bool {
    let flag_is_set = |key: &str| std::env::var(key).as_deref() == Ok("1");

    if flag_is_set("SQLITE_GRAPHRAG_DAEMON_CHILD") {
        return true;
    }
    !flag_is_set("SQLITE_GRAPHRAG_DAEMON_FORCE_AUTOSTART")
        && flag_is_set("SQLITE_GRAPHRAG_DAEMON_DISABLE_AUTOSTART")
}
700
/// Directory holding the daemon's control files: the parent of `models_dir`, or `models_dir`
/// itself when it has no parent.
fn daemon_control_dir(models_dir: &Path) -> PathBuf {
    models_dir.parent().unwrap_or(models_dir).to_path_buf()
}
707
708fn spawn_lock_path(models_dir: &Path) -> PathBuf {
709 daemon_control_dir(models_dir).join("daemon-spawn.lock")
710}
711
712fn spawn_state_path(models_dir: &Path) -> PathBuf {
713 daemon_control_dir(models_dir).join("daemon-spawn-state.json")
714}
715
716fn pid_file_path(models_dir: &Path) -> PathBuf {
717 daemon_control_dir(models_dir).join("daemon.pid")
718}
719
720fn try_acquire_spawn_lock(models_dir: &Path) -> Result<Option<File>, AppError> {
721 let path = spawn_lock_path(models_dir);
722 std::fs::create_dir_all(crate::paths::parent_or_err(&path)?).map_err(AppError::Io)?;
723 let file = OpenOptions::new()
724 .read(true)
725 .write(true)
726 .create(true)
727 .truncate(false)
728 .open(path)
729 .map_err(AppError::Io)?;
730
731 let deadline = Instant::now() + Duration::from_millis(DAEMON_SPAWN_LOCK_WAIT_MS);
732 loop {
733 match file.try_lock_exclusive() {
734 Ok(()) => return Ok(Some(file)),
735 Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => {
736 if Instant::now() >= deadline {
737 return Ok(None);
738 }
739 thread::sleep(Duration::from_millis(50));
740 }
741 Err(err) => return Err(AppError::Io(err)),
742 }
743 }
744}
745
746fn spawn_backoff_active(models_dir: &Path) -> Result<bool, AppError> {
747 let state = load_spawn_state(models_dir)?;
748 Ok(now_epoch_ms() < state.not_before_epoch_ms)
749}
750
751#[cold]
752#[inline(never)]
753fn record_spawn_failure(models_dir: &Path, message: &str) -> Result<(), AppError> {
754 let mut state = load_spawn_state(models_dir)?;
755 state.consecutive_failures = state.consecutive_failures.saturating_add(1);
756 let exponent = state.consecutive_failures.saturating_sub(1).min(6);
757 let base_ms =
758 (DAEMON_SPAWN_BACKOFF_BASE_MS * (1_u64 << exponent)).min(DAEMON_AUTO_START_MAX_BACKOFF_MS);
759 let half = base_ms / 2;
762 let jitter = if half == 0 { 0 } else { fastrand::u64(0..half) };
763 let backoff_ms = half + jitter;
764 state.not_before_epoch_ms = now_epoch_ms() + backoff_ms;
765 state.last_error = Some(message.to_string());
766 save_spawn_state(models_dir, &state)
767}
768
769fn clear_spawn_backoff_state(models_dir: &Path) -> Result<(), AppError> {
770 let path = spawn_state_path(models_dir);
771 if path.exists() {
772 std::fs::remove_file(path).map_err(AppError::Io)?;
773 }
774 Ok(())
775}
776
777fn load_spawn_state(models_dir: &Path) -> Result<DaemonSpawnState, AppError> {
778 let path = spawn_state_path(models_dir);
779 if !path.exists() {
780 return Ok(DaemonSpawnState::default());
781 }
782
783 let bytes = std::fs::read(path).map_err(AppError::Io)?;
784 serde_json::from_slice(&bytes).map_err(AppError::Json)
785}
786
787fn save_spawn_state(models_dir: &Path, state: &DaemonSpawnState) -> Result<(), AppError> {
788 let path = spawn_state_path(models_dir);
789 std::fs::create_dir_all(crate::paths::parent_or_err(&path)?).map_err(AppError::Io)?;
790 let bytes = serde_json::to_vec(state).map_err(AppError::Json)?;
791 std::fs::write(path, bytes).map_err(AppError::Io)
792}
793
/// Returns the value of `SQLITE_GRAPHRAG_GLINER_VARIANT`, or `"fp32"` when the variable is
/// unset or not valid Unicode.
fn gliner_variant_from_env() -> String {
    match std::env::var("SQLITE_GRAPHRAG_GLINER_VARIANT") {
        Ok(variant) => variant,
        Err(_) => String::from("fp32"),
    }
}
799
/// Current wall-clock time in milliseconds since the Unix epoch; 0 when the system clock is set
/// before the epoch.
fn now_epoch_ms() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis() as u64,
        Err(_) => 0,
    }
}
806
807fn to_local_socket_name(name: &str) -> std::io::Result<interprocess::local_socket::Name<'static>> {
808 if let Ok(ns_name) = name.to_string().to_ns_name::<GenericNamespaced>() {
809 return Ok(ns_name);
810 }
811
812 let path = if cfg!(unix) {
817 let base = std::env::var_os("XDG_RUNTIME_DIR")
818 .or_else(|| std::env::var_os("SQLITE_GRAPHRAG_HOME"))
819 .map(std::path::PathBuf::from)
820 .unwrap_or_else(std::env::temp_dir);
821 base.join(format!("{name}.sock"))
822 .to_string_lossy()
823 .into_owned()
824 } else {
825 format!(r"\\.\pipe\{name}")
826 };
827 path.to_fs_name::<GenericFilePath>()
828}
829
#[cfg(test)]
mod tests {
    use super::*;

    /// Serializes the tests that mutate `SQLITE_GRAPHRAG_GLINER_VARIANT`: the environment is
    /// process-wide and the test harness runs tests on multiple threads.
    static GLINER_ENV_LOCK: std::sync::Mutex<()> = std::sync::Mutex::new(());

    /// Creates `<tmp>/cache/models` and returns the temp-dir handle with the models path.
    fn temp_models_dir() -> (tempfile::TempDir, PathBuf) {
        let tmp = tempfile::tempdir().unwrap();
        let models_dir = tmp.path().join("cache").join("models");
        std::fs::create_dir_all(&models_dir).unwrap();
        (tmp, models_dir)
    }

    #[test]
    fn record_and_clear_spawn_backoff_state() {
        let (_tmp, models_dir) = temp_models_dir();

        assert!(!spawn_backoff_active(&models_dir).unwrap());

        record_spawn_failure(&models_dir, "spawn failed").unwrap();
        assert!(spawn_backoff_active(&models_dir).unwrap());

        let state = load_spawn_state(&models_dir).unwrap();
        assert_eq!(state.consecutive_failures, 1);
        assert_eq!(state.last_error.as_deref(), Some("spawn failed"));

        clear_spawn_backoff_state(&models_dir).unwrap();
        assert!(!spawn_backoff_active(&models_dir).unwrap());
    }

    #[test]
    fn daemon_control_dir_uses_models_parent() {
        let base = PathBuf::from("/tmp/sqlite-graphrag-cache-test");
        let models_dir = base.join("models");
        assert_eq!(daemon_control_dir(&models_dir), base);
    }

    #[test]
    fn version_state_constants_are_distinct() {
        assert_ne!(VERSION_NOT_CHECKED, VERSION_COMPATIBLE);
        assert_ne!(VERSION_NOT_CHECKED, VERSION_RESTART_ATTEMPTED);
        assert_ne!(VERSION_COMPATIBLE, VERSION_RESTART_ATTEMPTED);
    }

    #[test]
    fn wait_for_daemon_exit_immediate_when_not_running() {
        let (_tmp, models_dir) = temp_models_dir();

        let start = Instant::now();
        wait_for_daemon_exit(&models_dir).unwrap();
        assert!(start.elapsed() < Duration::from_millis(500));
    }

    #[test]
    fn spawn_backoff_exponent_caps_at_six() {
        let (_tmp, models_dir) = temp_models_dir();

        for i in 0..10 {
            record_spawn_failure(&models_dir, &format!("failure {i}")).unwrap();
        }

        let state = load_spawn_state(&models_dir).unwrap();
        assert_eq!(state.consecutive_failures, 10);

        let max_base =
            (DAEMON_SPAWN_BACKOFF_BASE_MS * (1_u64 << 6)).min(DAEMON_AUTO_START_MAX_BACKOFF_MS);
        let now = now_epoch_ms();
        assert!(state.not_before_epoch_ms <= now + max_base);
    }

    #[test]
    fn spawn_backoff_half_jitter_in_range() {
        let base_ms: u64 = 100;
        let half = base_ms / 2;
        for _ in 0..100 {
            let jitter = fastrand::u64(0..half);
            let result = half + jitter;
            assert!(result >= half, "result {result} below half {half}");
            assert!(result < base_ms, "result {result} not below base {base_ms}");
        }
    }

    #[test]
    fn to_local_socket_name_produces_valid_result() {
        let result = to_local_socket_name("sqlite-graphrag-test-daemon");
        assert!(result.is_ok(), "expected Ok, got {result:?}");
        let name = result.unwrap();
        let display = format!("{name:?}");
        assert!(!display.is_empty());
    }

    #[test]
    fn version_cas_not_checked_to_compatible() {
        let state = AtomicU8::new(VERSION_NOT_CHECKED);
        let result = state.compare_exchange(
            VERSION_NOT_CHECKED,
            VERSION_COMPATIBLE,
            Ordering::SeqCst,
            Ordering::SeqCst,
        );
        assert!(result.is_ok());
        assert_eq!(state.load(Ordering::SeqCst), VERSION_COMPATIBLE);
    }

    #[test]
    fn version_cas_prevents_double_restart() {
        let state = AtomicU8::new(VERSION_NOT_CHECKED);

        let first = state.compare_exchange(
            VERSION_NOT_CHECKED,
            VERSION_RESTART_ATTEMPTED,
            Ordering::SeqCst,
            Ordering::SeqCst,
        );
        assert!(first.is_ok());

        let second = state.compare_exchange(
            VERSION_NOT_CHECKED,
            VERSION_RESTART_ATTEMPTED,
            Ordering::SeqCst,
            Ordering::SeqCst,
        );
        assert!(second.is_err());
        assert_eq!(state.load(Ordering::SeqCst), VERSION_RESTART_ATTEMPTED);
    }

    #[test]
    fn ping_response_includes_model_fields() {
        let resp = DaemonResponse::Ok {
            pid: 42,
            version: "1.0.0".to_string(),
            handled_embed_requests: 7,
            model_name: "multilingual-e5-small".to_string(),
            model_variant: "fp32".to_string(),
        };
        let json = serde_json::to_value(&resp).expect("serialization failed");
        assert_eq!(json["model_name"], "multilingual-e5-small");
        assert_eq!(json["model_variant"], "fp32");
        assert_eq!(json["status"], "ok");
        assert_eq!(json["handled_embed_requests"], 7u64);
    }

    #[test]
    fn gliner_variant_defaults_to_fp32() {
        // A poisoned lock only means another env test failed; the guard is still usable.
        let _env = GLINER_ENV_LOCK
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        std::env::remove_var("SQLITE_GRAPHRAG_GLINER_VARIANT");
        let variant = gliner_variant_from_env();
        assert_eq!(variant, "fp32");
    }

    #[test]
    fn gliner_variant_reads_env_var() {
        let _env = GLINER_ENV_LOCK
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        std::env::set_var("SQLITE_GRAPHRAG_GLINER_VARIANT", "int8");
        let variant = gliner_variant_from_env();
        std::env::remove_var("SQLITE_GRAPHRAG_GLINER_VARIANT");
        assert_eq!(variant, "int8");
    }

    #[test]
    fn spawn_state_serialization_roundtrip() {
        let (_tmp, models_dir) = temp_models_dir();

        let original = DaemonSpawnState {
            consecutive_failures: 3,
            not_before_epoch_ms: 9_999_999_999,
            last_error: Some("test error message".to_string()),
        };
        save_spawn_state(&models_dir, &original).unwrap();

        let loaded = load_spawn_state(&models_dir).unwrap();
        assert_eq!(loaded.consecutive_failures, original.consecutive_failures);
        assert_eq!(loaded.not_before_epoch_ms, original.not_before_epoch_ms);
        assert_eq!(loaded.last_error, original.last_error);
    }
}