// sqlite_graphrag/daemon.rs
1//! IPC daemon: keeps the embedding model warm across CLI invocations.
2//!
3//! Manages the background process lifecycle, Unix-socket IPC protocol, and
4//! auto-start/backoff logic so embeddings are served without cold-start cost.
5
6use crate::constants::{
7    DAEMON_AUTO_START_INITIAL_BACKOFF_MS, DAEMON_AUTO_START_MAX_BACKOFF_MS,
8    DAEMON_AUTO_START_MAX_WAIT_MS, DAEMON_IDLE_SHUTDOWN_SECS, DAEMON_PING_TIMEOUT_MS,
9    DAEMON_SPAWN_BACKOFF_BASE_MS, DAEMON_SPAWN_LOCK_WAIT_MS, SQLITE_GRAPHRAG_VERSION,
10};
11use crate::errors::AppError;
12use crate::{embedder, shutdown_requested};
13use fs4::fs_std::FileExt;
14use interprocess::local_socket::{
15    prelude::LocalSocketStream,
16    traits::{Listener as _, Stream as _},
17    GenericFilePath, GenericNamespaced, ListenerNonblockingMode, ListenerOptions, ToFsName,
18    ToNsName,
19};
20use serde::{Deserialize, Serialize};
21use std::fs::{File, OpenOptions};
22use std::io::{BufRead, BufReader, Write};
23use std::path::{Path, PathBuf};
24use std::process::Stdio;
25use std::sync::atomic::{AtomicU64, Ordering};
26use std::sync::Arc;
27use std::thread;
28use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
29
/// Request messages a CLI client sends to the daemon, one JSON document per
/// line, tagged with an external snake_case `"request"` field
/// (e.g. `{"request":"embed_query","text":"..."}`).
#[derive(Debug, Serialize, Deserialize)]
#[serde(tag = "request", rename_all = "snake_case")]
pub enum DaemonRequest {
    /// Liveness/health probe; answered with `DaemonResponse::Ok`.
    Ping,
    /// Ask the daemon to reply with `ShuttingDown` and then exit.
    Shutdown,
    /// Embed a single passage (document-side) text.
    EmbedPassage {
        text: String,
    },
    /// Embed a single query-side text.
    EmbedQuery {
        text: String,
    },
    /// Embed a batch of passages together with caller-supplied token counts
    /// (consumed by `embedder::embed_passages_controlled`; presumably one
    /// count per text — TODO confirm against the embedder contract).
    EmbedPassages {
        texts: Vec<String>,
        token_counts: Vec<usize>,
    },
}
46
/// Response messages sent from the daemon back to a client, one JSON
/// document per line, tagged with an external snake_case `"status"` field.
#[derive(Debug, Serialize, Deserialize)]
#[serde(tag = "status", rename_all = "snake_case")]
pub enum DaemonResponse {
    /// Emitted once (via `crate::output::emit_json`) when the daemon has
    /// bound its socket and is ready to accept connections.
    Listening {
        pid: u32,
        socket: String,
        idle_shutdown_secs: u64,
    },
    /// Reply to `Ping`: identifies the daemon process and its build version.
    Ok {
        pid: u32,
        version: String,
        handled_embed_requests: u64,
    },
    /// Reply to `EmbedPassage`.
    PassageEmbedding {
        embedding: Vec<f32>,
        handled_embed_requests: u64,
    },
    /// Reply to `EmbedQuery`.
    QueryEmbedding {
        embedding: Vec<f32>,
        handled_embed_requests: u64,
    },
    /// Reply to `EmbedPassages`.
    PassageEmbeddings {
        embeddings: Vec<Vec<f32>>,
        handled_embed_requests: u64,
    },
    /// Reply to `Shutdown`, written just before the daemon exits its loop.
    ShuttingDown {
        handled_embed_requests: u64,
    },
    /// Any request-level failure, reported as a human-readable message.
    Error {
        message: String,
    },
}
79
/// Autostart backoff state persisted as JSON in the daemon control
/// directory, shared across CLI invocations.
#[derive(Debug, Default, Serialize, Deserialize)]
struct DaemonSpawnState {
    /// Spawn failures recorded since the last successful daemon contact.
    consecutive_failures: u32,
    /// Epoch-milliseconds timestamp before which autostart is suppressed.
    not_before_epoch_ms: u64,
    /// Human-readable description of the most recent spawn failure.
    last_error: Option<String>,
}
86
87pub fn daemon_label(models_dir: &Path) -> String {
88    let hash = blake3::hash(models_dir.to_string_lossy().as_bytes())
89        .to_hex()
90        .to_string();
91    format!("sqlite-graphrag-daemon-{}", &hash[..16])
92}
93
/// Ping the daemon for `models_dir`; `Ok(None)` means no daemon is reachable.
pub fn try_ping(models_dir: &Path) -> Result<Option<DaemonResponse>, AppError> {
    request_if_available(models_dir, &DaemonRequest::Ping)
}
97
/// Ask a running daemon to shut down; `Ok(None)` means none was reachable.
pub fn try_shutdown(models_dir: &Path) -> Result<Option<DaemonResponse>, AppError> {
    request_if_available(models_dir, &DaemonRequest::Shutdown)
}
101
102pub fn embed_passage_or_local(models_dir: &Path, text: &str) -> Result<Vec<f32>, AppError> {
103    match request_or_autostart(
104        models_dir,
105        &DaemonRequest::EmbedPassage {
106            text: text.to_string(),
107        },
108        true,
109    )? {
110        Some(DaemonResponse::PassageEmbedding { embedding, .. }) => Ok(embedding),
111        Some(DaemonResponse::Error { message }) => Err(AppError::Embedding(message)),
112        Some(other) => Err(AppError::Internal(anyhow::anyhow!(
113            "unexpected daemon response for passage embedding: {other:?}"
114        ))),
115        None => {
116            let embedder = embedder::get_embedder(models_dir)?;
117            embedder::embed_passage(embedder, text)
118        }
119    }
120}
121
122pub fn embed_query_or_local(
123    models_dir: &Path,
124    text: &str,
125    cli_autostart: bool,
126) -> Result<Vec<f32>, AppError> {
127    match request_or_autostart(
128        models_dir,
129        &DaemonRequest::EmbedQuery {
130            text: text.to_string(),
131        },
132        cli_autostart,
133    )? {
134        Some(DaemonResponse::QueryEmbedding { embedding, .. }) => Ok(embedding),
135        Some(DaemonResponse::Error { message }) => Err(AppError::Embedding(message)),
136        Some(other) => Err(AppError::Internal(anyhow::anyhow!(
137            "unexpected daemon response for query embedding: {other:?}"
138        ))),
139        None => {
140            let embedder = embedder::get_embedder(models_dir)?;
141            embedder::embed_query(embedder, text)
142        }
143    }
144}
145
146pub fn embed_passages_controlled_or_local(
147    models_dir: &Path,
148    texts: &[&str],
149    token_counts: &[usize],
150) -> Result<Vec<Vec<f32>>, AppError> {
151    let request = DaemonRequest::EmbedPassages {
152        texts: texts.iter().map(|t| (*t).to_string()).collect(),
153        token_counts: token_counts.to_vec(),
154    };
155
156    match request_or_autostart(models_dir, &request, true)? {
157        Some(DaemonResponse::PassageEmbeddings { embeddings, .. }) => Ok(embeddings),
158        Some(DaemonResponse::Error { message }) => Err(AppError::Embedding(message)),
159        Some(other) => Err(AppError::Internal(anyhow::anyhow!(
160            "unexpected daemon response for passage embedding batch: {other:?}"
161        ))),
162        None => {
163            let embedder = embedder::get_embedder(models_dir)?;
164            embedder::embed_passages_controlled(embedder, texts, token_counts)
165        }
166    }
167}
168
/// RAII-style holder of the models dir whose spawn lock file must be removed
/// when the daemon exits gracefully (see the `Drop` impl below).
struct DaemonSpawnGuard {
    models_dir: PathBuf,
}

