Skip to main content

sqlite_graphrag/
daemon.rs

1//! IPC daemon: keeps the embedding model warm across CLI invocations.
2//!
3//! Manages the background process lifecycle, Unix-socket IPC protocol, and
4//! auto-start/backoff logic so embeddings are served without cold-start cost.
5
6use crate::constants::{
7    DAEMON_AUTO_START_INITIAL_BACKOFF_MS, DAEMON_AUTO_START_MAX_BACKOFF_MS,
8    DAEMON_AUTO_START_MAX_WAIT_MS, DAEMON_IDLE_SHUTDOWN_SECS, DAEMON_PING_TIMEOUT_MS,
9    DAEMON_SPAWN_BACKOFF_BASE_MS, DAEMON_SPAWN_LOCK_WAIT_MS, SQLITE_GRAPHRAG_VERSION,
10};
11use crate::errors::AppError;
12use crate::{embedder, shutdown_requested};
13use fs4::fs_std::FileExt;
14use interprocess::local_socket::{
15    prelude::LocalSocketStream,
16    traits::{Listener as _, Stream as _},
17    GenericFilePath, GenericNamespaced, ListenerNonblockingMode, ListenerOptions, ToFsName,
18    ToNsName,
19};
20use serde::{Deserialize, Serialize};
21use std::fs::{File, OpenOptions};
22use std::io::{BufRead, BufReader, Write};
23use std::path::{Path, PathBuf};
24use std::process::Stdio;
25use std::sync::atomic::{AtomicU64, Ordering};
26use std::sync::Arc;
27use std::thread;
28use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
29
/// Wire requests accepted by the daemon, one JSON object per line.
///
/// Serialized with an internal `"request"` tag in snake_case, e.g.
/// `{"request":"embed_passage","text":"..."}`.
#[derive(Debug, Serialize, Deserialize)]
#[serde(tag = "request", rename_all = "snake_case")]
pub enum DaemonRequest {
    /// Health check; answered with [`DaemonResponse::Ok`].
    Ping,
    /// Ask the daemon to exit after replying.
    Shutdown,
    /// Embed a single passage (document-side) text.
    EmbedPassage {
        text: String,
    },
    /// Embed a single query-side text.
    EmbedQuery {
        text: String,
    },
    /// Embed a batch of passages. `token_counts` parallels `texts`
    /// (presumably pre-computed token lengths used for batching — the
    /// exact semantics live in `embedder::embed_passages_controlled`).
    EmbedPassages {
        texts: Vec<String>,
        token_counts: Vec<usize>,
    },
}
46
/// Wire responses emitted by the daemon, one JSON object per line,
/// tagged with a snake_case `"status"` field.
#[derive(Debug, Serialize, Deserialize)]
#[serde(tag = "status", rename_all = "snake_case")]
pub enum DaemonResponse {
    /// Startup announcement, emitted on stdout once the socket is bound
    /// and the embedding model is warm.
    Listening {
        pid: u32,
        socket: String,
        idle_shutdown_secs: u64,
    },
    /// Reply to [`DaemonRequest::Ping`].
    Ok {
        pid: u32,
        version: String,
        handled_embed_requests: u64,
    },
    /// Reply to [`DaemonRequest::EmbedPassage`].
    PassageEmbedding {
        embedding: Vec<f32>,
        handled_embed_requests: u64,
    },
    /// Reply to [`DaemonRequest::EmbedQuery`].
    QueryEmbedding {
        embedding: Vec<f32>,
        handled_embed_requests: u64,
    },
    /// Reply to [`DaemonRequest::EmbedPassages`]; order parallels the
    /// request's `texts`.
    PassageEmbeddings {
        embeddings: Vec<Vec<f32>>,
        handled_embed_requests: u64,
    },
    /// Reply to [`DaemonRequest::Shutdown`], sent just before exit.
    ShuttingDown {
        handled_embed_requests: u64,
    },
    /// Catch-all failure reply (bad request, embedding error, ...).
    Error {
        message: String,
    },
}
79
/// Persisted autostart-backoff state (JSON at `daemon-spawn-state.json`
/// in the daemon control directory).
#[derive(Debug, Default, Serialize, Deserialize)]
struct DaemonSpawnState {
    // Number of spawn failures in a row; drives the exponential backoff.
    consecutive_failures: u32,
    // Epoch milliseconds before which autostart attempts are suppressed.
    not_before_epoch_ms: u64,
    // Human-readable description of the most recent failure.
    last_error: Option<String>,
}
86
87pub fn daemon_label(models_dir: &Path) -> String {
88    let hash = blake3::hash(models_dir.to_string_lossy().as_bytes())
89        .to_hex()
90        .to_string();
91    format!("sqlite-graphrag-daemon-{}", &hash[..16])
92}
93
/// Pings a running daemon; returns `Ok(None)` when none is reachable.
pub fn try_ping(models_dir: &Path) -> Result<Option<DaemonResponse>, AppError> {
    request_if_available(models_dir, &DaemonRequest::Ping)
}
97
/// Asks a running daemon to shut down; returns `Ok(None)` when none is reachable.
pub fn try_shutdown(models_dir: &Path) -> Result<Option<DaemonResponse>, AppError> {
    request_if_available(models_dir, &DaemonRequest::Shutdown)
}
101
102pub fn embed_passage_or_local(models_dir: &Path, text: &str) -> Result<Vec<f32>, AppError> {
103    match request_or_autostart(
104        models_dir,
105        &DaemonRequest::EmbedPassage {
106            text: text.to_string(),
107        },
108    )? {
109        Some(DaemonResponse::PassageEmbedding { embedding, .. }) => Ok(embedding),
110        Some(DaemonResponse::Error { message }) => Err(AppError::Embedding(message)),
111        Some(other) => Err(AppError::Internal(anyhow::anyhow!(
112            "unexpected daemon response for passage embedding: {other:?}"
113        ))),
114        None => {
115            let embedder = embedder::get_embedder(models_dir)?;
116            embedder::embed_passage(embedder, text)
117        }
118    }
119}
120
121pub fn embed_query_or_local(models_dir: &Path, text: &str) -> Result<Vec<f32>, AppError> {
122    match request_or_autostart(
123        models_dir,
124        &DaemonRequest::EmbedQuery {
125            text: text.to_string(),
126        },
127    )? {
128        Some(DaemonResponse::QueryEmbedding { embedding, .. }) => Ok(embedding),
129        Some(DaemonResponse::Error { message }) => Err(AppError::Embedding(message)),
130        Some(other) => Err(AppError::Internal(anyhow::anyhow!(
131            "unexpected daemon response for query embedding: {other:?}"
132        ))),
133        None => {
134            let embedder = embedder::get_embedder(models_dir)?;
135            embedder::embed_query(embedder, text)
136        }
137    }
138}
139
140pub fn embed_passages_controlled_or_local(
141    models_dir: &Path,
142    texts: &[&str],
143    token_counts: &[usize],
144) -> Result<Vec<Vec<f32>>, AppError> {
145    let request = DaemonRequest::EmbedPassages {
146        texts: texts.iter().map(|t| (*t).to_string()).collect(),
147        token_counts: token_counts.to_vec(),
148    };
149
150    match request_or_autostart(models_dir, &request)? {
151        Some(DaemonResponse::PassageEmbeddings { embeddings, .. }) => Ok(embeddings),
152        Some(DaemonResponse::Error { message }) => Err(AppError::Embedding(message)),
153        Some(other) => Err(AppError::Internal(anyhow::anyhow!(
154            "unexpected daemon response for passage embedding batch: {other:?}"
155        ))),
156        None => {
157            let embedder = embedder::get_embedder(models_dir)?;
158            embedder::embed_passages_controlled(embedder, texts, token_counts)
159        }
160    }
161}
162
/// RAII guard held by the daemon process; its `Drop` impl removes the
/// spawn lock file on graceful shutdown.
struct DaemonSpawnGuard {
    // Directory the daemon serves; used to recompute the lock path at drop time.
    models_dir: PathBuf,
}

