1use crate::constants::{
7 DAEMON_AUTO_START_INITIAL_BACKOFF_MS, DAEMON_AUTO_START_MAX_BACKOFF_MS,
8 DAEMON_AUTO_START_MAX_WAIT_MS, DAEMON_IDLE_SHUTDOWN_SECS, DAEMON_PING_TIMEOUT_MS,
9 DAEMON_SPAWN_BACKOFF_BASE_MS, DAEMON_SPAWN_LOCK_WAIT_MS, DAEMON_VERSION_RESTART_WAIT_MS,
10 SQLITE_GRAPHRAG_VERSION,
11};
12use crate::errors::AppError;
13use crate::{embedder, shutdown_requested};
14use fs4::fs_std::FileExt;
15use interprocess::local_socket::{
16 prelude::LocalSocketStream,
17 traits::{Listener as _, Stream as _},
18 GenericFilePath, GenericNamespaced, ListenerNonblockingMode, ListenerOptions, ToFsName,
19 ToNsName,
20};
21use serde::{Deserialize, Serialize};
22use std::fs::{File, OpenOptions};
23use std::io::{BufRead, BufReader, Write};
24use std::path::{Path, PathBuf};
25use std::process::Stdio;
26use std::sync::atomic::{AtomicU64, AtomicU8, Ordering};
27use std::sync::Arc;
28use std::thread;
29use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
30
/// `DAEMON_VERSION_STATE` value before this process has compared its own
/// version with the running daemon's version.
const VERSION_NOT_CHECKED: u8 = 0;
/// The one-shot version check has been claimed (this value is stored via CAS
/// *before* the daemon is pinged) and no restart was required.
const VERSION_COMPATIBLE: u8 = 1;
/// A version mismatch was detected and a daemon restart was attempted.
const VERSION_RESTART_ATTEMPTED: u8 = 2;

