// sqlite_graphrag/daemon.rs

1//! IPC daemon: keeps the embedding model warm across CLI invocations.
2//!
3//! Manages the background process lifecycle, Unix-socket IPC protocol, and
4//! auto-start/backoff logic so embeddings are served without cold-start cost.
5
6use crate::constants::{
7    DAEMON_AUTO_START_INITIAL_BACKOFF_MS, DAEMON_AUTO_START_MAX_BACKOFF_MS,
8    DAEMON_AUTO_START_MAX_WAIT_MS, DAEMON_IDLE_SHUTDOWN_SECS, DAEMON_PING_TIMEOUT_MS,
9    DAEMON_SPAWN_BACKOFF_BASE_MS, DAEMON_SPAWN_LOCK_WAIT_MS, SQLITE_GRAPHRAG_VERSION,
10};
11use crate::errors::AppError;
12use crate::{embedder, shutdown_requested};
13use fs4::fs_std::FileExt;
14use interprocess::local_socket::{
15    prelude::LocalSocketStream,
16    traits::{Listener as _, Stream as _},
17    GenericFilePath, GenericNamespaced, ListenerNonblockingMode, ListenerOptions, ToFsName,
18    ToNsName,
19};
20use serde::{Deserialize, Serialize};
21use std::fs::{File, OpenOptions};
22use std::io::{BufRead, BufReader, Write};
23use std::path::{Path, PathBuf};
24use std::process::Stdio;
25use std::sync::atomic::{AtomicU64, Ordering};
26use std::sync::Arc;
27use std::thread;
28use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
29
/// Requests accepted by the daemon over the local-socket IPC channel.
///
/// Serialized as single-line JSON with an internal `"request"` tag in
/// snake_case, e.g. `{"request":"embed_query","text":"..."}`.
#[derive(Debug, Serialize, Deserialize)]
#[serde(tag = "request", rename_all = "snake_case")]
pub enum DaemonRequest {
    /// Health check; answered with [`DaemonResponse::Ok`].
    Ping,
    /// Ask the daemon to exit; answered with [`DaemonResponse::ShuttingDown`].
    Shutdown,
    /// Embed a single passage text.
    EmbedPassage {
        text: String,
    },
    /// Embed a single query text.
    EmbedQuery {
        text: String,
    },
    /// Embed a batch of passages; `token_counts` is parallel to `texts`
    /// (one pre-computed token count per text).
    EmbedPassages {
        texts: Vec<String>,
        token_counts: Vec<usize>,
    },
}
46
/// Responses emitted by the daemon, one single-line JSON object per request.
///
/// Serialized with an internal `"status"` tag in snake_case. Most variants
/// carry `handled_embed_requests`, the daemon's lifetime embed counter.
#[derive(Debug, Serialize, Deserialize)]
#[serde(tag = "status", rename_all = "snake_case")]
pub enum DaemonResponse {
    /// Startup announcement printed by the daemon once it is accepting
    /// connections (not a reply to any request).
    Listening {
        pid: u32,
        socket: String,
        idle_shutdown_secs: u64,
    },
    /// Reply to [`DaemonRequest::Ping`].
    Ok {
        pid: u32,
        version: String,
        handled_embed_requests: u64,
    },
    /// Reply to [`DaemonRequest::EmbedPassage`].
    PassageEmbedding {
        embedding: Vec<f32>,
        handled_embed_requests: u64,
    },
    /// Reply to [`DaemonRequest::EmbedQuery`].
    QueryEmbedding {
        embedding: Vec<f32>,
        handled_embed_requests: u64,
    },
    /// Reply to [`DaemonRequest::EmbedPassages`]; embeddings are in input order.
    PassageEmbeddings {
        embeddings: Vec<Vec<f32>>,
        handled_embed_requests: u64,
    },
    /// Reply to [`DaemonRequest::Shutdown`], sent just before the daemon exits.
    ShuttingDown {
        handled_embed_requests: u64,
    },
    /// Request-level failure (bad request, embedding error) reported to the client.
    Error {
        message: String,
    },
}
79
/// Autostart backoff state persisted as JSON in the daemon control directory.
#[derive(Debug, Default, Serialize, Deserialize)]
struct DaemonSpawnState {
    // Consecutive failed spawn attempts; drives the exponential backoff exponent.
    consecutive_failures: u32,
    // Epoch milliseconds before which autostart must not be retried.
    not_before_epoch_ms: u64,
    // Human-readable description of the most recent spawn failure.
    last_error: Option<String>,
}
86
87pub fn daemon_label(models_dir: &Path) -> String {
88    let hash = blake3::hash(models_dir.to_string_lossy().as_bytes())
89        .to_hex()
90        .to_string();
91    format!("sqlite-graphrag-daemon-{}", &hash[..16])
92}
93
/// Ping a running daemon; returns `Ok(None)` when no daemon is reachable.
pub fn try_ping(models_dir: &Path) -> Result<Option<DaemonResponse>, AppError> {
    request_if_available(models_dir, &DaemonRequest::Ping)
}
97
/// Ask a running daemon to shut down; returns `Ok(None)` when none is reachable.
pub fn try_shutdown(models_dir: &Path) -> Result<Option<DaemonResponse>, AppError> {
    request_if_available(models_dir, &DaemonRequest::Shutdown)
}
101
102pub fn embed_passage_or_local(models_dir: &Path, text: &str) -> Result<Vec<f32>, AppError> {
103    match request_or_autostart(
104        models_dir,
105        &DaemonRequest::EmbedPassage {
106            text: text.to_string(),
107        },
108        true,
109    )? {
110        Some(DaemonResponse::PassageEmbedding { embedding, .. }) => Ok(embedding),
111        Some(DaemonResponse::Error { message }) => Err(AppError::Embedding(message)),
112        Some(other) => Err(AppError::Internal(anyhow::anyhow!(
113            "unexpected daemon response for passage embedding: {other:?}"
114        ))),
115        None => {
116            let embedder = embedder::get_embedder(models_dir)?;
117            embedder::embed_passage(embedder, text)
118        }
119    }
120}
121
122pub fn embed_query_or_local(
123    models_dir: &Path,
124    text: &str,
125    cli_autostart: bool,
126) -> Result<Vec<f32>, AppError> {
127    match request_or_autostart(
128        models_dir,
129        &DaemonRequest::EmbedQuery {
130            text: text.to_string(),
131        },
132        cli_autostart,
133    )? {
134        Some(DaemonResponse::QueryEmbedding { embedding, .. }) => Ok(embedding),
135        Some(DaemonResponse::Error { message }) => Err(AppError::Embedding(message)),
136        Some(other) => Err(AppError::Internal(anyhow::anyhow!(
137            "unexpected daemon response for query embedding: {other:?}"
138        ))),
139        None => {
140            let embedder = embedder::get_embedder(models_dir)?;
141            embedder::embed_query(embedder, text)
142        }
143    }
144}
145
146pub fn embed_passages_controlled_or_local(
147    models_dir: &Path,
148    texts: &[&str],
149    token_counts: &[usize],
150) -> Result<Vec<Vec<f32>>, AppError> {
151    let request = DaemonRequest::EmbedPassages {
152        texts: texts.iter().map(|t| (*t).to_string()).collect(),
153        token_counts: token_counts.to_vec(),
154    };
155
156    match request_or_autostart(models_dir, &request, true)? {
157        Some(DaemonResponse::PassageEmbeddings { embeddings, .. }) => Ok(embeddings),
158        Some(DaemonResponse::Error { message }) => Err(AppError::Embedding(message)),
159        Some(other) => Err(AppError::Internal(anyhow::anyhow!(
160            "unexpected daemon response for passage embedding batch: {other:?}"
161        ))),
162        None => {
163            let embedder = embedder::get_embedder(models_dir)?;
164            embedder::embed_passages_controlled(embedder, texts, token_counts)
165        }
166    }
167}
168
/// RAII guard that removes the spawn lock file when the daemon exits
/// gracefully (cleanup lives in its `Drop` impl; SIGKILL bypasses `Drop`).
struct DaemonSpawnGuard {
    // Owned copy so `Drop` can recompute the lock path without borrows.
    models_dir: PathBuf,
}