impl DaemonSpawnGuard {
    /// Captures `models_dir` so cleanup can locate the spawn lock file later.
    fn new(models_dir: &Path) -> Self {
        Self {
            models_dir: models_dir.to_path_buf(),
        }
    }
}
174
175impl Drop for DaemonSpawnGuard {
176    fn drop(&mut self) {
177        let lock_path = spawn_lock_path(&self.models_dir);
178        if lock_path.exists() {
179            match std::fs::remove_file(&lock_path) {
180                Ok(()) => {
181                    tracing::debug!(
182                        path = %lock_path.display(),
183                        "spawn lock file removed during graceful daemon shutdown"
184                    );
185                }
186                Err(err) if err.kind() == std::io::ErrorKind::NotFound => {}
187                Err(err) => {
188                    tracing::warn!(
189                        error = %err,
190                        path = %lock_path.display(),
191                        "failed to remove spawn lock file while shutting down daemon"
192                    );
193                }
194            }
195        }
196        tracing::info!(
197            "daemon shut down gracefully; socket will be cleaned up by OS or by the next daemon via try_overwrite"
198        );
199    }
200}
201
/// Daemon entry point: builds a small tokio runtime and drives [`run_async`].
///
/// Blocks until the daemon exits (shutdown request, signal, idle timeout,
/// or the control directory disappearing).
pub fn run(models_dir: &Path, idle_shutdown_secs: u64) -> Result<(), AppError> {
    // Tokio runtime with 2 worker threads to reduce idle threads in the daemon.
    // The accept loop remains synchronous; each connection is dispatched via spawn_blocking
    // so heavy embeddings do not block the tokio workers.
    let rt = tokio::runtime::Builder::new_multi_thread()
        .worker_threads(2)
        .thread_name("daemon-worker")
        .enable_all()
        .build()
        .map_err(AppError::Io)?;

    rt.block_on(run_async(models_dir, idle_shutdown_secs))
}
215
/// Core daemon loop: bind the socket, warm the model, announce readiness,
/// then serve connections one at a time.
///
/// Exits when shutdown is requested, the control directory disappears, a
/// client sends `Shutdown`, or no client connects for `idle_shutdown_secs`.
async fn run_async(models_dir: &Path, idle_shutdown_secs: u64) -> Result<(), AppError> {
    let socket = daemon_label(models_dir);
    let name = to_local_socket_name(&socket)?;
    // Nonblocking accept lets the loop poll shutdown/idle conditions between
    // connections; try_overwrite reclaims a socket left behind by a killed
    // predecessor.
    let listener = ListenerOptions::new()
        .name(name)
        .nonblocking(ListenerNonblockingMode::Accept)
        .try_overwrite(true)
        .create_sync()
        .map_err(AppError::Io)?;

    // Guard that cleans up the spawn lock file on graceful shutdown.
    // SIGKILL does not trigger Drop; in that case try_overwrite(true) above is the fallback.
    let _spawn_guard = DaemonSpawnGuard::new(models_dir);

    // Warm the model once per daemon process inside spawn_blocking so the
    // ONNX session initialisation (CPU-bound, may take several seconds) does
    // not block a tokio worker thread.
    let models_dir_warm = models_dir.to_path_buf();
    tokio::task::spawn_blocking(move || embedder::get_embedder(&models_dir_warm).map(|_| ()))
        .await
        .map_err(|e| AppError::Internal(anyhow::anyhow!("model warm-up panicked: {e}")))??;

    // Tell the parent/operator we are ready; emitted as JSON on stdout.
    crate::output::emit_json(&DaemonResponse::Listening {
        pid: std::process::id(),
        socket,
        idle_shutdown_secs,
    })?;

    let handled_embed_requests = Arc::new(AtomicU64::new(0));
    let mut last_activity = Instant::now();
    let models_dir = models_dir.to_path_buf();

    loop {
        if shutdown_requested() {
            break;
        }

        // If the control directory was deleted (e.g. the cache was wiped),
        // there is nothing left to serve for.
        if !daemon_control_dir(&models_dir).exists() {
            tracing::info!("daemon control directory disappeared; shutting down");
            break;
        }

        match listener.accept() {
            Ok(stream) => {
                last_activity = Instant::now();
                let models_dir_clone = models_dir.clone();
                let counter = Arc::clone(&handled_embed_requests);
                // Handle the connection on the blocking pool: embedding is
                // CPU-bound and must not stall the async workers. The loop
                // awaits here, so connections are served sequentially.
                let should_exit = tokio::task::spawn_blocking(move || {
                    handle_client(stream, &models_dir_clone, &counter)
                })
                .await
                .map_err(|e| {
                    AppError::Internal(anyhow::anyhow!("spawn_blocking panicked: {e}"))
                })??;

                if should_exit {
                    break;
                }
            }
            Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => {
                // No pending connection: check the idle timeout, then poll again.
                if last_activity.elapsed() >= Duration::from_secs(idle_shutdown_secs) {
                    tracing::info!(
                        idle_shutdown_secs,
                        handled_embed_requests = handled_embed_requests.load(Ordering::Relaxed),
                        "daemon idle timeout reached"
                    );
                    break;
                }
                tokio::time::sleep(Duration::from_millis(50)).await;
            }
            Err(err) => return Err(AppError::Io(err)),
        }
    }

    Ok(())
}
292
/// Serves one client connection: reads a single JSON request line, executes
/// it, and writes a single JSON response line.
///
/// Returns `Ok(true)` when the daemon should exit afterwards (a `Shutdown`
/// request), `Ok(false)` otherwise. Embedding requests increment
/// `handled_embed_requests`; the post-increment count is echoed back to the
/// client.
fn handle_client(
    stream: LocalSocketStream,
    models_dir: &Path,
    handled_embed_requests: &AtomicU64,
) -> Result<bool, AppError> {
    let mut reader = BufReader::new(stream);
    let mut line = String::new();
    reader.read_line(&mut line).map_err(AppError::Io)?;

    // EOF or a bare newline: reply with an error instead of trying to parse.
    if line.trim().is_empty() {
        write_response(
            reader.get_mut(),
            &DaemonResponse::Error {
                message: "empty request to daemon".to_string(),
            },
        )?;
        return Ok(false);
    }

    let request: DaemonRequest = serde_json::from_str(line.trim()).map_err(AppError::Json)?;
    let (response, should_exit) = match request {
        DaemonRequest::Ping => (
            DaemonResponse::Ok {
                pid: std::process::id(),
                version: SQLITE_GRAPHRAG_VERSION.to_string(),
                handled_embed_requests: handled_embed_requests.load(Ordering::Relaxed),
            },
            false,
        ),
        // `true` tells the accept loop to break after the reply is written.
        DaemonRequest::Shutdown => (
            DaemonResponse::ShuttingDown {
                handled_embed_requests: handled_embed_requests.load(Ordering::Relaxed),
            },
            true,
        ),
        DaemonRequest::EmbedPassage { text } => {
            let embedder = embedder::get_embedder(models_dir)?;
            let embedding = embedder::embed_passage(embedder, &text)?;
            // fetch_add returns the previous value; +1 yields the count that
            // includes this request.
            let count = handled_embed_requests.fetch_add(1, Ordering::Relaxed) + 1;
            (
                DaemonResponse::PassageEmbedding {
                    embedding,
                    handled_embed_requests: count,
                },
                false,
            )
        }
        DaemonRequest::EmbedQuery { text } => {
            let embedder = embedder::get_embedder(models_dir)?;
            let embedding = embedder::embed_query(embedder, &text)?;
            let count = handled_embed_requests.fetch_add(1, Ordering::Relaxed) + 1;
            (
                DaemonResponse::QueryEmbedding {
                    embedding,
                    handled_embed_requests: count,
                },
                false,
            )
        }
        // A batch counts as one handled request, regardless of its size.
        DaemonRequest::EmbedPassages {
            texts,
            token_counts,
        } => {
            let embedder = embedder::get_embedder(models_dir)?;
            let text_refs: Vec<&str> = texts.iter().map(String::as_str).collect();
            let embeddings =
                embedder::embed_passages_controlled(embedder, &text_refs, &token_counts)?;
            let count = handled_embed_requests.fetch_add(1, Ordering::Relaxed) + 1;
            (
                DaemonResponse::PassageEmbeddings {
                    embeddings,
                    handled_embed_requests: count,
                },
                false,
            )
        }
    };

    write_response(reader.get_mut(), &response)?;
    Ok(should_exit)
}
374
375fn write_response(
376    stream: &mut LocalSocketStream,
377    response: &DaemonResponse,
378) -> Result<(), AppError> {
379    serde_json::to_writer(&mut *stream, response).map_err(AppError::Json)?;
380    stream.write_all(b"\n").map_err(AppError::Io)?;
381    stream.flush().map_err(AppError::Io)?;
382    Ok(())
383}
384
/// Sends `request` to the daemon socket if one is listening.
///
/// Returns `Ok(None)` when the connect fails with an error kind that means
/// "no daemon here" (socket absent, connection refused, address unavailable,
/// timed out). Any other I/O or protocol failure is surfaced as an error.
fn request_if_available(
    models_dir: &Path,
    request: &DaemonRequest,
) -> Result<Option<DaemonResponse>, AppError> {
    let socket = daemon_label(models_dir);
    let name = match to_local_socket_name(&socket) {
        Ok(name) => name,
        Err(err) => return Err(AppError::Io(err)),
    };

    let mut stream = match LocalSocketStream::connect(name) {
        Ok(stream) => stream,
        // These kinds indicate "no daemon running", not a hard failure.
        Err(err)
            if matches!(
                err.kind(),
                std::io::ErrorKind::NotFound
                    | std::io::ErrorKind::ConnectionRefused
                    | std::io::ErrorKind::AddrNotAvailable
                    | std::io::ErrorKind::TimedOut
            ) =>
        {
            return Ok(None);
        }
        Err(err) => return Err(AppError::Io(err)),
    };

    // Line-delimited JSON: one request line out ...
    serde_json::to_writer(&mut stream, request).map_err(AppError::Json)?;
    stream.write_all(b"\n").map_err(AppError::Io)?;
    stream.flush().map_err(AppError::Io)?;

    // ... one response line back.
    let mut reader = BufReader::new(stream);
    let mut line = String::new();
    reader.read_line(&mut line).map_err(AppError::Io)?;
    if line.trim().is_empty() {
        return Err(AppError::Embedding(
            "daemon returned an empty response".into(),
        ));
    }

    let response = serde_json::from_str(line.trim()).map_err(AppError::Json)?;
    Ok(Some(response))
}
427
428fn request_or_autostart(
429    models_dir: &Path,
430    request: &DaemonRequest,
431) -> Result<Option<DaemonResponse>, AppError> {
432    if let Some(response) = request_if_available(models_dir, request)? {
433        clear_spawn_backoff_state(models_dir).ok();
434        return Ok(Some(response));
435    }
436
437    if autostart_disabled() {
438        return Ok(None);
439    }
440
441    if !ensure_daemon_running(models_dir)? {
442        return Ok(None);
443    }
444
445    request_if_available(models_dir, request)
446}
447
/// Ensures a daemon is serving `models_dir`, spawning one if needed.
///
/// Returns `Ok(true)` when a daemon is confirmed reachable, `Ok(false)` when
/// autostart is suppressed by the backoff window or the spawned daemon never
/// became healthy. Spawning is serialised across processes via an advisory
/// file lock; losing the lock race degrades to waiting for the winner's
/// daemon to become ready.
fn ensure_daemon_running(models_dir: &Path) -> Result<bool, AppError> {
    if (try_ping(models_dir)?).is_some() {
        clear_spawn_backoff_state(models_dir).ok();
        return Ok(true);
    }

    if spawn_backoff_active(models_dir)? {
        tracing::warn!("daemon autostart suppressed by backoff window");
        return Ok(false);
    }

    // Lock contended: another process is already spawning — just wait for it.
    let spawn_lock = match try_acquire_spawn_lock(models_dir)? {
        Some(lock) => lock,
        None => return wait_for_daemon_ready(models_dir),
    };

    // Double-check under the lock: a daemon may have come up while we waited.
    if (try_ping(models_dir)?).is_some() {
        clear_spawn_backoff_state(models_dir).ok();
        drop(spawn_lock);
        return Ok(true);
    }

    let exe = match std::env::current_exe() {
        Ok(path) => path,
        Err(err) => {
            record_spawn_failure(models_dir, format!("current_exe failed: {err}"))?;
            drop(spawn_lock);
            return Ok(false);
        }
    };

    // Re-exec ourselves in daemon mode; the child env var prevents the
    // daemon from recursively autostarting another daemon.
    let mut child = std::process::Command::new(exe);
    child
        .arg("daemon")
        .arg("--idle-shutdown-secs")
        .arg(DAEMON_IDLE_SHUTDOWN_SECS.to_string())
        .env("SQLITE_GRAPHRAG_DAEMON_CHILD", "1")
        .stdin(Stdio::null())
        .stdout(Stdio::null())
        .stderr(Stdio::null());

    match child.spawn() {
        Ok(child_handle) => {
            // SAFETY: deliberate orphan daemon detach. The Child handle is intentionally
            // dropped without a corresponding `.wait()` call because the daemon owns its
            // own lifecycle: `Stdio::null()` is set on stdin/stdout/stderr (above) so the
            // child does not inherit terminal handles, the spawn lock file at
            // `<models_dir>/.daemon.spawn.lock` prevents concurrent spawns, and the
            // daemon shuts itself down via `DAEMON_IDLE_SHUTDOWN_SECS` (or an explicit
            // `daemon stop`/SIGTERM). Keeping the handle here would block the parent
            // CLI in the foreground until the daemon exited, defeating the autostart
            // contract that callers expect.
            let pid = child_handle.id();
            drop(child_handle);
            tracing::debug!(
                pid,
                "daemon detached; lifecycle managed via spawn lock + readiness file"
            );
            let ready = wait_for_daemon_ready(models_dir)?;
            if ready {
                clear_spawn_backoff_state(models_dir).ok();
            } else {
                record_spawn_failure(
                    models_dir,
                    "daemon did not become healthy after autostart".to_string(),
                )?;
            }
            drop(spawn_lock);
            Ok(ready)
        }
        Err(err) => {
            record_spawn_failure(models_dir, format!("daemon spawn failed: {err}"))?;
            drop(spawn_lock);
            Ok(false)
        }
    }
}
525
526fn wait_for_daemon_ready(models_dir: &Path) -> Result<bool, AppError> {
527    let deadline = Instant::now() + Duration::from_millis(DAEMON_AUTO_START_MAX_WAIT_MS);
528    let mut sleep_ms = DAEMON_AUTO_START_INITIAL_BACKOFF_MS.max(DAEMON_PING_TIMEOUT_MS);
529
530    while Instant::now() < deadline {
531        if (try_ping(models_dir)?).is_some() {
532            return Ok(true);
533        }
534        thread::sleep(Duration::from_millis(sleep_ms));
535        sleep_ms = (sleep_ms * 2).min(DAEMON_AUTO_START_MAX_BACKOFF_MS);
536    }
537
538    Ok(false)
539}
540
/// Whether daemon autostart is disabled for this process.
///
/// Autostart is always disabled inside a daemon child (prevents recursive
/// spawning). Otherwise `..._DISABLE_AUTOSTART=1` disables it unless
/// overridden by `..._FORCE_AUTOSTART=1`.
fn autostart_disabled() -> bool {
    if std::env::var("SQLITE_GRAPHRAG_DAEMON_CHILD").as_deref() == Ok("1") {
        return true;
    }
    let forced = std::env::var("SQLITE_GRAPHRAG_DAEMON_FORCE_AUTOSTART").as_deref() == Ok("1");
    let disabled = std::env::var("SQLITE_GRAPHRAG_DAEMON_DISABLE_AUTOSTART").as_deref() == Ok("1");
    disabled && !forced
}
546
/// Directory holding daemon control files (spawn lock, backoff state):
/// the parent of `models_dir`, or `models_dir` itself when it has no parent.
fn daemon_control_dir(models_dir: &Path) -> PathBuf {
    match models_dir.parent() {
        Some(parent) => parent.to_path_buf(),
        None => models_dir.to_path_buf(),
    }
}
553
/// Path of the advisory lock file that serialises daemon autostarts.
fn spawn_lock_path(models_dir: &Path) -> PathBuf {
    daemon_control_dir(models_dir).join("daemon-spawn.lock")
}
557
/// Path of the JSON file persisting spawn-failure/backoff state.
fn spawn_state_path(models_dir: &Path) -> PathBuf {
    daemon_control_dir(models_dir).join("daemon-spawn-state.json")
}
561
/// Tries to take the cross-process spawn lock, polling briefly.
///
/// Returns `Ok(Some(file))` holding an exclusive advisory lock (released
/// when the returned handle is dropped/closed), or `Ok(None)` if the lock
/// stayed contended for `DAEMON_SPAWN_LOCK_WAIT_MS` — meaning another
/// process is already spawning the daemon.
fn try_acquire_spawn_lock(models_dir: &Path) -> Result<Option<File>, AppError> {
    let path = spawn_lock_path(models_dir);
    std::fs::create_dir_all(crate::paths::parent_or_err(&path)?).map_err(AppError::Io)?;
    // truncate(false): the file's contents are irrelevant, only the lock matters.
    let file = OpenOptions::new()
        .read(true)
        .write(true)
        .create(true)
        .truncate(false)
        .open(path)
        .map_err(AppError::Io)?;

    let deadline = Instant::now() + Duration::from_millis(DAEMON_SPAWN_LOCK_WAIT_MS);
    loop {
        match file.try_lock_exclusive() {
            Ok(()) => return Ok(Some(file)),
            // WouldBlock: lock held by another process; retry until deadline.
            Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => {
                if Instant::now() >= deadline {
                    return Ok(None);
                }
                thread::sleep(Duration::from_millis(50));
            }
            Err(err) => return Err(AppError::Io(err)),
        }
    }
}
587
588fn spawn_backoff_active(models_dir: &Path) -> Result<bool, AppError> {
589    let state = load_spawn_state(models_dir)?;
590    Ok(now_epoch_ms() < state.not_before_epoch_ms)
591}
592
593fn record_spawn_failure(models_dir: &Path, message: String) -> Result<(), AppError> {
594    let mut state = load_spawn_state(models_dir)?;
595    state.consecutive_failures = state.consecutive_failures.saturating_add(1);
596    let exponent = state.consecutive_failures.saturating_sub(1).min(6);
597    let base_ms =
598        (DAEMON_SPAWN_BACKOFF_BASE_MS * (1_u64 << exponent)).min(DAEMON_AUTO_START_MAX_BACKOFF_MS);
599    // v1.0.36 (L2): half jitter to avoid retry herd when multiple CLI instances
600    // detect daemon failure simultaneously. Effective backoff range is
601    // `[base/2, base)`. Uses SystemTime nanoseconds as a dependency-free
602    // randomness source; daemon spawn frequency is low so quality is sufficient.
603    let half = base_ms / 2;
604    let jitter_seed = SystemTime::now()
605        .duration_since(UNIX_EPOCH)
606        .map(|d| d.subsec_nanos() as u64)
607        .unwrap_or(0);
608    let jitter = if half == 0 { 0 } else { jitter_seed % half };
609    let backoff_ms = half + jitter;
610    state.not_before_epoch_ms = now_epoch_ms() + backoff_ms;
611    state.last_error = Some(message);
612    save_spawn_state(models_dir, &state)
613}
614
615fn clear_spawn_backoff_state(models_dir: &Path) -> Result<(), AppError> {
616    let path = spawn_state_path(models_dir);
617    if path.exists() {
618        std::fs::remove_file(path).map_err(AppError::Io)?;
619    }
620    Ok(())
621}
622
623fn load_spawn_state(models_dir: &Path) -> Result<DaemonSpawnState, AppError> {
624    let path = spawn_state_path(models_dir);
625    if !path.exists() {
626        return Ok(DaemonSpawnState::default());
627    }
628
629    let bytes = std::fs::read(path).map_err(AppError::Io)?;
630    serde_json::from_slice(&bytes).map_err(AppError::Json)
631}
632
633fn save_spawn_state(models_dir: &Path, state: &DaemonSpawnState) -> Result<(), AppError> {
634    let path = spawn_state_path(models_dir);
635    std::fs::create_dir_all(crate::paths::parent_or_err(&path)?).map_err(AppError::Io)?;
636    let bytes = serde_json::to_vec(state).map_err(AppError::Json)?;
637    std::fs::write(path, bytes).map_err(AppError::Io)
638}
639
/// Current wall-clock time as milliseconds since the Unix epoch.
///
/// A clock set before the epoch yields 0 rather than an error.
fn now_epoch_ms() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis() as u64,
        Err(_) => 0,
    }
}
646
/// Resolves the daemon label into a platform-specific local-socket name.
///
/// Prefers a namespaced socket (e.g. the Linux abstract namespace) and only
/// falls back to a filesystem-path socket (Unix) or named pipe (Windows)
/// when namespacing is unsupported.
fn to_local_socket_name(name: &str) -> std::io::Result<interprocess::local_socket::Name<'static>> {
    if let Ok(ns_name) = name.to_string().to_ns_name::<GenericNamespaced>() {
        return Ok(ns_name);
    }

    // Fallback when abstract namespaces are unavailable. Honours XDG_RUNTIME_DIR
    // (Linux user-private runtime dir) or SQLITE_GRAPHRAG_HOME (project override)
    // before falling back to /tmp, which can collide when the same name is used
    // by another user/project on a multi-tenant host. Added in v1.0.35.
    let path = if cfg!(unix) {
        let base = std::env::var_os("XDG_RUNTIME_DIR")
            .or_else(|| std::env::var_os("SQLITE_GRAPHRAG_HOME"))
            .map(std::path::PathBuf::from)
            .unwrap_or_else(|| std::path::PathBuf::from("/tmp"));
        base.join(format!("{name}.sock"))
            .to_string_lossy()
            .into_owned()
    } else {
        // Windows named-pipe namespace.
        format!(r"\\.\pipe\{name}")
    };
    path.to_fs_name::<GenericFilePath>()
}
669
#[cfg(test)]
mod tests {
    use super::*;

    /// Backoff round-trip: recording a failure arms the window and persists
    /// the error; clearing disarms it again.
    #[test]
    fn record_and_clear_spawn_backoff_state() {
        let tmp = tempfile::tempdir().unwrap();
        let models_dir = tmp.path().join("cache").join("models");
        std::fs::create_dir_all(&models_dir).unwrap();

        // Fresh directory: no state file, so no backoff.
        assert!(!spawn_backoff_active(&models_dir).unwrap());

        record_spawn_failure(&models_dir, "spawn failed".to_string()).unwrap();
        assert!(spawn_backoff_active(&models_dir).unwrap());

        let state = load_spawn_state(&models_dir).unwrap();
        assert_eq!(state.consecutive_failures, 1);
        assert_eq!(state.last_error.as_deref(), Some("spawn failed"));

        clear_spawn_backoff_state(&models_dir).unwrap();
        assert!(!spawn_backoff_active(&models_dir).unwrap());
    }

    /// Control files live next to (not inside) the models directory.
    #[test]
    fn daemon_control_dir_uses_models_parent() {
        let base = PathBuf::from("/tmp/sqlite-graphrag-cache-test");
        let models_dir = base.join("models");
        assert_eq!(daemon_control_dir(&models_dir), base);
    }
}
699}