/// Process-wide state of the CLI-side daemon version check. Holds one of the
/// `VERSION_*` constants so the check (and any restart) runs at most once per
/// process.
static DAEMON_VERSION_STATE: AtomicU8 = AtomicU8::new(VERSION_NOT_CHECKED);
37
/// One request sent from a CLI process to the embedding daemon.
///
/// On the wire it is a single JSON object terminated by `\n`. The object is
/// internally tagged: its `"request"` field holds the snake_case variant name,
/// e.g. `{"request":"embed_query","text":"..."}`.
#[derive(Debug, Serialize, Deserialize)]
#[serde(tag = "request", rename_all = "snake_case")]
pub enum DaemonRequest {
    /// Health/version probe; answered with `DaemonResponse::Ok`.
    Ping,
    /// Asks the daemon to stop serving after replying with
    /// `DaemonResponse::ShuttingDown`.
    Shutdown,
    /// Embed one passage; answered with `DaemonResponse::PassageEmbedding`.
    EmbedPassage {
        text: String,
    },
    /// Embed one query; answered with `DaemonResponse::QueryEmbedding`.
    EmbedQuery {
        text: String,
    },
    /// Embed a batch of passages; answered with
    /// `DaemonResponse::PassageEmbeddings`.
    EmbedPassages {
        texts: Vec<String>,
        // Forwarded unchanged to `embedder::embed_passages_controlled`;
        // presumably one count per entry of `texts` — TODO confirm.
        token_counts: Vec<usize>,
    },
}
54
/// One reply from the embedding daemon (or its startup announcement).
///
/// Like `DaemonRequest`, it is a single `\n`-terminated JSON object. It is
/// internally tagged by a `"status"` field holding the snake_case variant name.
#[derive(Debug, Serialize, Deserialize)]
#[serde(tag = "status", rename_all = "snake_case")]
pub enum DaemonResponse {
    /// Emitted once via `crate::output::emit_json` after the socket is bound
    /// and the model has been warmed up.
    Listening {
        pid: u32,
        socket: String,
        idle_shutdown_secs: u64,
    },
    /// Reply to `Ping`: daemon identity, version and served-request counter.
    Ok {
        pid: u32,
        version: String,
        handled_embed_requests: u64,
        model_name: String,
        // Read from SQLITE_GRAPHRAG_GLINER_VARIANT in the daemon ("fp32" if unset).
        model_variant: String,
    },
    /// Reply to `EmbedPassage`. The counter is the value after this request
    /// was counted.
    PassageEmbedding {
        embedding: Vec<f32>,
        handled_embed_requests: u64,
    },
    /// Reply to `EmbedQuery`. The counter is the value after this request was
    /// counted.
    QueryEmbedding {
        embedding: Vec<f32>,
        handled_embed_requests: u64,
    },
    /// Reply to `EmbedPassages`. The whole batch counts as one request.
    PassageEmbeddings {
        embeddings: Vec<Vec<f32>>,
        handled_embed_requests: u64,
    },
    /// Reply to `Shutdown`, sent just before the daemon stops serving.
    ShuttingDown {
        handled_embed_requests: u64,
    },
    /// A request could not be served; clients map this to
    /// `AppError::Embedding(message)`.
    Error {
        message: String,
    },
}
89
/// Autostart back-off bookkeeping, persisted as JSON in
/// `daemon-spawn-state.json` inside the daemon control directory.
#[derive(Debug, Default, Serialize, Deserialize)]
struct DaemonSpawnState {
    // Number of autostart failures since the last successful contact.
    consecutive_failures: u32,
    // Wall-clock epoch milliseconds before which autostart is suppressed.
    not_before_epoch_ms: u64,
    // Human-readable reason for the most recent failure.
    last_error: Option<String>,
}
96
97pub fn daemon_label(models_dir: &Path) -> String {
98 let hash = blake3::hash(models_dir.to_string_lossy().as_bytes())
99 .to_hex()
100 .to_string();
101 format!("sqlite-graphrag-daemon-{}", &hash[..16])
102}
103
104pub fn try_ping(models_dir: &Path) -> Result<Option<DaemonResponse>, AppError> {
105 request_if_available(models_dir, &DaemonRequest::Ping)
106}
107
108pub fn try_shutdown(models_dir: &Path) -> Result<Option<DaemonResponse>, AppError> {
109 request_if_available(models_dir, &DaemonRequest::Shutdown)
110}
111
112pub fn embed_passage_or_local(models_dir: &Path, text: &str) -> Result<Vec<f32>, AppError> {
113 match request_or_autostart(
114 models_dir,
115 &DaemonRequest::EmbedPassage {
116 text: text.to_string(),
117 },
118 true,
119 )? {
120 Some(DaemonResponse::PassageEmbedding { embedding, .. }) => Ok(embedding),
121 Some(DaemonResponse::Error { message }) => Err(AppError::Embedding(message)),
122 Some(other) => Err(AppError::Internal(anyhow::anyhow!(
123 "unexpected daemon response for passage embedding: {other:?}"
124 ))),
125 None => {
126 let embedder = embedder::get_embedder(models_dir)?;
127 embedder::embed_passage(embedder, text)
128 }
129 }
130}
131
132pub fn embed_query_or_local(
133 models_dir: &Path,
134 text: &str,
135 cli_autostart: bool,
136) -> Result<Vec<f32>, AppError> {
137 match request_or_autostart(
138 models_dir,
139 &DaemonRequest::EmbedQuery {
140 text: text.to_string(),
141 },
142 cli_autostart,
143 )? {
144 Some(DaemonResponse::QueryEmbedding { embedding, .. }) => Ok(embedding),
145 Some(DaemonResponse::Error { message }) => Err(AppError::Embedding(message)),
146 Some(other) => Err(AppError::Internal(anyhow::anyhow!(
147 "unexpected daemon response for query embedding: {other:?}"
148 ))),
149 None => {
150 let embedder = embedder::get_embedder(models_dir)?;
151 embedder::embed_query(embedder, text)
152 }
153 }
154}
155
156pub fn embed_passages_controlled_or_local(
157 models_dir: &Path,
158 texts: &[&str],
159 token_counts: &[usize],
160) -> Result<Vec<Vec<f32>>, AppError> {
161 let request = DaemonRequest::EmbedPassages {
162 texts: texts.iter().map(|t| (*t).to_string()).collect(),
163 token_counts: token_counts.to_vec(),
164 };
165
166 match request_or_autostart(models_dir, &request, true)? {
167 Some(DaemonResponse::PassageEmbeddings { embeddings, .. }) => Ok(embeddings),
168 Some(DaemonResponse::Error { message }) => Err(AppError::Embedding(message)),
169 Some(other) => Err(AppError::Internal(anyhow::anyhow!(
170 "unexpected daemon response for passage embedding batch: {other:?}"
171 ))),
172 None => {
173 let embedder = embedder::get_embedder(models_dir)?;
174 embedder::embed_passages_controlled(embedder, texts, token_counts)
175 }
176 }
177}
178
/// Guard held by a running daemon for its whole lifetime. Its `Drop` impl
/// removes the spawn lock file for `models_dir` when the daemon stops.
struct DaemonSpawnGuard {
    models_dir: PathBuf,
}