impl DaemonSpawnGuard {
    /// Capture the models directory for lock-path computation at drop time.
    fn new(models_dir: &Path) -> Self {
        Self {
            models_dir: models_dir.to_path_buf(),
        }
    }
}
180
181impl Drop for DaemonSpawnGuard {
182    fn drop(&mut self) {
183        let lock_path = spawn_lock_path(&self.models_dir);
184        if lock_path.exists() {
185            match std::fs::remove_file(&lock_path) {
186                Ok(()) => {
187                    tracing::debug!(
188                        path = %lock_path.display(),
189                        "spawn lock file removed during graceful daemon shutdown"
190                    );
191                }
192                Err(err) if err.kind() == std::io::ErrorKind::NotFound => {}
193                Err(err) => {
194                    tracing::warn!(
195                        error = %err,
196                        path = %lock_path.display(),
197                        "failed to remove spawn lock file while shutting down daemon"
198                    );
199                }
200            }
201        }
202        tracing::info!(
203            "daemon shut down gracefully; socket will be cleaned up by OS or by the next daemon via try_overwrite"
204        );
205    }
206}
207
208pub fn run(models_dir: &Path, idle_shutdown_secs: u64) -> Result<(), AppError> {
209    // Scale worker threads to available parallelism so embedding tasks saturate CPU cores.
210    // Clamped to [2, 8] to avoid excessive threads on high-core machines.
211    let permits = std::thread::available_parallelism()
212        .map(|n| n.get())
213        .unwrap_or(2)
214        .clamp(2, 8);
215    let rt = tokio::runtime::Builder::new_multi_thread()
216        .worker_threads(permits)
217        .thread_name("daemon-worker")
218        .enable_all()
219        .build()
220        .map_err(AppError::Io)?;
221
222    rt.block_on(run_async(models_dir, idle_shutdown_secs, permits))
223}
224
/// Core daemon loop: bind the local socket, warm the embedding model, then
/// serve clients until one of four exit conditions: process-level shutdown is
/// requested, the control directory disappears, the idle timeout elapses, or
/// a client sends `Shutdown`.
///
/// `permits` bounds concurrent `spawn_blocking` client handlers and matches
/// the runtime's worker-thread count (see `run`).
async fn run_async(
    models_dir: &Path,
    idle_shutdown_secs: u64,
    permits: usize,
) -> Result<(), AppError> {
    let socket = daemon_label(models_dir);
    let name = to_local_socket_name(&socket)?;
    // Nonblocking accept lets the loop poll shutdown/idle conditions between
    // connections; try_overwrite reclaims a stale socket left by a killed daemon.
    let listener = ListenerOptions::new()
        .name(name)
        .nonblocking(ListenerNonblockingMode::Accept)
        .try_overwrite(true)
        .create_sync()
        .map_err(AppError::Io)?;

    // Guard that cleans up the spawn lock file on graceful shutdown.
    // SIGKILL does not trigger Drop; in that case try_overwrite(true) above is the fallback.
    let _spawn_guard = DaemonSpawnGuard::new(models_dir);

    // Warm the model once per daemon process inside spawn_blocking so the
    // ONNX session initialisation (CPU-bound, may take several seconds) does
    // not block a tokio worker thread.
    let models_dir_warm = models_dir.to_path_buf();
    tokio::task::spawn_blocking(move || embedder::get_embedder(&models_dir_warm).map(|_| ()))
        .await
        .map_err(|e| AppError::Internal(anyhow::anyhow!("model warm-up panicked: {e}")))??;

    // Announce readiness on stdout so the spawning CLI can detect liveness.
    crate::output::emit_json(&DaemonResponse::Listening {
        pid: std::process::id(),
        socket,
        idle_shutdown_secs,
    })?;

    let handled_embed_requests = Arc::new(AtomicU64::new(0));
    let mut last_activity = Instant::now();
    let models_dir = models_dir.to_path_buf();
    // Bound concurrent spawn_blocking tasks to the same thread count as the runtime.
    let permit_pool = Arc::new(tokio::sync::Semaphore::new(permits));

    loop {
        if shutdown_requested() {
            break;
        }

        // The control dir holds the lock/state files; its removal is treated
        // as an external "uninstall/stop" signal.
        if !daemon_control_dir(&models_dir).exists() {
            tracing::info!("daemon control directory disappeared; shutting down");
            break;
        }

        match listener.accept() {
            Ok(stream) => {
                last_activity = Instant::now();
                let models_dir_clone = models_dir.clone();
                let counter = Arc::clone(&handled_embed_requests);
                let permit =
                    permit_pool.clone().acquire_owned().await.map_err(|e| {
                        AppError::Internal(anyhow::anyhow!("semaphore closed: {e}"))
                    })?;
                // NOTE(review): the handler's JoinHandle is awaited inline, so
                // clients are effectively served one at a time and the
                // semaphore never sees contention — confirm whether concurrent
                // serving was intended here.
                let should_exit = tokio::task::spawn_blocking(move || {
                    let _permit = permit; // hold until end of scope
                    handle_client(stream, &models_dir_clone, &counter)
                })
                .await
                .map_err(|e| {
                    AppError::Internal(anyhow::anyhow!("spawn_blocking panicked: {e}"))
                })??;

                // handle_client returns true only for an explicit Shutdown request.
                if should_exit {
                    break;
                }
            }
            // Nonblocking accept with no pending client: check idle timeout,
            // then sleep briefly before polling again.
            Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => {
                if last_activity.elapsed() >= Duration::from_secs(idle_shutdown_secs) {
                    tracing::info!(
                        idle_shutdown_secs,
                        handled_embed_requests = handled_embed_requests.load(Ordering::Relaxed),
                        "daemon idle timeout reached"
                    );
                    break;
                }
                tokio::time::sleep(Duration::from_millis(50)).await;
            }
            Err(err) => return Err(AppError::Io(err)),
        }
    }

    Ok(())
}
312
313fn handle_client(
314    stream: LocalSocketStream,
315    models_dir: &Path,
316    handled_embed_requests: &AtomicU64,
317) -> Result<bool, AppError> {
318    let mut reader = BufReader::new(stream);
319    let mut line = String::new();
320    reader.read_line(&mut line).map_err(AppError::Io)?;
321
322    if line.trim().is_empty() {
323        write_response(
324            reader.get_mut(),
325            &DaemonResponse::Error {
326                message: "empty request to daemon".to_string(),
327            },
328        )?;
329        return Ok(false);
330    }
331
332    let request: DaemonRequest = serde_json::from_str(line.trim()).map_err(AppError::Json)?;
333    let (response, should_exit) = match request {
334        DaemonRequest::Ping => (
335            DaemonResponse::Ok {
336                pid: std::process::id(),
337                version: SQLITE_GRAPHRAG_VERSION.to_string(),
338                handled_embed_requests: handled_embed_requests.load(Ordering::Relaxed),
339            },
340            false,
341        ),
342        DaemonRequest::Shutdown => (
343            DaemonResponse::ShuttingDown {
344                handled_embed_requests: handled_embed_requests.load(Ordering::Relaxed),
345            },
346            true,
347        ),
348        DaemonRequest::EmbedPassage { text } => {
349            let embedder = embedder::get_embedder(models_dir)?;
350            let embedding = embedder::embed_passage(embedder, &text)?;
351            let count = handled_embed_requests.fetch_add(1, Ordering::Relaxed) + 1;
352            (
353                DaemonResponse::PassageEmbedding {
354                    embedding,
355                    handled_embed_requests: count,
356                },
357                false,
358            )
359        }
360        DaemonRequest::EmbedQuery { text } => {
361            let embedder = embedder::get_embedder(models_dir)?;
362            let embedding = embedder::embed_query(embedder, &text)?;
363            let count = handled_embed_requests.fetch_add(1, Ordering::Relaxed) + 1;
364            (
365                DaemonResponse::QueryEmbedding {
366                    embedding,
367                    handled_embed_requests: count,
368                },
369                false,
370            )
371        }
372        DaemonRequest::EmbedPassages {
373            texts,
374            token_counts,
375        } => {
376            let embedder = embedder::get_embedder(models_dir)?;
377            let text_refs: Vec<&str> = texts.iter().map(String::as_str).collect();
378            let embeddings =
379                embedder::embed_passages_controlled(embedder, &text_refs, &token_counts)?;
380            let count = handled_embed_requests.fetch_add(1, Ordering::Relaxed) + 1;
381            (
382                DaemonResponse::PassageEmbeddings {
383                    embeddings,
384                    handled_embed_requests: count,
385                },
386                false,
387            )
388        }
389    };
390
391    write_response(reader.get_mut(), &response)?;
392    Ok(should_exit)
393}
394
/// Serialize `response` as single-line JSON, append the `\n` delimiter the
/// protocol requires, and flush so the client's `read_line` returns promptly.
fn write_response(
    stream: &mut LocalSocketStream,
    response: &DaemonResponse,
) -> Result<(), AppError> {
    serde_json::to_writer(&mut *stream, response).map_err(AppError::Json)?;
    stream.write_all(b"\n").map_err(AppError::Io)?;
    stream.flush().map_err(AppError::Io)?;
    Ok(())
}
404
/// Send one request to the daemon if it is reachable.
///
/// Returns `Ok(None)` when no daemon appears to be listening (connect fails
/// with a "nobody home" error kind) so callers can fall back to local
/// embedding; any other I/O or protocol failure is a hard error.
fn request_if_available(
    models_dir: &Path,
    request: &DaemonRequest,
) -> Result<Option<DaemonResponse>, AppError> {
    let socket = daemon_label(models_dir);
    let name = match to_local_socket_name(&socket) {
        Ok(name) => name,
        Err(err) => return Err(AppError::Io(err)),
    };

    let mut stream = match LocalSocketStream::connect(name) {
        Ok(stream) => stream,
        // These error kinds all mean "no daemon is serving this name right
        // now" (missing socket, stale socket, nothing bound, or no answer)
        // and are reported as None rather than an error.
        Err(err)
            if matches!(
                err.kind(),
                std::io::ErrorKind::NotFound
                    | std::io::ErrorKind::ConnectionRefused
                    | std::io::ErrorKind::AddrNotAvailable
                    | std::io::ErrorKind::TimedOut
            ) =>
        {
            return Ok(None);
        }
        Err(err) => return Err(AppError::Io(err)),
    };

    // Wire format: one JSON object per line in each direction.
    serde_json::to_writer(&mut stream, request).map_err(AppError::Json)?;
    stream.write_all(b"\n").map_err(AppError::Io)?;
    stream.flush().map_err(AppError::Io)?;

    let mut reader = BufReader::new(stream);
    let mut line = String::new();
    reader.read_line(&mut line).map_err(AppError::Io)?;
    // Empty line here means the daemon hung up without answering — a protocol
    // violation, not "daemon absent", hence an error instead of None.
    if line.trim().is_empty() {
        return Err(AppError::Embedding(
            "daemon returned an empty response".into(),
        ));
    }

    let response = serde_json::from_str(line.trim()).map_err(AppError::Json)?;
    Ok(Some(response))
}
447
448fn should_autostart(cli_flag: bool) -> bool {
449    if !cli_flag {
450        return false; // explicit CLI override wins
451    }
452    !autostart_disabled_by_env()
453}
454
455fn request_or_autostart(
456    models_dir: &Path,
457    request: &DaemonRequest,
458    cli_autostart: bool,
459) -> Result<Option<DaemonResponse>, AppError> {
460    if let Some(response) = request_if_available(models_dir, request)? {
461        clear_spawn_backoff_state(models_dir).ok();
462        return Ok(Some(response));
463    }
464
465    if !should_autostart(cli_autostart) {
466        return Ok(None);
467    }
468
469    if !ensure_daemon_running(models_dir)? {
470        return Ok(None);
471    }
472
473    request_if_available(models_dir, request)
474}
475
/// Make sure a daemon is serving this models directory, spawning one if
/// needed. Returns `Ok(true)` when a healthy daemon is reachable.
///
/// Coordination: a cross-process file lock serializes spawns; a persisted
/// backoff window suppresses retry storms after failures; pings before and
/// after lock acquisition avoid redundant spawns.
fn ensure_daemon_running(models_dir: &Path) -> Result<bool, AppError> {
    // Already running? Clear any stale backoff and report success.
    if (try_ping(models_dir)?).is_some() {
        clear_spawn_backoff_state(models_dir).ok();
        return Ok(true);
    }

    // Recent spawn failures: respect the persisted backoff window.
    if spawn_backoff_active(models_dir)? {
        tracing::warn!("daemon autostart suppressed by backoff window");
        return Ok(false);
    }

    // If another process holds the spawn lock, it is (presumably) spawning;
    // just wait for its daemon to become ready instead of competing.
    let spawn_lock = match try_acquire_spawn_lock(models_dir)? {
        Some(lock) => lock,
        None => return wait_for_daemon_ready(models_dir),
    };

    // Re-check under the lock: a concurrent spawner may have won the race
    // between our first ping and lock acquisition.
    if (try_ping(models_dir)?).is_some() {
        clear_spawn_backoff_state(models_dir).ok();
        drop(spawn_lock);
        return Ok(true);
    }

    let exe = match std::env::current_exe() {
        Ok(path) => path,
        Err(err) => {
            record_spawn_failure(models_dir, format!("current_exe failed: {err}"))?;
            drop(spawn_lock);
            return Ok(false);
        }
    };

    // Re-invoke this binary's `daemon` subcommand; the child env var marks it
    // so the daemon itself never tries to autostart another daemon.
    let mut child = std::process::Command::new(exe);
    child
        .arg("daemon")
        .arg("--idle-shutdown-secs")
        .arg(DAEMON_IDLE_SHUTDOWN_SECS.to_string())
        .env("SQLITE_GRAPHRAG_DAEMON_CHILD", "1")
        .stdin(Stdio::null())
        .stdout(Stdio::null())
        .stderr(Stdio::null());

    match child.spawn() {
        Ok(child_handle) => {
            // SAFETY: deliberate orphan daemon detach. The Child handle is intentionally
            // dropped without a corresponding `.wait()` call because the daemon owns its
            // own lifecycle: `Stdio::null()` is set on stdin/stdout/stderr (above) so the
            // child does not inherit terminal handles, the spawn lock file at
            // `<models_dir>/.daemon.spawn.lock` prevents concurrent spawns, and the
            // daemon shuts itself down via `DAEMON_IDLE_SHUTDOWN_SECS` (or an explicit
            // `daemon stop`/SIGTERM). Keeping the handle here would block the parent
            // CLI in the foreground until the daemon exited, defeating the autostart
            // contract that callers expect.
            // See also: docs_rules/rules_rust_processos_externos.md "Child detach justificado"
            let pid = child_handle.id();
            drop(child_handle);
            tracing::debug!(
                pid,
                "daemon detached; lifecycle managed via spawn lock + readiness file"
            );
            // Poll until the new daemon answers pings (or the wait budget runs out),
            // recording a failure (and extending backoff) if it never comes up.
            let ready = wait_for_daemon_ready(models_dir)?;
            if ready {
                clear_spawn_backoff_state(models_dir).ok();
            } else {
                record_spawn_failure(
                    models_dir,
                    "daemon did not become healthy after autostart".to_string(),
                )?;
            }
            drop(spawn_lock);
            Ok(ready)
        }
        Err(err) => {
            record_spawn_failure(models_dir, format!("daemon spawn failed: {err}"))?;
            drop(spawn_lock);
            Ok(false)
        }
    }
}
554
555fn wait_for_daemon_ready(models_dir: &Path) -> Result<bool, AppError> {
556    let deadline = Instant::now() + Duration::from_millis(DAEMON_AUTO_START_MAX_WAIT_MS);
557    let mut sleep_ms = DAEMON_AUTO_START_INITIAL_BACKOFF_MS.max(DAEMON_PING_TIMEOUT_MS);
558
559    while Instant::now() < deadline {
560        if (try_ping(models_dir)?).is_some() {
561            return Ok(true);
562        }
563        thread::sleep(Duration::from_millis(sleep_ms));
564        sleep_ms = (sleep_ms * 2).min(DAEMON_AUTO_START_MAX_BACKOFF_MS);
565    }
566
567    Ok(false)
568}
569
/// Environment-based autostart policy.
///
/// Autostart is disabled when this process IS the daemon child (prevents
/// recursive spawning), or when the user opted out via
/// `SQLITE_GRAPHRAG_DAEMON_DISABLE_AUTOSTART=1` and did not override that
/// with `SQLITE_GRAPHRAG_DAEMON_FORCE_AUTOSTART=1`.
fn autostart_disabled_by_env() -> bool {
    let flag = |key: &str| std::env::var(key).as_deref() == Ok("1");
    let is_daemon_child = flag("SQLITE_GRAPHRAG_DAEMON_CHILD");
    let force = flag("SQLITE_GRAPHRAG_DAEMON_FORCE_AUTOSTART");
    let disable = flag("SQLITE_GRAPHRAG_DAEMON_DISABLE_AUTOSTART");
    is_daemon_child || (!force && disable)
}
575
/// Directory holding the daemon control files (spawn lock, backoff state):
/// the parent of the models directory, or the models directory itself when
/// it has no parent (e.g. a filesystem root).
fn daemon_control_dir(models_dir: &Path) -> PathBuf {
    match models_dir.parent() {
        Some(parent) => parent.to_path_buf(),
        None => models_dir.to_path_buf(),
    }
}
582
583fn spawn_lock_path(models_dir: &Path) -> PathBuf {
584    daemon_control_dir(models_dir).join("daemon-spawn.lock")
585}
586
587fn spawn_state_path(models_dir: &Path) -> PathBuf {
588    daemon_control_dir(models_dir).join("daemon-spawn-state.json")
589}
590
/// Try to take the cross-process spawn lock, polling for up to
/// `DAEMON_SPAWN_LOCK_WAIT_MS`.
///
/// Returns `Ok(None)` when another process still holds the lock at the
/// deadline (the caller then waits for that process's daemon instead).
/// The returned `File` carries the advisory lock; dropping it releases
/// the lock.
fn try_acquire_spawn_lock(models_dir: &Path) -> Result<Option<File>, AppError> {
    let path = spawn_lock_path(models_dir);
    std::fs::create_dir_all(crate::paths::parent_or_err(&path)?).map_err(AppError::Io)?;
    // truncate(false): never clobber the file another process may be locking.
    let file = OpenOptions::new()
        .read(true)
        .write(true)
        .create(true)
        .truncate(false)
        .open(path)
        .map_err(AppError::Io)?;

    let deadline = Instant::now() + Duration::from_millis(DAEMON_SPAWN_LOCK_WAIT_MS);
    loop {
        match file.try_lock_exclusive() {
            Ok(()) => return Ok(Some(file)),
            // WouldBlock: lock held elsewhere — poll at 50ms until the deadline.
            Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => {
                if Instant::now() >= deadline {
                    return Ok(None);
                }
                thread::sleep(Duration::from_millis(50));
            }
            Err(err) => return Err(AppError::Io(err)),
        }
    }
}
616
617fn spawn_backoff_active(models_dir: &Path) -> Result<bool, AppError> {
618    let state = load_spawn_state(models_dir)?;
619    Ok(now_epoch_ms() < state.not_before_epoch_ms)
620}
621
/// Record one failed spawn attempt and extend the persisted backoff window.
///
/// Backoff doubles per consecutive failure (exponent capped at 6, i.e. 64x
/// the base) and is capped at `DAEMON_AUTO_START_MAX_BACKOFF_MS`, then
/// jittered into `[base/2, base)` — see the inline note below.
fn record_spawn_failure(models_dir: &Path, message: String) -> Result<(), AppError> {
    let mut state = load_spawn_state(models_dir)?;
    state.consecutive_failures = state.consecutive_failures.saturating_add(1);
    let exponent = state.consecutive_failures.saturating_sub(1).min(6);
    let base_ms =
        (DAEMON_SPAWN_BACKOFF_BASE_MS * (1_u64 << exponent)).min(DAEMON_AUTO_START_MAX_BACKOFF_MS);
    // v1.0.36 (L2): half jitter to avoid retry herd when multiple CLI instances
    // detect daemon failure simultaneously. Effective backoff range is
    // `[base/2, base)`. Uses SystemTime nanoseconds as a dependency-free
    // randomness source; daemon spawn frequency is low so quality is sufficient.
    let half = base_ms / 2;
    let jitter_seed = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|d| d.subsec_nanos() as u64)
        .unwrap_or(0);
    // Guard against modulo-by-zero when base_ms < 2.
    let jitter = if half == 0 { 0 } else { jitter_seed % half };
    let backoff_ms = half + jitter;
    state.not_before_epoch_ms = now_epoch_ms() + backoff_ms;
    state.last_error = Some(message);
    save_spawn_state(models_dir, &state)
}
643
644fn clear_spawn_backoff_state(models_dir: &Path) -> Result<(), AppError> {
645    let path = spawn_state_path(models_dir);
646    if path.exists() {
647        std::fs::remove_file(path).map_err(AppError::Io)?;
648    }
649    Ok(())
650}
651
652fn load_spawn_state(models_dir: &Path) -> Result<DaemonSpawnState, AppError> {
653    let path = spawn_state_path(models_dir);
654    if !path.exists() {
655        return Ok(DaemonSpawnState::default());
656    }
657
658    let bytes = std::fs::read(path).map_err(AppError::Io)?;
659    serde_json::from_slice(&bytes).map_err(AppError::Json)
660}
661
662fn save_spawn_state(models_dir: &Path, state: &DaemonSpawnState) -> Result<(), AppError> {
663    let path = spawn_state_path(models_dir);
664    std::fs::create_dir_all(crate::paths::parent_or_err(&path)?).map_err(AppError::Io)?;
665    let bytes = serde_json::to_vec(state).map_err(AppError::Json)?;
666    std::fs::write(path, bytes).map_err(AppError::Io)
667}
668
/// Milliseconds since the Unix epoch, saturating to 0 when the system clock
/// reads earlier than the epoch (instead of panicking).
fn now_epoch_ms() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|elapsed| elapsed.as_millis() as u64)
        .unwrap_or(0)
}
675
/// Convert the daemon label into a platform-appropriate local-socket name.
///
/// Prefers an abstract namespaced name (no filesystem entry); falls back to
/// a filesystem path on Unix or a named pipe on Windows.
fn to_local_socket_name(name: &str) -> std::io::Result<interprocess::local_socket::Name<'static>> {
    // Abstract namespace first: nothing on disk to clean up after a crash.
    if let Ok(ns_name) = name.to_string().to_ns_name::<GenericNamespaced>() {
        return Ok(ns_name);
    }

    // Fallback when abstract namespaces are unavailable. Honours XDG_RUNTIME_DIR
    // (Linux user-private runtime dir) or SQLITE_GRAPHRAG_HOME (project override)
    // before falling back to /tmp, which can collide when the same name is used
    // by another user/project on a multi-tenant host. Added in v1.0.35.
    let path = if cfg!(unix) {
        let base = std::env::var_os("XDG_RUNTIME_DIR")
            .or_else(|| std::env::var_os("SQLITE_GRAPHRAG_HOME"))
            .map(std::path::PathBuf::from)
            .unwrap_or_else(std::env::temp_dir);
        base.join(format!("{name}.sock"))
            .to_string_lossy()
            .into_owned()
    } else {
        // Windows named-pipe namespace.
        format!(r"\\.\pipe\{name}")
    };
    path.to_fs_name::<GenericFilePath>()
}
698
#[cfg(test)]
mod tests {
    use super::*;

    /// Failure recording should activate the backoff window and persist the
    /// failure count/message; clearing should deactivate it again.
    #[test]
    fn record_and_clear_spawn_backoff_state() {
        let tmp = tempfile::tempdir().unwrap();
        let models_dir = tmp.path().join("cache").join("models");
        std::fs::create_dir_all(&models_dir).unwrap();

        // Fresh directory: no state file, so no backoff.
        assert!(!spawn_backoff_active(&models_dir).unwrap());

        record_spawn_failure(&models_dir, "spawn failed".to_string()).unwrap();
        assert!(spawn_backoff_active(&models_dir).unwrap());

        let state = load_spawn_state(&models_dir).unwrap();
        assert_eq!(state.consecutive_failures, 1);
        assert_eq!(state.last_error.as_deref(), Some("spawn failed"));

        clear_spawn_backoff_state(&models_dir).unwrap();
        assert!(!spawn_backoff_active(&models_dir).unwrap());
    }

    /// Control files live in the parent of the models directory.
    #[test]
    fn daemon_control_dir_uses_models_parent() {
        let base = PathBuf::from("/tmp/sqlite-graphrag-cache-test");
        let models_dir = base.join("models");
        assert_eq!(daemon_control_dir(&models_dir), base);
    }
}