impl DaemonSpawnGuard {
    /// Capture an owned copy of `models_dir` for use at drop time.
    fn new(models_dir: &Path) -> Self {
        let models_dir = models_dir.to_path_buf();
        Self { models_dir }
    }
}
180
181impl Drop for DaemonSpawnGuard {
182    fn drop(&mut self) {
183        let lock_path = spawn_lock_path(&self.models_dir);
184        if lock_path.exists() {
185            match std::fs::remove_file(&lock_path) {
186                Ok(()) => {
187                    tracing::debug!(
188                        path = %lock_path.display(),
189                        "spawn lock file removed during graceful daemon shutdown"
190                    );
191                }
192                Err(err) if err.kind() == std::io::ErrorKind::NotFound => {}
193                Err(err) => {
194                    tracing::warn!(
195                        error = %err,
196                        path = %lock_path.display(),
197                        "failed to remove spawn lock file while shutting down daemon"
198                    );
199                }
200            }
201        }
202        tracing::info!(
203            "daemon shut down gracefully; socket will be cleaned up by OS or by the next daemon via try_overwrite"
204        );
205    }
206}
207
208pub fn run(models_dir: &Path, idle_shutdown_secs: u64) -> Result<(), AppError> {
209    // Tokio runtime with 2 worker threads to reduce idle threads in the daemon.
210    // The accept loop remains synchronous; each connection is dispatched via spawn_blocking
211    // so heavy embeddings do not block the tokio workers.
212    let rt = tokio::runtime::Builder::new_multi_thread()
213        .worker_threads(2)
214        .thread_name("daemon-worker")
215        .enable_all()
216        .build()
217        .map_err(AppError::Io)?;
218
219    rt.block_on(run_async(models_dir, idle_shutdown_secs))
220}
221
/// Daemon main loop: bind the local socket, warm the embedding model, emit a
/// `Listening` readiness message, then accept clients until one of: a
/// shutdown signal, the control directory disappearing, a client `Shutdown`
/// request, or the idle timeout elapsing.
async fn run_async(models_dir: &Path, idle_shutdown_secs: u64) -> Result<(), AppError> {
    let socket = daemon_label(models_dir);
    let name = to_local_socket_name(&socket)?;
    // Nonblocking accept lets the loop below poll for shutdown/idle between
    // connections; try_overwrite reclaims a stale socket left by a killed
    // predecessor.
    let listener = ListenerOptions::new()
        .name(name)
        .nonblocking(ListenerNonblockingMode::Accept)
        .try_overwrite(true)
        .create_sync()
        .map_err(AppError::Io)?;

    // Guard that cleans up the spawn lock file on graceful shutdown.
    // SIGKILL does not trigger Drop; in that case try_overwrite(true) above is the fallback.
    let _spawn_guard = DaemonSpawnGuard::new(models_dir);

    // Warm the model once per daemon process inside spawn_blocking so the
    // ONNX session initialisation (CPU-bound, may take several seconds) does
    // not block a tokio worker thread.
    let models_dir_warm = models_dir.to_path_buf();
    tokio::task::spawn_blocking(move || embedder::get_embedder(&models_dir_warm).map(|_| ()))
        .await
        .map_err(|e| AppError::Internal(anyhow::anyhow!("model warm-up panicked: {e}")))??;

    // Readiness signal: parents polling in wait_for_daemon_ready can now ping.
    crate::output::emit_json(&DaemonResponse::Listening {
        pid: std::process::id(),
        socket,
        idle_shutdown_secs,
    })?;

    let handled_embed_requests = Arc::new(AtomicU64::new(0));
    let mut last_activity = Instant::now();
    let models_dir = models_dir.to_path_buf();

    loop {
        if shutdown_requested() {
            break;
        }

        // A deleted control directory (models dir parent) is treated as an
        // external request to stop serving.
        if !daemon_control_dir(&models_dir).exists() {
            tracing::info!("daemon control directory disappeared; shutting down");
            break;
        }

        match listener.accept() {
            Ok(stream) => {
                last_activity = Instant::now();
                let models_dir_clone = models_dir.clone();
                let counter = Arc::clone(&handled_embed_requests);
                // Connections are handled strictly one at a time: the loop
                // awaits each spawn_blocking handler before accepting the
                // next client.
                let should_exit = tokio::task::spawn_blocking(move || {
                    handle_client(stream, &models_dir_clone, &counter)
                })
                .await
                .map_err(|e| {
                    AppError::Internal(anyhow::anyhow!("spawn_blocking panicked: {e}"))
                })??;

                if should_exit {
                    break;
                }
            }
            Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => {
                // No pending connection: check the idle deadline, then sleep
                // briefly to avoid a busy accept loop.
                if last_activity.elapsed() >= Duration::from_secs(idle_shutdown_secs) {
                    tracing::info!(
                        idle_shutdown_secs,
                        handled_embed_requests = handled_embed_requests.load(Ordering::Relaxed),
                        "daemon idle timeout reached"
                    );
                    break;
                }
                tokio::time::sleep(Duration::from_millis(50)).await;
            }
            Err(err) => return Err(AppError::Io(err)),
        }
    }

    Ok(())
}
298
/// Serve one client connection: read a single newline-terminated JSON
/// request, dispatch it, and write back a single JSON response line.
///
/// Returns `Ok(true)` when the daemon should exit (a `Shutdown` request was
/// served), `Ok(false)` otherwise.
fn handle_client(
    stream: LocalSocketStream,
    models_dir: &Path,
    handled_embed_requests: &AtomicU64,
) -> Result<bool, AppError> {
    let mut reader = BufReader::new(stream);
    let mut line = String::new();
    reader.read_line(&mut line).map_err(AppError::Io)?;

    // EOF or blank line: report an error to the client but keep serving.
    if line.trim().is_empty() {
        write_response(
            reader.get_mut(),
            &DaemonResponse::Error {
                message: "empty request to daemon".to_string(),
            },
        )?;
        return Ok(false);
    }

    let request: DaemonRequest = serde_json::from_str(line.trim()).map_err(AppError::Json)?;
    let (response, should_exit) = match request {
        DaemonRequest::Ping => (
            DaemonResponse::Ok {
                pid: std::process::id(),
                version: SQLITE_GRAPHRAG_VERSION.to_string(),
                handled_embed_requests: handled_embed_requests.load(Ordering::Relaxed),
            },
            false,
        ),
        DaemonRequest::Shutdown => (
            DaemonResponse::ShuttingDown {
                handled_embed_requests: handled_embed_requests.load(Ordering::Relaxed),
            },
            true,
        ),
        DaemonRequest::EmbedPassage { text } => {
            let embedder = embedder::get_embedder(models_dir)?;
            let embedding = embedder::embed_passage(embedder, &text)?;
            // fetch_add returns the old value; +1 reports the count including
            // this request.
            let count = handled_embed_requests.fetch_add(1, Ordering::Relaxed) + 1;
            (
                DaemonResponse::PassageEmbedding {
                    embedding,
                    handled_embed_requests: count,
                },
                false,
            )
        }
        DaemonRequest::EmbedQuery { text } => {
            let embedder = embedder::get_embedder(models_dir)?;
            let embedding = embedder::embed_query(embedder, &text)?;
            let count = handled_embed_requests.fetch_add(1, Ordering::Relaxed) + 1;
            (
                DaemonResponse::QueryEmbedding {
                    embedding,
                    handled_embed_requests: count,
                },
                false,
            )
        }
        DaemonRequest::EmbedPassages {
            texts,
            token_counts,
        } => {
            let embedder = embedder::get_embedder(models_dir)?;
            let text_refs: Vec<&str> = texts.iter().map(String::as_str).collect();
            let embeddings =
                embedder::embed_passages_controlled(embedder, &text_refs, &token_counts)?;
            // A whole batch counts as one handled embed request.
            let count = handled_embed_requests.fetch_add(1, Ordering::Relaxed) + 1;
            (
                DaemonResponse::PassageEmbeddings {
                    embeddings,
                    handled_embed_requests: count,
                },
                false,
            )
        }
    };

    write_response(reader.get_mut(), &response)?;
    Ok(should_exit)
}
380
381fn write_response(
382    stream: &mut LocalSocketStream,
383    response: &DaemonResponse,
384) -> Result<(), AppError> {
385    serde_json::to_writer(&mut *stream, response).map_err(AppError::Json)?;
386    stream.write_all(b"\n").map_err(AppError::Io)?;
387    stream.flush().map_err(AppError::Io)?;
388    Ok(())
389}
390
/// Send `request` to the daemon socket for `models_dir` and read one
/// response line.
///
/// Returns `Ok(None)` when no daemon appears to be running — i.e. the
/// connect fails with `NotFound`, `ConnectionRefused`, `AddrNotAvailable`,
/// or `TimedOut`. Any other I/O error is propagated as `AppError::Io`.
fn request_if_available(
    models_dir: &Path,
    request: &DaemonRequest,
) -> Result<Option<DaemonResponse>, AppError> {
    let socket = daemon_label(models_dir);
    let name = match to_local_socket_name(&socket) {
        Ok(name) => name,
        Err(err) => return Err(AppError::Io(err)),
    };

    let mut stream = match LocalSocketStream::connect(name) {
        Ok(stream) => stream,
        // These kinds all mean "no daemon is listening right now" and are
        // deliberately not treated as errors: callers fall back to local
        // embedding or trigger autostart.
        Err(err)
            if matches!(
                err.kind(),
                std::io::ErrorKind::NotFound
                    | std::io::ErrorKind::ConnectionRefused
                    | std::io::ErrorKind::AddrNotAvailable
                    | std::io::ErrorKind::TimedOut
            ) =>
        {
            return Ok(None);
        }
        Err(err) => return Err(AppError::Io(err)),
    };

    // Protocol: one newline-terminated JSON document per direction.
    serde_json::to_writer(&mut stream, request).map_err(AppError::Json)?;
    stream.write_all(b"\n").map_err(AppError::Io)?;
    stream.flush().map_err(AppError::Io)?;

    let mut reader = BufReader::new(stream);
    let mut line = String::new();
    reader.read_line(&mut line).map_err(AppError::Io)?;
    if line.trim().is_empty() {
        return Err(AppError::Embedding(
            "daemon returned an empty response".into(),
        ));
    }

    let response = serde_json::from_str(line.trim()).map_err(AppError::Json)?;
    Ok(Some(response))
}
433
434fn should_autostart(cli_flag: bool) -> bool {
435    if !cli_flag {
436        return false; // explicit CLI override wins
437    }
438    !autostart_disabled_by_env()
439}
440
441fn request_or_autostart(
442    models_dir: &Path,
443    request: &DaemonRequest,
444    cli_autostart: bool,
445) -> Result<Option<DaemonResponse>, AppError> {
446    if let Some(response) = request_if_available(models_dir, request)? {
447        clear_spawn_backoff_state(models_dir).ok();
448        return Ok(Some(response));
449    }
450
451    if !should_autostart(cli_autostart) {
452        return Ok(None);
453    }
454
455    if !ensure_daemon_running(models_dir)? {
456        return Ok(None);
457    }
458
459    request_if_available(models_dir, request)
460}
461
/// Ensure a daemon is reachable for `models_dir`, autostarting one if needed.
///
/// Returns `Ok(true)` when a daemon answers a ping (already running, spawned
/// by us, or spawned by the process that beat us to the spawn lock);
/// `Ok(false)` when autostart is suppressed by the backoff window, the spawn
/// fails, or the daemon never becomes healthy within the wait budget.
fn ensure_daemon_running(models_dir: &Path) -> Result<bool, AppError> {
    if (try_ping(models_dir)?).is_some() {
        clear_spawn_backoff_state(models_dir).ok();
        return Ok(true);
    }

    // Recent spawn failures put autostart on hold to avoid hammering.
    if spawn_backoff_active(models_dir)? {
        tracing::warn!("daemon autostart suppressed by backoff window");
        return Ok(false);
    }

    // Only one process spawns; losers wait for the winner's daemon instead.
    let spawn_lock = match try_acquire_spawn_lock(models_dir)? {
        Some(lock) => lock,
        None => return wait_for_daemon_ready(models_dir),
    };

    // Re-check under the lock: another process may have spawned the daemon
    // between our first ping and acquiring the lock.
    if (try_ping(models_dir)?).is_some() {
        clear_spawn_backoff_state(models_dir).ok();
        drop(spawn_lock);
        return Ok(true);
    }

    let exe = match std::env::current_exe() {
        Ok(path) => path,
        Err(err) => {
            record_spawn_failure(models_dir, format!("current_exe failed: {err}"))?;
            drop(spawn_lock);
            return Ok(false);
        }
    };

    let mut child = std::process::Command::new(exe);
    child
        .arg("daemon")
        .arg("--idle-shutdown-secs")
        .arg(DAEMON_IDLE_SHUTDOWN_SECS.to_string())
        // Marks the child so it never recursively autostarts (see
        // autostart_disabled_by_env).
        .env("SQLITE_GRAPHRAG_DAEMON_CHILD", "1")
        .stdin(Stdio::null())
        .stdout(Stdio::null())
        .stderr(Stdio::null());

    match child.spawn() {
        Ok(child_handle) => {
            // SAFETY: deliberate orphan daemon detach. The Child handle is intentionally
            // dropped without a corresponding `.wait()` call because the daemon owns its
            // own lifecycle: `Stdio::null()` is set on stdin/stdout/stderr (above) so the
            // child does not inherit terminal handles, the spawn lock file
            // (`daemon-spawn.lock` in the daemon control dir) prevents concurrent
            // spawns, and the daemon shuts itself down via `DAEMON_IDLE_SHUTDOWN_SECS`
            // (or an explicit `daemon stop`/SIGTERM). Keeping the handle here would
            // block the parent CLI in the foreground until the daemon exited, defeating
            // the autostart contract that callers expect.
            let pid = child_handle.id();
            drop(child_handle);
            tracing::debug!(
                pid,
                "daemon detached; lifecycle managed via spawn lock + readiness file"
            );
            let ready = wait_for_daemon_ready(models_dir)?;
            if ready {
                clear_spawn_backoff_state(models_dir).ok();
            } else {
                record_spawn_failure(
                    models_dir,
                    "daemon did not become healthy after autostart".to_string(),
                )?;
            }
            drop(spawn_lock);
            Ok(ready)
        }
        Err(err) => {
            record_spawn_failure(models_dir, format!("daemon spawn failed: {err}"))?;
            drop(spawn_lock);
            Ok(false)
        }
    }
}
539
540fn wait_for_daemon_ready(models_dir: &Path) -> Result<bool, AppError> {
541    let deadline = Instant::now() + Duration::from_millis(DAEMON_AUTO_START_MAX_WAIT_MS);
542    let mut sleep_ms = DAEMON_AUTO_START_INITIAL_BACKOFF_MS.max(DAEMON_PING_TIMEOUT_MS);
543
544    while Instant::now() < deadline {
545        if (try_ping(models_dir)?).is_some() {
546            return Ok(true);
547        }
548        thread::sleep(Duration::from_millis(sleep_ms));
549        sleep_ms = (sleep_ms * 2).min(DAEMON_AUTO_START_MAX_BACKOFF_MS);
550    }
551
552    Ok(false)
553}
554
/// True when the environment forbids daemon autostart.
///
/// Rules, in order:
/// - `SQLITE_GRAPHRAG_DAEMON_CHILD=1`: this process *is* the daemon child,
///   so it must never recursively autostart.
/// - `SQLITE_GRAPHRAG_DAEMON_DISABLE_AUTOSTART=1` disables autostart unless
///   `SQLITE_GRAPHRAG_DAEMON_FORCE_AUTOSTART=1` overrides the disable.
fn autostart_disabled_by_env() -> bool {
    let flag = |name: &str| std::env::var(name).as_deref() == Ok("1");
    // Explicit parentheses: the previous form mixed `||` and `&&` without
    // grouping and relied on operator precedence, which was easy to misread.
    flag("SQLITE_GRAPHRAG_DAEMON_CHILD")
        || (!flag("SQLITE_GRAPHRAG_DAEMON_FORCE_AUTOSTART")
            && flag("SQLITE_GRAPHRAG_DAEMON_DISABLE_AUTOSTART"))
}
560
/// Directory holding the daemon's control files (spawn lock and backoff
/// state): the models dir's parent, or the models dir itself when it has
/// no parent.
fn daemon_control_dir(models_dir: &Path) -> PathBuf {
    match models_dir.parent() {
        Some(parent) => parent.to_path_buf(),
        None => models_dir.to_path_buf(),
    }
}
567
/// Path of the lock file held while one process spawns the daemon.
fn spawn_lock_path(models_dir: &Path) -> PathBuf {
    daemon_control_dir(models_dir).join("daemon-spawn.lock")
}
571
/// Path of the persisted autostart backoff state (JSON).
fn spawn_state_path(models_dir: &Path) -> PathBuf {
    daemon_control_dir(models_dir).join("daemon-spawn-state.json")
}
575
/// Try to take the exclusive cross-process spawn lock.
///
/// Returns `Ok(Some(file))` with the lock held (released when the `File`
/// drops), or `Ok(None)` if another process kept it for longer than
/// `DAEMON_SPAWN_LOCK_WAIT_MS`.
fn try_acquire_spawn_lock(models_dir: &Path) -> Result<Option<File>, AppError> {
    let path = spawn_lock_path(models_dir);
    std::fs::create_dir_all(crate::paths::parent_or_err(&path)?).map_err(AppError::Io)?;
    // truncate(false): the file may already exist and be locked by another
    // process; we must not clobber it while opening.
    let file = OpenOptions::new()
        .read(true)
        .write(true)
        .create(true)
        .truncate(false)
        .open(path)
        .map_err(AppError::Io)?;

    let deadline = Instant::now() + Duration::from_millis(DAEMON_SPAWN_LOCK_WAIT_MS);
    loop {
        match file.try_lock_exclusive() {
            Ok(()) => return Ok(Some(file)),
            // WouldBlock: held by another process; poll every 50ms until the
            // deadline rather than blocking indefinitely.
            Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => {
                if Instant::now() >= deadline {
                    return Ok(None);
                }
                thread::sleep(Duration::from_millis(50));
            }
            Err(err) => return Err(AppError::Io(err)),
        }
    }
}
601
602fn spawn_backoff_active(models_dir: &Path) -> Result<bool, AppError> {
603    let state = load_spawn_state(models_dir)?;
604    Ok(now_epoch_ms() < state.not_before_epoch_ms)
605}
606
/// Record one more autostart failure and extend the persisted backoff window.
///
/// The base delay doubles per consecutive failure, capped at 2^6 times
/// `DAEMON_SPAWN_BACKOFF_BASE_MS` and at `DAEMON_AUTO_START_MAX_BACKOFF_MS`;
/// half-jitter then places the effective delay in `[base/2, base)`.
fn record_spawn_failure(models_dir: &Path, message: String) -> Result<(), AppError> {
    let mut state = load_spawn_state(models_dir)?;
    state.consecutive_failures = state.consecutive_failures.saturating_add(1);
    // Exponent is failures-1 so the first failure uses the base delay.
    let exponent = state.consecutive_failures.saturating_sub(1).min(6);
    let base_ms =
        (DAEMON_SPAWN_BACKOFF_BASE_MS * (1_u64 << exponent)).min(DAEMON_AUTO_START_MAX_BACKOFF_MS);
    // v1.0.36 (L2): half jitter to avoid retry herd when multiple CLI instances
    // detect daemon failure simultaneously. Effective backoff range is
    // `[base/2, base)`. Uses SystemTime nanoseconds as a dependency-free
    // randomness source; daemon spawn frequency is low so quality is sufficient.
    let half = base_ms / 2;
    let jitter_seed = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|d| d.subsec_nanos() as u64)
        .unwrap_or(0);
    let jitter = if half == 0 { 0 } else { jitter_seed % half };
    let backoff_ms = half + jitter;
    state.not_before_epoch_ms = now_epoch_ms() + backoff_ms;
    state.last_error = Some(message);
    save_spawn_state(models_dir, &state)
}
628
629fn clear_spawn_backoff_state(models_dir: &Path) -> Result<(), AppError> {
630    let path = spawn_state_path(models_dir);
631    if path.exists() {
632        std::fs::remove_file(path).map_err(AppError::Io)?;
633    }
634    Ok(())
635}
636
637fn load_spawn_state(models_dir: &Path) -> Result<DaemonSpawnState, AppError> {
638    let path = spawn_state_path(models_dir);
639    if !path.exists() {
640        return Ok(DaemonSpawnState::default());
641    }
642
643    let bytes = std::fs::read(path).map_err(AppError::Io)?;
644    serde_json::from_slice(&bytes).map_err(AppError::Json)
645}
646
/// Serialize `state` as JSON to the spawn-state path, creating parent dirs.
///
/// NOTE(review): `fs::write` is not atomic — a concurrent `load_spawn_state`
/// could observe a partially written file. Presumably acceptable for
/// best-effort backoff bookkeeping; confirm if stronger guarantees are needed.
fn save_spawn_state(models_dir: &Path, state: &DaemonSpawnState) -> Result<(), AppError> {
    let path = spawn_state_path(models_dir);
    std::fs::create_dir_all(crate::paths::parent_or_err(&path)?).map_err(AppError::Io)?;
    let bytes = serde_json::to_vec(state).map_err(AppError::Json)?;
    std::fs::write(path, bytes).map_err(AppError::Io)
}
653
/// Current wall-clock time as milliseconds since the Unix epoch.
/// A pre-epoch clock (the `duration_since` error case) maps to 0 instead of
/// panicking.
fn now_epoch_ms() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map_or(0, |d| d.as_millis() as u64)
}
660
/// Resolve `name` to an interprocess local-socket name.
///
/// Prefers a namespaced socket where the platform supports it; otherwise
/// falls back to a filesystem path (Unix) or a named-pipe path (other
/// platforms, formatted for Windows pipes).
fn to_local_socket_name(name: &str) -> std::io::Result<interprocess::local_socket::Name<'static>> {
    if let Ok(ns_name) = name.to_string().to_ns_name::<GenericNamespaced>() {
        return Ok(ns_name);
    }

    // Fallback when abstract namespaces are unavailable. Honours XDG_RUNTIME_DIR
    // (Linux user-private runtime dir) or SQLITE_GRAPHRAG_HOME (project override)
    // before falling back to /tmp, which can collide when the same name is used
    // by another user/project on a multi-tenant host. Added in v1.0.35.
    let path = if cfg!(unix) {
        let base = std::env::var_os("XDG_RUNTIME_DIR")
            .or_else(|| std::env::var_os("SQLITE_GRAPHRAG_HOME"))
            .map(std::path::PathBuf::from)
            .unwrap_or_else(std::env::temp_dir);
        base.join(format!("{name}.sock"))
            .to_string_lossy()
            .into_owned()
    } else {
        format!(r"\\.\pipe\{name}")
    };
    path.to_fs_name::<GenericFilePath>()
}
683
#[cfg(test)]
mod tests {
    use super::*;

    /// Recording a failure should open the backoff window and populate the
    /// persisted state; clearing should close the window again.
    #[test]
    fn record_and_clear_spawn_backoff_state() {
        let tmp = tempfile::tempdir().unwrap();
        let models_dir = tmp.path().join("cache").join("models");
        std::fs::create_dir_all(&models_dir).unwrap();

        // No state file yet: backoff must be inactive.
        assert!(!spawn_backoff_active(&models_dir).unwrap());

        record_spawn_failure(&models_dir, "spawn failed".to_string()).unwrap();
        assert!(spawn_backoff_active(&models_dir).unwrap());

        let state = load_spawn_state(&models_dir).unwrap();
        assert_eq!(state.consecutive_failures, 1);
        assert_eq!(state.last_error.as_deref(), Some("spawn failed"));

        clear_spawn_backoff_state(&models_dir).unwrap();
        assert!(!spawn_backoff_active(&models_dir).unwrap());
    }

    /// Control files live next to (one level above) the models directory.
    #[test]
    fn daemon_control_dir_uses_models_parent() {
        let base = PathBuf::from("/tmp/sqlite-graphrag-cache-test");
        let models_dir = base.join("models");
        assert_eq!(daemon_control_dir(&models_dir), base);
    }
}