impl DaemonSpawnGuard {
    /// Creates a guard bound to an owned copy of `models_dir`.
    fn new(models_dir: &Path) -> Self {
        let owned_dir = PathBuf::from(models_dir);
        Self {
            models_dir: owned_dir,
        }
    }
}
190
191impl Drop for DaemonSpawnGuard {
192 fn drop(&mut self) {
193 let lock_path = spawn_lock_path(&self.models_dir);
194 if lock_path.exists() {
195 match std::fs::remove_file(&lock_path) {
196 Ok(()) => {
197 tracing::debug!(
198 path = %lock_path.display(),
199 "spawn lock file removed during graceful daemon shutdown"
200 );
201 }
202 Err(err) if err.kind() == std::io::ErrorKind::NotFound => {}
203 Err(err) => {
204 tracing::warn!(
205 error = %err,
206 path = %lock_path.display(),
207 "failed to remove spawn lock file while shutting down daemon"
208 );
209 }
210 }
211 }
212 tracing::info!(
213 "daemon shut down gracefully; socket will be cleaned up by OS or by the next daemon via try_overwrite"
214 );
215 }
216}
217
218pub fn run(models_dir: &Path, idle_shutdown_secs: u64) -> Result<(), AppError> {
219 let permits = std::thread::available_parallelism()
222 .map(|n| n.get())
223 .unwrap_or(2)
224 .clamp(2, 8);
225 let rt = tokio::runtime::Builder::new_multi_thread()
226 .worker_threads(permits)
227 .thread_name("daemon-worker")
228 .enable_all()
229 .build()
230 .map_err(AppError::Io)?;
231
232 rt.block_on(run_async(models_dir, idle_shutdown_secs, permits))
233}
234
235async fn run_async(
236 models_dir: &Path,
237 idle_shutdown_secs: u64,
238 permits: usize,
239) -> Result<(), AppError> {
240 let socket = daemon_label(models_dir);
241 let name = to_local_socket_name(&socket)?;
242 let listener = ListenerOptions::new()
243 .name(name)
244 .nonblocking(ListenerNonblockingMode::Accept)
245 .try_overwrite(true)
246 .create_sync()
247 .map_err(AppError::Io)?;
248
249 let _spawn_guard = DaemonSpawnGuard::new(models_dir);
252
253 let models_dir_warm = models_dir.to_path_buf();
257 tokio::task::spawn_blocking(move || embedder::get_embedder(&models_dir_warm).map(|_| ()))
258 .await
259 .map_err(|e| AppError::Internal(anyhow::anyhow!("model warm-up panicked: {e}")))??;
260
261 crate::output::emit_json(&DaemonResponse::Listening {
262 pid: std::process::id(),
263 socket,
264 idle_shutdown_secs,
265 })?;
266
267 let handled_embed_requests = Arc::new(AtomicU64::new(0));
268 let mut last_activity = Instant::now();
269 let models_dir = models_dir.to_path_buf();
270 let permit_pool = Arc::new(tokio::sync::Semaphore::new(permits));
272
273 loop {
274 if shutdown_requested() {
275 break;
276 }
277
278 if !daemon_control_dir(&models_dir).exists() {
279 tracing::info!("daemon control directory disappeared; shutting down");
280 break;
281 }
282
283 match listener.accept() {
284 Ok(stream) => {
285 last_activity = Instant::now();
286 let models_dir_clone = models_dir.clone();
287 let counter = Arc::clone(&handled_embed_requests);
288 let permit =
289 permit_pool.clone().acquire_owned().await.map_err(|e| {
290 AppError::Internal(anyhow::anyhow!("semaphore closed: {e}"))
291 })?;
292 let should_exit = tokio::task::spawn_blocking(move || {
293 let _permit = permit; handle_client(stream, &models_dir_clone, &counter)
295 })
296 .await
297 .map_err(|e| {
298 AppError::Internal(anyhow::anyhow!("spawn_blocking panicked: {e}"))
299 })??;
300
301 if should_exit {
302 break;
303 }
304 }
305 Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => {
306 if last_activity.elapsed() >= Duration::from_secs(idle_shutdown_secs) {
307 tracing::info!(
308 idle_shutdown_secs,
309 handled_embed_requests = handled_embed_requests.load(Ordering::Relaxed),
310 "daemon idle timeout reached"
311 );
312 break;
313 }
314 tokio::time::sleep(Duration::from_millis(50)).await;
315 }
316 Err(err) => return Err(AppError::Io(err)),
317 }
318 }
319
320 Ok(())
321}
322
323fn handle_client(
324 stream: LocalSocketStream,
325 models_dir: &Path,
326 handled_embed_requests: &AtomicU64,
327) -> Result<bool, AppError> {
328 let mut reader = BufReader::new(stream);
329 let mut line = String::new();
330 reader.read_line(&mut line).map_err(AppError::Io)?;
331
332 if line.trim().is_empty() {
333 write_response(
334 reader.get_mut(),
335 &DaemonResponse::Error {
336 message: "empty request to daemon".to_string(),
337 },
338 )?;
339 return Ok(false);
340 }
341
342 let request: DaemonRequest = serde_json::from_str(line.trim()).map_err(AppError::Json)?;
343 let (response, should_exit) = match request {
344 DaemonRequest::Ping => (
345 DaemonResponse::Ok {
346 pid: std::process::id(),
347 version: SQLITE_GRAPHRAG_VERSION.to_string(),
348 handled_embed_requests: handled_embed_requests.load(Ordering::Relaxed),
349 model_name: crate::constants::FASTEMBED_MODEL_DEFAULT.to_string(),
350 model_variant: gliner_variant_from_env(),
351 },
352 false,
353 ),
354 DaemonRequest::Shutdown => (
355 DaemonResponse::ShuttingDown {
356 handled_embed_requests: handled_embed_requests.load(Ordering::Relaxed),
357 },
358 true,
359 ),
360 DaemonRequest::EmbedPassage { text } => {
361 let embedder = embedder::get_embedder(models_dir)?;
362 let embedding = embedder::embed_passage(embedder, &text)?;
363 let count = handled_embed_requests.fetch_add(1, Ordering::Relaxed) + 1;
364 (
365 DaemonResponse::PassageEmbedding {
366 embedding,
367 handled_embed_requests: count,
368 },
369 false,
370 )
371 }
372 DaemonRequest::EmbedQuery { text } => {
373 let embedder = embedder::get_embedder(models_dir)?;
374 let embedding = embedder::embed_query(embedder, &text)?;
375 let count = handled_embed_requests.fetch_add(1, Ordering::Relaxed) + 1;
376 (
377 DaemonResponse::QueryEmbedding {
378 embedding,
379 handled_embed_requests: count,
380 },
381 false,
382 )
383 }
384 DaemonRequest::EmbedPassages {
385 texts,
386 token_counts,
387 } => {
388 let embedder = embedder::get_embedder(models_dir)?;
389 let text_refs: Vec<&str> = texts.iter().map(String::as_str).collect();
390 let embeddings =
391 embedder::embed_passages_controlled(embedder, &text_refs, &token_counts)?;
392 let count = handled_embed_requests.fetch_add(1, Ordering::Relaxed) + 1;
393 (
394 DaemonResponse::PassageEmbeddings {
395 embeddings,
396 handled_embed_requests: count,
397 },
398 false,
399 )
400 }
401 };
402
403 write_response(reader.get_mut(), &response)?;
404 Ok(should_exit)
405}
406
407fn write_response(
408 stream: &mut LocalSocketStream,
409 response: &DaemonResponse,
410) -> Result<(), AppError> {
411 serde_json::to_writer(&mut *stream, response).map_err(AppError::Json)?;
412 stream.write_all(b"\n").map_err(AppError::Io)?;
413 stream.flush().map_err(AppError::Io)?;
414 Ok(())
415}
416
417fn request_if_available(
418 models_dir: &Path,
419 request: &DaemonRequest,
420) -> Result<Option<DaemonResponse>, AppError> {
421 let socket = daemon_label(models_dir);
422 let name = match to_local_socket_name(&socket) {
423 Ok(name) => name,
424 Err(err) => return Err(AppError::Io(err)),
425 };
426
427 let mut stream = match LocalSocketStream::connect(name) {
428 Ok(stream) => stream,
429 Err(err)
430 if matches!(
431 err.kind(),
432 std::io::ErrorKind::NotFound
433 | std::io::ErrorKind::ConnectionRefused
434 | std::io::ErrorKind::AddrNotAvailable
435 | std::io::ErrorKind::TimedOut
436 ) =>
437 {
438 return Ok(None);
439 }
440 Err(err) => return Err(AppError::Io(err)),
441 };
442
443 serde_json::to_writer(&mut stream, request).map_err(AppError::Json)?;
444 stream.write_all(b"\n").map_err(AppError::Io)?;
445 stream.flush().map_err(AppError::Io)?;
446
447 let mut reader = BufReader::new(stream);
448 let mut line = String::new();
449 reader.read_line(&mut line).map_err(AppError::Io)?;
450 if line.trim().is_empty() {
451 return Err(AppError::Embedding(
452 "daemon returned an empty response".into(),
453 ));
454 }
455
456 let response = serde_json::from_str(line.trim()).map_err(AppError::Json)?;
457 Ok(Some(response))
458}
459
460fn should_autostart(cli_flag: bool) -> bool {
461 if !cli_flag {
462 return false; }
464 !autostart_disabled_by_env()
465}
466
467fn maybe_restart_for_version_mismatch(models_dir: &Path) -> Result<(), AppError> {
472 if DAEMON_VERSION_STATE
473 .compare_exchange(
474 VERSION_NOT_CHECKED,
475 VERSION_COMPATIBLE,
476 Ordering::SeqCst,
477 Ordering::SeqCst,
478 )
479 .is_err()
480 {
481 return Ok(());
483 }
484
485 let response = match try_ping(models_dir)? {
486 Some(r) => r,
487 None => return Ok(()), };
489
490 let daemon_version = match &response {
491 DaemonResponse::Ok { version, .. } => version.as_str(),
492 _ => return Ok(()), };
494
495 if daemon_version == SQLITE_GRAPHRAG_VERSION {
496 return Ok(()); }
498
499 DAEMON_VERSION_STATE.store(VERSION_RESTART_ATTEMPTED, Ordering::SeqCst);
501
502 tracing::warn!(
503 daemon_version = %daemon_version,
504 cli_version = SQLITE_GRAPHRAG_VERSION,
505 "daemon version mismatch detected; auto-restarting daemon"
506 );
507
508 try_shutdown(models_dir)?;
510
511 wait_for_daemon_exit(models_dir)?;
513
514 ensure_daemon_running(models_dir)?;
516
517 Ok(())
518}
519
520fn wait_for_daemon_exit(models_dir: &Path) -> Result<(), AppError> {
524 let deadline = Instant::now() + Duration::from_millis(DAEMON_VERSION_RESTART_WAIT_MS);
525 let mut sleep_ms: u64 = 50;
526
527 while Instant::now() < deadline {
528 if try_ping(models_dir)?.is_none() {
529 tracing::debug!("stale daemon exited after version-mismatch shutdown");
530 return Ok(());
531 }
532 thread::sleep(Duration::from_millis(sleep_ms));
533 sleep_ms = (sleep_ms * 2).min(500);
534 }
535
536 tracing::warn!(
537 timeout_ms = DAEMON_VERSION_RESTART_WAIT_MS,
538 "timed out waiting for stale daemon to exit after version-mismatch shutdown"
539 );
540 Ok(())
541}
542
543fn request_or_autostart(
544 models_dir: &Path,
545 request: &DaemonRequest,
546 cli_autostart: bool,
547) -> Result<Option<DaemonResponse>, AppError> {
548 if DAEMON_VERSION_STATE.load(Ordering::SeqCst) == VERSION_NOT_CHECKED {
549 maybe_restart_for_version_mismatch(models_dir)?;
550 }
551
552 if let Some(response) = request_if_available(models_dir, request)? {
553 clear_spawn_backoff_state(models_dir).ok();
554 return Ok(Some(response));
555 }
556
557 if !should_autostart(cli_autostart) {
558 return Ok(None);
559 }
560
561 if !ensure_daemon_running(models_dir)? {
562 return Ok(None);
563 }
564
565 request_if_available(models_dir, request)
566}
567
568fn ensure_daemon_running(models_dir: &Path) -> Result<bool, AppError> {
569 if (try_ping(models_dir)?).is_some() {
570 clear_spawn_backoff_state(models_dir).ok();
571 return Ok(true);
572 }
573
574 if spawn_backoff_active(models_dir)? {
575 tracing::warn!("daemon autostart suppressed by backoff window");
576 return Ok(false);
577 }
578
579 let spawn_lock = match try_acquire_spawn_lock(models_dir)? {
580 Some(lock) => lock,
581 None => return wait_for_daemon_ready(models_dir),
582 };
583
584 if (try_ping(models_dir)?).is_some() {
585 clear_spawn_backoff_state(models_dir).ok();
586 drop(spawn_lock);
587 return Ok(true);
588 }
589
590 let exe = match std::env::current_exe() {
591 Ok(path) => path,
592 Err(err) => {
593 record_spawn_failure(models_dir, format!("current_exe failed: {err}"))?;
594 drop(spawn_lock);
595 return Ok(false);
596 }
597 };
598
599 let mut child = std::process::Command::new(exe);
600 child
601 .arg("daemon")
602 .arg("--idle-shutdown-secs")
603 .arg(DAEMON_IDLE_SHUTDOWN_SECS.to_string())
604 .env("SQLITE_GRAPHRAG_DAEMON_CHILD", "1")
605 .env_remove("LD_PRELOAD")
606 .env_remove("LD_LIBRARY_PATH")
607 .env_remove("LD_AUDIT")
608 .env_remove("DYLD_INSERT_LIBRARIES")
609 .env_remove("DYLD_LIBRARY_PATH")
610 .stdin(Stdio::null())
611 .stdout(Stdio::null())
612 .stderr(Stdio::null());
613
614 match child.spawn() {
615 Ok(child_handle) => {
616 let pid = child_handle.id();
628 drop(child_handle);
629 tracing::debug!(
630 pid,
631 "daemon detached; lifecycle managed via spawn lock + readiness file"
632 );
633 let ready = wait_for_daemon_ready(models_dir)?;
634 if ready {
635 clear_spawn_backoff_state(models_dir).ok();
636 } else {
637 record_spawn_failure(
638 models_dir,
639 "daemon did not become healthy after autostart".to_string(),
640 )?;
641 }
642 drop(spawn_lock);
643 Ok(ready)
644 }
645 Err(err) => {
646 record_spawn_failure(models_dir, format!("daemon spawn failed: {err}"))?;
647 drop(spawn_lock);
648 Ok(false)
649 }
650 }
651}
652
653fn wait_for_daemon_ready(models_dir: &Path) -> Result<bool, AppError> {
654 let deadline = Instant::now() + Duration::from_millis(DAEMON_AUTO_START_MAX_WAIT_MS);
655 let mut sleep_ms = DAEMON_AUTO_START_INITIAL_BACKOFF_MS.max(DAEMON_PING_TIMEOUT_MS);
656
657 while Instant::now() < deadline {
658 if (try_ping(models_dir)?).is_some() {
659 return Ok(true);
660 }
661 thread::sleep(Duration::from_millis(sleep_ms));
662 sleep_ms = (sleep_ms * 2).min(DAEMON_AUTO_START_MAX_BACKOFF_MS);
663 }
664
665 Ok(false)
666}
667
/// Returns whether environment variables forbid daemon autostart.
///
/// Autostart is disabled:
/// - inside a spawned daemon (`SQLITE_GRAPHRAG_DAEMON_CHILD=1`), always; or
/// - when `SQLITE_GRAPHRAG_DAEMON_DISABLE_AUTOSTART=1`, unless
///   `SQLITE_GRAPHRAG_DAEMON_FORCE_AUTOSTART=1` overrides it.
fn autostart_disabled_by_env() -> bool {
    let is_one = |key: &str| std::env::var(key).as_deref() == Ok("1");

    if is_one("SQLITE_GRAPHRAG_DAEMON_CHILD") {
        return true;
    }
    let forced = is_one("SQLITE_GRAPHRAG_DAEMON_FORCE_AUTOSTART");
    !forced && is_one("SQLITE_GRAPHRAG_DAEMON_DISABLE_AUTOSTART")
}
673
/// Returns the directory holding the daemon's lock and back-off files.
///
/// This is the parent of `models_dir`, or `models_dir` itself when it has no
/// parent (e.g. a filesystem root). An empty result is replaced by `"."`.
/// Otherwise a single-component relative path such as `"models"` would give
/// `""`, for which `Path::exists()` is always false, and the daemon would
/// immediately shut down believing its control directory had disappeared.
/// Joining onto `"."` names the same files as joining onto `""`.
fn daemon_control_dir(models_dir: &Path) -> PathBuf {
    let dir = models_dir.parent().unwrap_or(models_dir);
    if dir.as_os_str().is_empty() {
        PathBuf::from(".")
    } else {
        dir.to_path_buf()
    }
}
680
681fn spawn_lock_path(models_dir: &Path) -> PathBuf {
682 daemon_control_dir(models_dir).join("daemon-spawn.lock")
683}
684
685fn spawn_state_path(models_dir: &Path) -> PathBuf {
686 daemon_control_dir(models_dir).join("daemon-spawn-state.json")
687}
688
689fn try_acquire_spawn_lock(models_dir: &Path) -> Result<Option<File>, AppError> {
690 let path = spawn_lock_path(models_dir);
691 std::fs::create_dir_all(crate::paths::parent_or_err(&path)?).map_err(AppError::Io)?;
692 let file = OpenOptions::new()
693 .read(true)
694 .write(true)
695 .create(true)
696 .truncate(false)
697 .open(path)
698 .map_err(AppError::Io)?;
699
700 let deadline = Instant::now() + Duration::from_millis(DAEMON_SPAWN_LOCK_WAIT_MS);
701 loop {
702 match file.try_lock_exclusive() {
703 Ok(()) => return Ok(Some(file)),
704 Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => {
705 if Instant::now() >= deadline {
706 return Ok(None);
707 }
708 thread::sleep(Duration::from_millis(50));
709 }
710 Err(err) => return Err(AppError::Io(err)),
711 }
712 }
713}
714
715fn spawn_backoff_active(models_dir: &Path) -> Result<bool, AppError> {
716 let state = load_spawn_state(models_dir)?;
717 Ok(now_epoch_ms() < state.not_before_epoch_ms)
718}
719
720fn record_spawn_failure(models_dir: &Path, message: String) -> Result<(), AppError> {
721 let mut state = load_spawn_state(models_dir)?;
722 state.consecutive_failures = state.consecutive_failures.saturating_add(1);
723 let exponent = state.consecutive_failures.saturating_sub(1).min(6);
724 let base_ms =
725 (DAEMON_SPAWN_BACKOFF_BASE_MS * (1_u64 << exponent)).min(DAEMON_AUTO_START_MAX_BACKOFF_MS);
726 let half = base_ms / 2;
729 let jitter = if half == 0 { 0 } else { fastrand::u64(0..half) };
730 let backoff_ms = half + jitter;
731 state.not_before_epoch_ms = now_epoch_ms() + backoff_ms;
732 state.last_error = Some(message);
733 save_spawn_state(models_dir, &state)
734}
735
736fn clear_spawn_backoff_state(models_dir: &Path) -> Result<(), AppError> {
737 let path = spawn_state_path(models_dir);
738 if path.exists() {
739 std::fs::remove_file(path).map_err(AppError::Io)?;
740 }
741 Ok(())
742}
743
744fn load_spawn_state(models_dir: &Path) -> Result<DaemonSpawnState, AppError> {
745 let path = spawn_state_path(models_dir);
746 if !path.exists() {
747 return Ok(DaemonSpawnState::default());
748 }
749
750 let bytes = std::fs::read(path).map_err(AppError::Io)?;
751 serde_json::from_slice(&bytes).map_err(AppError::Json)
752}
753
754fn save_spawn_state(models_dir: &Path, state: &DaemonSpawnState) -> Result<(), AppError> {
755 let path = spawn_state_path(models_dir);
756 std::fs::create_dir_all(crate::paths::parent_or_err(&path)?).map_err(AppError::Io)?;
757 let bytes = serde_json::to_vec(state).map_err(AppError::Json)?;
758 std::fs::write(path, bytes).map_err(AppError::Io)
759}
760
/// Returns the GLiNER model variant from `SQLITE_GRAPHRAG_GLINER_VARIANT`.
/// Falls back to `"fp32"` when the variable is unset or not valid Unicode.
fn gliner_variant_from_env() -> String {
    match std::env::var("SQLITE_GRAPHRAG_GLINER_VARIANT") {
        Ok(variant) => variant,
        Err(_) => String::from("fp32"),
    }
}
766
/// Current wall-clock time in milliseconds since the Unix epoch.
///
/// Returns 0 if the clock is before the epoch. The `u128` millisecond count
/// saturates at `u64::MAX` instead of being silently truncated by an `as`
/// cast.
fn now_epoch_ms() -> u64 {
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or(Duration::ZERO);
    u64::try_from(since_epoch.as_millis()).unwrap_or(u64::MAX)
}
773
774fn to_local_socket_name(name: &str) -> std::io::Result<interprocess::local_socket::Name<'static>> {
775 if let Ok(ns_name) = name.to_string().to_ns_name::<GenericNamespaced>() {
776 return Ok(ns_name);
777 }
778
779 let path = if cfg!(unix) {
784 let base = std::env::var_os("XDG_RUNTIME_DIR")
785 .or_else(|| std::env::var_os("SQLITE_GRAPHRAG_HOME"))
786 .map(std::path::PathBuf::from)
787 .unwrap_or_else(std::env::temp_dir);
788 base.join(format!("{name}.sock"))
789 .to_string_lossy()
790 .into_owned()
791 } else {
792 format!(r"\\.\pipe\{name}")
793 };
794 path.to_fs_name::<GenericFilePath>()
795}
796
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::{Mutex, MutexGuard, PoisonError};

    /// Serializes tests that mutate process environment variables. The test
    /// harness runs tests on parallel threads, so without this the two
    /// GLiNER-variant tests can observe each other's `set_var`/`remove_var`
    /// and fail intermittently.
    static ENV_LOCK: Mutex<()> = Mutex::new(());

    fn env_guard() -> MutexGuard<'static, ()> {
        // A failed test poisons the mutex; the protected data is `()`, so
        // recovering the guard is always safe.
        ENV_LOCK.lock().unwrap_or_else(PoisonError::into_inner)
    }

    #[test]
    fn record_and_clear_spawn_backoff_state() {
        let tmp = tempfile::tempdir().unwrap();
        let models_dir = tmp.path().join("cache").join("models");
        std::fs::create_dir_all(&models_dir).unwrap();

        assert!(!spawn_backoff_active(&models_dir).unwrap());

        record_spawn_failure(&models_dir, "spawn failed".to_string()).unwrap();
        assert!(spawn_backoff_active(&models_dir).unwrap());

        let state = load_spawn_state(&models_dir).unwrap();
        assert_eq!(state.consecutive_failures, 1);
        assert_eq!(state.last_error.as_deref(), Some("spawn failed"));

        clear_spawn_backoff_state(&models_dir).unwrap();
        assert!(!spawn_backoff_active(&models_dir).unwrap());
    }

    #[test]
    fn daemon_control_dir_uses_models_parent() {
        let base = PathBuf::from("/tmp/sqlite-graphrag-cache-test");
        let models_dir = base.join("models");
        assert_eq!(daemon_control_dir(&models_dir), base);
    }

    #[test]
    fn version_state_constants_are_distinct() {
        assert_ne!(VERSION_NOT_CHECKED, VERSION_COMPATIBLE);
        assert_ne!(VERSION_NOT_CHECKED, VERSION_RESTART_ATTEMPTED);
        assert_ne!(VERSION_COMPATIBLE, VERSION_RESTART_ATTEMPTED);
    }

    #[test]
    fn wait_for_daemon_exit_immediate_when_not_running() {
        let tmp = tempfile::tempdir().unwrap();
        let models_dir = tmp.path().join("cache").join("models");
        std::fs::create_dir_all(&models_dir).unwrap();

        let start = Instant::now();
        wait_for_daemon_exit(&models_dir).unwrap();
        assert!(start.elapsed() < Duration::from_millis(500));
    }

    #[test]
    fn spawn_backoff_exponent_caps_at_six() {
        let tmp = tempfile::tempdir().unwrap();
        let models_dir = tmp.path().join("cache").join("models");
        std::fs::create_dir_all(&models_dir).unwrap();

        for i in 0..10 {
            record_spawn_failure(&models_dir, format!("failure {i}")).unwrap();
        }

        let state = load_spawn_state(&models_dir).unwrap();
        assert_eq!(state.consecutive_failures, 10);

        // The window can never exceed the capped (2^6) ceiling.
        let max_base =
            (DAEMON_SPAWN_BACKOFF_BASE_MS * (1_u64 << 6)).min(DAEMON_AUTO_START_MAX_BACKOFF_MS);
        let now = now_epoch_ms();
        assert!(state.not_before_epoch_ms <= now + max_base);
    }

    #[test]
    fn spawn_backoff_half_jitter_in_range() {
        let base_ms: u64 = 100;
        let half = base_ms / 2;
        for _ in 0..100 {
            let jitter = fastrand::u64(0..half);
            let result = half + jitter;
            assert!(result >= half, "result {result} below half {half}");
            assert!(result < base_ms, "result {result} not below base {base_ms}");
        }
    }

    #[test]
    fn to_local_socket_name_produces_valid_result() {
        let result = to_local_socket_name("sqlite-graphrag-test-daemon");
        assert!(result.is_ok(), "expected Ok, got {result:?}");
        let name = result.unwrap();
        let display = format!("{name:?}");
        assert!(!display.is_empty());
    }

    #[test]
    fn version_cas_not_checked_to_compatible() {
        let state = AtomicU8::new(VERSION_NOT_CHECKED);
        let result = state.compare_exchange(
            VERSION_NOT_CHECKED,
            VERSION_COMPATIBLE,
            Ordering::SeqCst,
            Ordering::SeqCst,
        );
        assert!(result.is_ok());
        assert_eq!(state.load(Ordering::SeqCst), VERSION_COMPATIBLE);
    }

    #[test]
    fn version_cas_prevents_double_restart() {
        let state = AtomicU8::new(VERSION_NOT_CHECKED);

        let first = state.compare_exchange(
            VERSION_NOT_CHECKED,
            VERSION_RESTART_ATTEMPTED,
            Ordering::SeqCst,
            Ordering::SeqCst,
        );
        assert!(first.is_ok());

        let second = state.compare_exchange(
            VERSION_NOT_CHECKED,
            VERSION_RESTART_ATTEMPTED,
            Ordering::SeqCst,
            Ordering::SeqCst,
        );
        assert!(second.is_err());
        assert_eq!(state.load(Ordering::SeqCst), VERSION_RESTART_ATTEMPTED);
    }

    #[test]
    fn ping_response_includes_model_fields() {
        let resp = DaemonResponse::Ok {
            pid: 42,
            version: "1.0.0".to_string(),
            handled_embed_requests: 7,
            model_name: "multilingual-e5-small".to_string(),
            model_variant: "fp32".to_string(),
        };
        let json = serde_json::to_value(&resp).expect("serialization failed");
        assert_eq!(json["model_name"], "multilingual-e5-small");
        assert_eq!(json["model_variant"], "fp32");
        assert_eq!(json["status"], "ok");
        assert_eq!(json["handled_embed_requests"], 7u64);
    }

    #[test]
    fn gliner_variant_defaults_to_fp32() {
        let _env = env_guard();
        std::env::remove_var("SQLITE_GRAPHRAG_GLINER_VARIANT");
        let variant = gliner_variant_from_env();
        assert_eq!(variant, "fp32");
    }

    #[test]
    fn gliner_variant_reads_env_var() {
        let _env = env_guard();
        std::env::set_var("SQLITE_GRAPHRAG_GLINER_VARIANT", "int8");
        let variant = gliner_variant_from_env();
        std::env::remove_var("SQLITE_GRAPHRAG_GLINER_VARIANT");
        assert_eq!(variant, "int8");
    }

    #[test]
    fn spawn_state_serialization_roundtrip() {
        let tmp = tempfile::tempdir().unwrap();
        let models_dir = tmp.path().join("cache").join("models");
        std::fs::create_dir_all(&models_dir).unwrap();

        let original = DaemonSpawnState {
            consecutive_failures: 3,
            not_before_epoch_ms: 9_999_999_999,
            last_error: Some("test error message".to_string()),
        };
        save_spawn_state(&models_dir, &original).unwrap();

        let loaded = load_spawn_state(&models_dir).unwrap();
        assert_eq!(loaded.consecutive_failures, original.consecutive_failures);
        assert_eq!(loaded.not_before_epoch_ms, original.not_before_epoch_ms);
        assert_eq!(loaded.last_error, original.last_error);
    }
}