Skip to main content

sqlite_graphrag/
daemon.rs

1//! IPC daemon: keeps the embedding model warm across CLI invocations.
2//!
3//! Manages the background process lifecycle, Unix-socket IPC protocol, and
4//! auto-start/backoff logic so embeddings are served without cold-start cost.
5
6use crate::constants::{
7    DAEMON_AUTO_START_INITIAL_BACKOFF_MS, DAEMON_AUTO_START_MAX_BACKOFF_MS,
8    DAEMON_AUTO_START_MAX_WAIT_MS, DAEMON_IDLE_SHUTDOWN_SECS, DAEMON_PING_TIMEOUT_MS,
9    DAEMON_SPAWN_BACKOFF_BASE_MS, DAEMON_SPAWN_LOCK_WAIT_MS, DAEMON_VERSION_RESTART_WAIT_MS,
10    SQLITE_GRAPHRAG_VERSION,
11};
12use crate::errors::AppError;
13use crate::{embedder, shutdown_requested};
14use fs4::fs_std::FileExt;
15use interprocess::local_socket::{
16    prelude::LocalSocketStream,
17    traits::{Listener as _, Stream as _},
18    GenericFilePath, GenericNamespaced, ListenerNonblockingMode, ListenerOptions, ToFsName,
19    ToNsName,
20};
21use serde::{Deserialize, Serialize};
22use std::fs::{File, OpenOptions};
23use std::io::{BufRead, BufReader, Write};
24use std::path::{Path, PathBuf};
25use std::process::Stdio;
26use std::sync::atomic::{AtomicU64, AtomicU8, Ordering};
27use std::sync::Arc;
28use std::thread;
29use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
30
/// The daemon version has not been checked yet in this process.
const VERSION_NOT_CHECKED: u8 = 0;
/// The version check has run; this value is stored before the ping is sent
/// and is kept unless a mismatch is found.
const VERSION_COMPATIBLE: u8 = 1;
/// A version mismatch was detected and a daemon restart was attempted.
const VERSION_RESTART_ATTEMPTED: u8 = 2;

/// Guards against restart loops: tracks version check state per process lifetime.
///
/// Holds one of the `VERSION_*` values above and starts at `VERSION_NOT_CHECKED`.
static DAEMON_VERSION_STATE: AtomicU8 = AtomicU8::new(VERSION_NOT_CHECKED);
37
/// Request sent by a client to the daemon as a single JSON line.
///
/// Serialized as an internally tagged enum: the variant name is written in
/// snake_case under the `request` key.
#[derive(Debug, Serialize, Deserialize)]
#[serde(tag = "request", rename_all = "snake_case")]
pub enum DaemonRequest {
    /// Health probe; the daemon answers with [`DaemonResponse::Ok`].
    Ping,
    /// Asks the daemon to stop; it answers with [`DaemonResponse::ShuttingDown`]
    /// and then leaves its accept loop.
    Shutdown,
    /// Embed a single passage; answered with [`DaemonResponse::PassageEmbedding`].
    EmbedPassage {
        text: String,
    },
    /// Embed a single query; answered with [`DaemonResponse::QueryEmbedding`].
    EmbedQuery {
        text: String,
    },
    /// Embed a batch of passages; answered with [`DaemonResponse::PassageEmbeddings`].
    EmbedPassages {
        /// Passage texts to embed.
        texts: Vec<String>,
        /// Forwarded unchanged to `embedder::embed_passages_controlled`
        /// alongside `texts`.
        token_counts: Vec<usize>,
    },
}
54
/// Message produced by the daemon, one JSON object per line.
///
/// Serialized as an internally tagged enum: the variant name is written in
/// snake_case under the `status` key.
#[derive(Debug, Serialize, Deserialize)]
#[serde(tag = "status", rename_all = "snake_case")]
pub enum DaemonResponse {
    /// Startup announcement emitted by the daemon once the model has been
    /// warmed and the listener is in place.
    Listening {
        /// Process id of the daemon.
        pid: u32,
        /// Socket label the daemon listens on (see [`daemon_label`]).
        socket: String,
        /// Idle window, in seconds, after which the daemon exits on its own.
        idle_shutdown_secs: u64,
    },
    /// Reply to [`DaemonRequest::Ping`].
    Ok {
        /// Process id of the daemon.
        pid: u32,
        /// Value of `SQLITE_GRAPHRAG_VERSION` compiled into the daemon binary.
        version: String,
        /// Number of embedding requests served so far.
        handled_embed_requests: u64,
        /// Value of `FASTEMBED_MODEL_DEFAULT` in the daemon binary.
        model_name: String,
        /// Value of `SQLITE_GRAPHRAG_GLINER_VARIANT` in the daemon's
        /// environment, or `"fp32"` when unset.
        model_variant: String,
    },
    /// Reply to [`DaemonRequest::EmbedPassage`].
    PassageEmbedding {
        embedding: Vec<f32>,
        /// Embedding requests served so far, including this one.
        handled_embed_requests: u64,
    },
    /// Reply to [`DaemonRequest::EmbedQuery`].
    QueryEmbedding {
        embedding: Vec<f32>,
        /// Embedding requests served so far, including this one.
        handled_embed_requests: u64,
    },
    /// Reply to [`DaemonRequest::EmbedPassages`].
    PassageEmbeddings {
        embeddings: Vec<Vec<f32>>,
        /// Embedding requests served so far, including this one (a batch
        /// counts as one request).
        handled_embed_requests: u64,
    },
    /// Reply to [`DaemonRequest::Shutdown`].
    ShuttingDown {
        /// Embedding requests served over the daemon's lifetime.
        handled_embed_requests: u64,
    },
    /// The request could not be served; `message` explains why.
    Error {
        message: String,
    },
}
89
/// Autostart backoff bookkeeping persisted as JSON at `spawn_state_path`.
#[derive(Debug, Default, Serialize, Deserialize)]
struct DaemonSpawnState {
    /// Number of spawn failures recorded since the state file was last cleared.
    consecutive_failures: u32,
    /// Unix time in milliseconds before which autostart is suppressed.
    not_before_epoch_ms: u64,
    /// Message describing the most recently recorded failure, if any.
    last_error: Option<String>,
}
96
97pub fn daemon_label(models_dir: &Path) -> String {
98    let hash = blake3::hash(models_dir.to_string_lossy().as_bytes())
99        .to_hex()
100        .to_string();
101    format!("sqlite-graphrag-daemon-{}", &hash[..16])
102}
103
104pub fn try_ping(models_dir: &Path) -> Result<Option<DaemonResponse>, AppError> {
105    request_if_available(models_dir, &DaemonRequest::Ping)
106}
107
108pub fn try_shutdown(models_dir: &Path) -> Result<Option<DaemonResponse>, AppError> {
109    request_if_available(models_dir, &DaemonRequest::Shutdown)
110}
111
112pub fn embed_passage_or_local(models_dir: &Path, text: &str) -> Result<Vec<f32>, AppError> {
113    match request_or_autostart(
114        models_dir,
115        &DaemonRequest::EmbedPassage {
116            text: text.to_string(),
117        },
118        true,
119    )? {
120        Some(DaemonResponse::PassageEmbedding { embedding, .. }) => Ok(embedding),
121        Some(DaemonResponse::Error { message }) => Err(AppError::Embedding(message)),
122        Some(other) => Err(AppError::Internal(anyhow::anyhow!(
123            "unexpected daemon response for passage embedding: {other:?}"
124        ))),
125        None => {
126            let embedder = embedder::get_embedder(models_dir)?;
127            embedder::embed_passage(embedder, text)
128        }
129    }
130}
131
132pub fn embed_query_or_local(
133    models_dir: &Path,
134    text: &str,
135    cli_autostart: bool,
136) -> Result<Vec<f32>, AppError> {
137    match request_or_autostart(
138        models_dir,
139        &DaemonRequest::EmbedQuery {
140            text: text.to_string(),
141        },
142        cli_autostart,
143    )? {
144        Some(DaemonResponse::QueryEmbedding { embedding, .. }) => Ok(embedding),
145        Some(DaemonResponse::Error { message }) => Err(AppError::Embedding(message)),
146        Some(other) => Err(AppError::Internal(anyhow::anyhow!(
147            "unexpected daemon response for query embedding: {other:?}"
148        ))),
149        None => {
150            let embedder = embedder::get_embedder(models_dir)?;
151            embedder::embed_query(embedder, text)
152        }
153    }
154}
155
156pub fn embed_passages_controlled_or_local(
157    models_dir: &Path,
158    texts: &[&str],
159    token_counts: &[usize],
160) -> Result<Vec<Vec<f32>>, AppError> {
161    let request = DaemonRequest::EmbedPassages {
162        texts: texts.iter().map(|t| (*t).to_string()).collect(),
163        token_counts: token_counts.to_vec(),
164    };
165
166    match request_or_autostart(models_dir, &request, true)? {
167        Some(DaemonResponse::PassageEmbeddings { embeddings, .. }) => Ok(embeddings),
168        Some(DaemonResponse::Error { message }) => Err(AppError::Embedding(message)),
169        Some(other) => Err(AppError::Internal(anyhow::anyhow!(
170            "unexpected daemon response for passage embedding batch: {other:?}"
171        ))),
172        None => {
173            let embedder = embedder::get_embedder(models_dir)?;
174            embedder::embed_passages_controlled(embedder, texts, token_counts)
175        }
176    }
177}
178
179struct DaemonSpawnGuard {
180    models_dir: PathBuf,
181}
182
183impl DaemonSpawnGuard {
184    fn new(models_dir: &Path) -> Self {
185        Self {
186            models_dir: models_dir.to_path_buf(),
187        }
188    }
189}
190
191impl Drop for DaemonSpawnGuard {
192    fn drop(&mut self) {
193        let lock_path = spawn_lock_path(&self.models_dir);
194        if lock_path.exists() {
195            match std::fs::remove_file(&lock_path) {
196                Ok(()) => {
197                    tracing::debug!(
198                        path = %lock_path.display(),
199                        "spawn lock file removed during graceful daemon shutdown"
200                    );
201                }
202                Err(err) if err.kind() == std::io::ErrorKind::NotFound => {}
203                Err(err) => {
204                    tracing::warn!(
205                        error = %err,
206                        path = %lock_path.display(),
207                        "failed to remove spawn lock file while shutting down daemon"
208                    );
209                }
210            }
211        }
212        tracing::info!(
213            "daemon shut down gracefully; socket will be cleaned up by OS or by the next daemon via try_overwrite"
214        );
215    }
216}
217
218pub fn run(models_dir: &Path, idle_shutdown_secs: u64) -> Result<(), AppError> {
219    // Scale worker threads to available parallelism so embedding tasks saturate CPU cores.
220    // Clamped to [2, 8] to avoid excessive threads on high-core machines.
221    let permits = std::thread::available_parallelism()
222        .map(|n| n.get())
223        .unwrap_or(2)
224        .clamp(2, 8);
225    let rt = tokio::runtime::Builder::new_multi_thread()
226        .worker_threads(permits)
227        .thread_name("daemon-worker")
228        .enable_all()
229        .build()
230        .map_err(AppError::Io)?;
231
232    rt.block_on(run_async(models_dir, idle_shutdown_secs, permits))
233}
234
235async fn run_async(
236    models_dir: &Path,
237    idle_shutdown_secs: u64,
238    permits: usize,
239) -> Result<(), AppError> {
240    let socket = daemon_label(models_dir);
241    let name = to_local_socket_name(&socket)?;
242    let listener = ListenerOptions::new()
243        .name(name)
244        .nonblocking(ListenerNonblockingMode::Accept)
245        .try_overwrite(true)
246        .create_sync()
247        .map_err(AppError::Io)?;
248
249    // Guard that cleans up the spawn lock file on graceful shutdown.
250    // SIGKILL does not trigger Drop; in that case try_overwrite(true) above is the fallback.
251    let _spawn_guard = DaemonSpawnGuard::new(models_dir);
252
253    // Warm the model once per daemon process inside spawn_blocking so the
254    // ONNX session initialisation (CPU-bound, may take several seconds) does
255    // not block a tokio worker thread.
256    let models_dir_warm = models_dir.to_path_buf();
257    tokio::task::spawn_blocking(move || embedder::get_embedder(&models_dir_warm).map(|_| ()))
258        .await
259        .map_err(|e| AppError::Internal(anyhow::anyhow!("model warm-up panicked: {e}")))??;
260
261    crate::output::emit_json(&DaemonResponse::Listening {
262        pid: std::process::id(),
263        socket,
264        idle_shutdown_secs,
265    })?;
266
267    let handled_embed_requests = Arc::new(AtomicU64::new(0));
268    let mut last_activity = Instant::now();
269    let models_dir = models_dir.to_path_buf();
270    // Bound concurrent spawn_blocking tasks to the same thread count as the runtime.
271    let permit_pool = Arc::new(tokio::sync::Semaphore::new(permits));
272
273    loop {
274        if shutdown_requested() {
275            break;
276        }
277
278        if !daemon_control_dir(&models_dir).exists() {
279            tracing::info!("daemon control directory disappeared; shutting down");
280            break;
281        }
282
283        match listener.accept() {
284            Ok(stream) => {
285                last_activity = Instant::now();
286                let models_dir_clone = models_dir.clone();
287                let counter = Arc::clone(&handled_embed_requests);
288                let permit =
289                    permit_pool.clone().acquire_owned().await.map_err(|e| {
290                        AppError::Internal(anyhow::anyhow!("semaphore closed: {e}"))
291                    })?;
292                let should_exit = tokio::task::spawn_blocking(move || {
293                    let _permit = permit; // hold until end of scope
294                    handle_client(stream, &models_dir_clone, &counter)
295                })
296                .await
297                .map_err(|e| {
298                    AppError::Internal(anyhow::anyhow!("spawn_blocking panicked: {e}"))
299                })??;
300
301                if should_exit {
302                    break;
303                }
304            }
305            Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => {
306                if last_activity.elapsed() >= Duration::from_secs(idle_shutdown_secs) {
307                    tracing::info!(
308                        idle_shutdown_secs,
309                        handled_embed_requests = handled_embed_requests.load(Ordering::Relaxed),
310                        "daemon idle timeout reached"
311                    );
312                    break;
313                }
314                tokio::time::sleep(Duration::from_millis(50)).await;
315            }
316            Err(err) => return Err(AppError::Io(err)),
317        }
318    }
319
320    Ok(())
321}
322
323fn handle_client(
324    stream: LocalSocketStream,
325    models_dir: &Path,
326    handled_embed_requests: &AtomicU64,
327) -> Result<bool, AppError> {
328    let mut reader = BufReader::new(stream);
329    let mut line = String::new();
330    reader.read_line(&mut line).map_err(AppError::Io)?;
331
332    if line.trim().is_empty() {
333        write_response(
334            reader.get_mut(),
335            &DaemonResponse::Error {
336                message: "empty request to daemon".to_string(),
337            },
338        )?;
339        return Ok(false);
340    }
341
342    let request: DaemonRequest = serde_json::from_str(line.trim()).map_err(AppError::Json)?;
343    let (response, should_exit) = match request {
344        DaemonRequest::Ping => (
345            DaemonResponse::Ok {
346                pid: std::process::id(),
347                version: SQLITE_GRAPHRAG_VERSION.to_string(),
348                handled_embed_requests: handled_embed_requests.load(Ordering::Relaxed),
349                model_name: crate::constants::FASTEMBED_MODEL_DEFAULT.to_string(),
350                model_variant: gliner_variant_from_env(),
351            },
352            false,
353        ),
354        DaemonRequest::Shutdown => (
355            DaemonResponse::ShuttingDown {
356                handled_embed_requests: handled_embed_requests.load(Ordering::Relaxed),
357            },
358            true,
359        ),
360        DaemonRequest::EmbedPassage { text } => {
361            let embedder = embedder::get_embedder(models_dir)?;
362            let embedding = embedder::embed_passage(embedder, &text)?;
363            let count = handled_embed_requests.fetch_add(1, Ordering::Relaxed) + 1;
364            (
365                DaemonResponse::PassageEmbedding {
366                    embedding,
367                    handled_embed_requests: count,
368                },
369                false,
370            )
371        }
372        DaemonRequest::EmbedQuery { text } => {
373            let embedder = embedder::get_embedder(models_dir)?;
374            let embedding = embedder::embed_query(embedder, &text)?;
375            let count = handled_embed_requests.fetch_add(1, Ordering::Relaxed) + 1;
376            (
377                DaemonResponse::QueryEmbedding {
378                    embedding,
379                    handled_embed_requests: count,
380                },
381                false,
382            )
383        }
384        DaemonRequest::EmbedPassages {
385            texts,
386            token_counts,
387        } => {
388            let embedder = embedder::get_embedder(models_dir)?;
389            let text_refs: Vec<&str> = texts.iter().map(String::as_str).collect();
390            let embeddings =
391                embedder::embed_passages_controlled(embedder, &text_refs, &token_counts)?;
392            let count = handled_embed_requests.fetch_add(1, Ordering::Relaxed) + 1;
393            (
394                DaemonResponse::PassageEmbeddings {
395                    embeddings,
396                    handled_embed_requests: count,
397                },
398                false,
399            )
400        }
401    };
402
403    write_response(reader.get_mut(), &response)?;
404    Ok(should_exit)
405}
406
407fn write_response(
408    stream: &mut LocalSocketStream,
409    response: &DaemonResponse,
410) -> Result<(), AppError> {
411    serde_json::to_writer(&mut *stream, response).map_err(AppError::Json)?;
412    stream.write_all(b"\n").map_err(AppError::Io)?;
413    stream.flush().map_err(AppError::Io)?;
414    Ok(())
415}
416
417fn request_if_available(
418    models_dir: &Path,
419    request: &DaemonRequest,
420) -> Result<Option<DaemonResponse>, AppError> {
421    let socket = daemon_label(models_dir);
422    let name = match to_local_socket_name(&socket) {
423        Ok(name) => name,
424        Err(err) => return Err(AppError::Io(err)),
425    };
426
427    let mut stream = match LocalSocketStream::connect(name) {
428        Ok(stream) => stream,
429        Err(err)
430            if matches!(
431                err.kind(),
432                std::io::ErrorKind::NotFound
433                    | std::io::ErrorKind::ConnectionRefused
434                    | std::io::ErrorKind::AddrNotAvailable
435                    | std::io::ErrorKind::TimedOut
436            ) =>
437        {
438            return Ok(None);
439        }
440        Err(err) => return Err(AppError::Io(err)),
441    };
442
443    serde_json::to_writer(&mut stream, request).map_err(AppError::Json)?;
444    stream.write_all(b"\n").map_err(AppError::Io)?;
445    stream.flush().map_err(AppError::Io)?;
446
447    let mut reader = BufReader::new(stream);
448    let mut line = String::new();
449    reader.read_line(&mut line).map_err(AppError::Io)?;
450    if line.trim().is_empty() {
451        return Err(AppError::Embedding(
452            "daemon returned an empty response".into(),
453        ));
454    }
455
456    let response = serde_json::from_str(line.trim()).map_err(AppError::Json)?;
457    Ok(Some(response))
458}
459
460fn should_autostart(cli_flag: bool) -> bool {
461    if !cli_flag {
462        return false; // explicit CLI override wins
463    }
464    !autostart_disabled_by_env()
465}
466
467/// Checks whether a running daemon has a different version from the current CLI binary.
468/// If a mismatch is detected, shuts down the stale daemon, waits for it to exit, and
469/// re-spawns a fresh one. The `VERSION_RESTART_ATTEMPTED` state prevents infinite loops:
470/// this function is a no-op after the first attempt regardless of outcome.
471fn maybe_restart_for_version_mismatch(models_dir: &Path) -> Result<(), AppError> {
472    if DAEMON_VERSION_STATE
473        .compare_exchange(
474            VERSION_NOT_CHECKED,
475            VERSION_COMPATIBLE,
476            Ordering::SeqCst,
477            Ordering::SeqCst,
478        )
479        .is_err()
480    {
481        // Already checked (compatible) or already attempted a restart — skip.
482        return Ok(());
483    }
484
485    let response = match try_ping(models_dir)? {
486        Some(r) => r,
487        None => return Ok(()), // no daemon running, nothing to check
488    };
489
490    let daemon_version = match &response {
491        DaemonResponse::Ok { version, .. } => version.as_str(),
492        _ => return Ok(()), // unexpected response shape, skip
493    };
494
495    if daemon_version == SQLITE_GRAPHRAG_VERSION {
496        return Ok(()); // versions match, state already set to COMPATIBLE
497    }
498
499    // Mismatch detected — mark as restart-attempted so we never loop.
500    DAEMON_VERSION_STATE.store(VERSION_RESTART_ATTEMPTED, Ordering::SeqCst);
501
502    tracing::warn!(
503        daemon_version = %daemon_version,
504        cli_version = SQLITE_GRAPHRAG_VERSION,
505        "daemon version mismatch detected; auto-restarting daemon"
506    );
507
508    // Send shutdown request.
509    try_shutdown(models_dir)?;
510
511    // Wait for the stale daemon to exit.
512    wait_for_daemon_exit(models_dir)?;
513
514    // Re-spawn the daemon via the existing mechanism.
515    ensure_daemon_running(models_dir)?;
516
517    Ok(())
518}
519
520/// Polls until the daemon stops responding to pings, with exponential backoff.
521/// Starts at 50 ms, doubles each iteration, caps at 500 ms per sleep.
522/// Returns `Ok(())` once the daemon is gone or the timeout is reached.
523fn wait_for_daemon_exit(models_dir: &Path) -> Result<(), AppError> {
524    let deadline = Instant::now() + Duration::from_millis(DAEMON_VERSION_RESTART_WAIT_MS);
525    let mut sleep_ms: u64 = 50;
526
527    while Instant::now() < deadline {
528        if try_ping(models_dir)?.is_none() {
529            tracing::debug!("stale daemon exited after version-mismatch shutdown");
530            return Ok(());
531        }
532        thread::sleep(Duration::from_millis(sleep_ms));
533        sleep_ms = (sleep_ms * 2).min(500);
534    }
535
536    tracing::warn!(
537        timeout_ms = DAEMON_VERSION_RESTART_WAIT_MS,
538        "timed out waiting for stale daemon to exit after version-mismatch shutdown"
539    );
540    Ok(())
541}
542
543fn request_or_autostart(
544    models_dir: &Path,
545    request: &DaemonRequest,
546    cli_autostart: bool,
547) -> Result<Option<DaemonResponse>, AppError> {
548    if DAEMON_VERSION_STATE.load(Ordering::SeqCst) == VERSION_NOT_CHECKED {
549        maybe_restart_for_version_mismatch(models_dir)?;
550    }
551
552    if let Some(response) = request_if_available(models_dir, request)? {
553        clear_spawn_backoff_state(models_dir).ok();
554        return Ok(Some(response));
555    }
556
557    if !should_autostart(cli_autostart) {
558        return Ok(None);
559    }
560
561    if !ensure_daemon_running(models_dir)? {
562        return Ok(None);
563    }
564
565    request_if_available(models_dir, request)
566}
567
568fn ensure_daemon_running(models_dir: &Path) -> Result<bool, AppError> {
569    if (try_ping(models_dir)?).is_some() {
570        clear_spawn_backoff_state(models_dir).ok();
571        return Ok(true);
572    }
573
574    if spawn_backoff_active(models_dir)? {
575        tracing::warn!("daemon autostart suppressed by backoff window");
576        return Ok(false);
577    }
578
579    let spawn_lock = match try_acquire_spawn_lock(models_dir)? {
580        Some(lock) => lock,
581        None => return wait_for_daemon_ready(models_dir),
582    };
583
584    if (try_ping(models_dir)?).is_some() {
585        clear_spawn_backoff_state(models_dir).ok();
586        drop(spawn_lock);
587        return Ok(true);
588    }
589
590    let exe = match std::env::current_exe() {
591        Ok(path) => path,
592        Err(err) => {
593            record_spawn_failure(models_dir, format!("current_exe failed: {err}"))?;
594            drop(spawn_lock);
595            return Ok(false);
596        }
597    };
598
599    let mut child = std::process::Command::new(exe);
600    child
601        .arg("daemon")
602        .arg("--idle-shutdown-secs")
603        .arg(DAEMON_IDLE_SHUTDOWN_SECS.to_string())
604        .env("SQLITE_GRAPHRAG_DAEMON_CHILD", "1")
605        .env_remove("LD_PRELOAD")
606        .env_remove("LD_LIBRARY_PATH")
607        .env_remove("LD_AUDIT")
608        .env_remove("DYLD_INSERT_LIBRARIES")
609        .env_remove("DYLD_LIBRARY_PATH")
610        .stdin(Stdio::null())
611        .stdout(Stdio::null())
612        .stderr(Stdio::null());
613
614    match child.spawn() {
615        Ok(child_handle) => {
616            // SAFETY: deliberate orphan daemon detach. The Child handle is intentionally
617            // dropped without a corresponding `.wait()` call because the daemon owns its
618            // own lifecycle: `Stdio::null()` is set on stdin/stdout/stderr (above) so the
619            // child does not inherit terminal handles, the spawn lock file at
620            // `<models_dir>/.daemon.spawn.lock` prevents concurrent spawns, and the
621            // daemon shuts itself down via `DAEMON_IDLE_SHUTDOWN_SECS` (or an explicit
622            // `daemon stop`/SIGTERM). Keeping the handle here would block the parent
623            // CLI in the foreground until the daemon exited, defeating the autostart
624            // contract that callers expect.
625            // See: docs_rules/rules_rust_processos_externos.md section "Child detach justificado"
626            //      AND docs/adr/0001-daemon-warmup-exception.md (authorized exception to no-daemon rule)
627            let pid = child_handle.id();
628            drop(child_handle);
629            tracing::debug!(
630                pid,
631                "daemon detached; lifecycle managed via spawn lock + readiness file"
632            );
633            let ready = wait_for_daemon_ready(models_dir)?;
634            if ready {
635                clear_spawn_backoff_state(models_dir).ok();
636            } else {
637                record_spawn_failure(
638                    models_dir,
639                    "daemon did not become healthy after autostart".to_string(),
640                )?;
641            }
642            drop(spawn_lock);
643            Ok(ready)
644        }
645        Err(err) => {
646            record_spawn_failure(models_dir, format!("daemon spawn failed: {err}"))?;
647            drop(spawn_lock);
648            Ok(false)
649        }
650    }
651}
652
653fn wait_for_daemon_ready(models_dir: &Path) -> Result<bool, AppError> {
654    let deadline = Instant::now() + Duration::from_millis(DAEMON_AUTO_START_MAX_WAIT_MS);
655    let mut sleep_ms = DAEMON_AUTO_START_INITIAL_BACKOFF_MS.max(DAEMON_PING_TIMEOUT_MS);
656
657    while Instant::now() < deadline {
658        if (try_ping(models_dir)?).is_some() {
659            return Ok(true);
660        }
661        thread::sleep(Duration::from_millis(sleep_ms));
662        sleep_ms = (sleep_ms * 2).min(DAEMON_AUTO_START_MAX_BACKOFF_MS);
663    }
664
665    Ok(false)
666}
667
/// Reports whether the environment forbids auto-starting the daemon.
///
/// Autostart is disabled when `SQLITE_GRAPHRAG_DAEMON_CHILD` is `"1"`, or
/// when `SQLITE_GRAPHRAG_DAEMON_DISABLE_AUTOSTART` is `"1"` and
/// `SQLITE_GRAPHRAG_DAEMON_FORCE_AUTOSTART` is not `"1"`.
fn autostart_disabled_by_env() -> bool {
    let flag_set = |key: &str| std::env::var(key).as_deref() == Ok("1");

    if flag_set("SQLITE_GRAPHRAG_DAEMON_CHILD") {
        return true;
    }
    !flag_set("SQLITE_GRAPHRAG_DAEMON_FORCE_AUTOSTART")
        && flag_set("SQLITE_GRAPHRAG_DAEMON_DISABLE_AUTOSTART")
}
673
/// Returns the directory that holds the daemon's control files (spawn lock
/// and spawn state): the parent of `models_dir`.
///
/// Edge cases:
/// - a single-component relative path such as `models` has an empty parent;
///   it is mapped to `.` so that existence checks on the result succeed
///   instead of always failing on the empty path;
/// - a path without a parent (for example `/`) is returned unchanged.
fn daemon_control_dir(models_dir: &Path) -> PathBuf {
    match models_dir.parent() {
        Some(parent) if parent.as_os_str().is_empty() => PathBuf::from("."),
        Some(parent) => parent.to_path_buf(),
        None => models_dir.to_path_buf(),
    }
}
680
681fn spawn_lock_path(models_dir: &Path) -> PathBuf {
682    daemon_control_dir(models_dir).join("daemon-spawn.lock")
683}
684
685fn spawn_state_path(models_dir: &Path) -> PathBuf {
686    daemon_control_dir(models_dir).join("daemon-spawn-state.json")
687}
688
689fn try_acquire_spawn_lock(models_dir: &Path) -> Result<Option<File>, AppError> {
690    let path = spawn_lock_path(models_dir);
691    std::fs::create_dir_all(crate::paths::parent_or_err(&path)?).map_err(AppError::Io)?;
692    let file = OpenOptions::new()
693        .read(true)
694        .write(true)
695        .create(true)
696        .truncate(false)
697        .open(path)
698        .map_err(AppError::Io)?;
699
700    let deadline = Instant::now() + Duration::from_millis(DAEMON_SPAWN_LOCK_WAIT_MS);
701    loop {
702        match file.try_lock_exclusive() {
703            Ok(()) => return Ok(Some(file)),
704            Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => {
705                if Instant::now() >= deadline {
706                    return Ok(None);
707                }
708                thread::sleep(Duration::from_millis(50));
709            }
710            Err(err) => return Err(AppError::Io(err)),
711        }
712    }
713}
714
715fn spawn_backoff_active(models_dir: &Path) -> Result<bool, AppError> {
716    let state = load_spawn_state(models_dir)?;
717    Ok(now_epoch_ms() < state.not_before_epoch_ms)
718}
719
720fn record_spawn_failure(models_dir: &Path, message: String) -> Result<(), AppError> {
721    let mut state = load_spawn_state(models_dir)?;
722    state.consecutive_failures = state.consecutive_failures.saturating_add(1);
723    let exponent = state.consecutive_failures.saturating_sub(1).min(6);
724    let base_ms =
725        (DAEMON_SPAWN_BACKOFF_BASE_MS * (1_u64 << exponent)).min(DAEMON_AUTO_START_MAX_BACKOFF_MS);
726    // v1.0.36 (L2) + v1.0.43 (H7): half-jitter via fastrand (replaces SystemTime nanoseconds
727    // which violated rules_rust_retry_com_backoff.md). Effective backoff range: [base/2, base).
728    let half = base_ms / 2;
729    let jitter = if half == 0 { 0 } else { fastrand::u64(0..half) };
730    let backoff_ms = half + jitter;
731    state.not_before_epoch_ms = now_epoch_ms() + backoff_ms;
732    state.last_error = Some(message);
733    save_spawn_state(models_dir, &state)
734}
735
736fn clear_spawn_backoff_state(models_dir: &Path) -> Result<(), AppError> {
737    let path = spawn_state_path(models_dir);
738    if path.exists() {
739        std::fs::remove_file(path).map_err(AppError::Io)?;
740    }
741    Ok(())
742}
743
744fn load_spawn_state(models_dir: &Path) -> Result<DaemonSpawnState, AppError> {
745    let path = spawn_state_path(models_dir);
746    if !path.exists() {
747        return Ok(DaemonSpawnState::default());
748    }
749
750    let bytes = std::fs::read(path).map_err(AppError::Io)?;
751    serde_json::from_slice(&bytes).map_err(AppError::Json)
752}
753
754fn save_spawn_state(models_dir: &Path, state: &DaemonSpawnState) -> Result<(), AppError> {
755    let path = spawn_state_path(models_dir);
756    std::fs::create_dir_all(crate::paths::parent_or_err(&path)?).map_err(AppError::Io)?;
757    let bytes = serde_json::to_vec(state).map_err(AppError::Json)?;
758    std::fs::write(path, bytes).map_err(AppError::Io)
759}
760
/// Reads the GLiNER model variant from `SQLITE_GRAPHRAG_GLINER_VARIANT`.
///
/// Falls back to `"fp32"` when the variable is unset or not valid Unicode.
fn gliner_variant_from_env() -> String {
    match std::env::var("SQLITE_GRAPHRAG_GLINER_VARIANT") {
        Ok(variant) => variant,
        Err(_) => String::from("fp32"),
    }
}
766
/// Current wall-clock time as milliseconds since the Unix epoch.
///
/// Yields 0 when the system clock is set before the epoch.
fn now_epoch_ms() -> u64 {
    let since_epoch = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed,
        Err(_) => Duration::from_secs(0),
    };
    since_epoch.as_millis() as u64
}
773
774fn to_local_socket_name(name: &str) -> std::io::Result<interprocess::local_socket::Name<'static>> {
775    if let Ok(ns_name) = name.to_string().to_ns_name::<GenericNamespaced>() {
776        return Ok(ns_name);
777    }
778
779    // Fallback when abstract namespaces are unavailable. Honours XDG_RUNTIME_DIR
780    // (Linux user-private runtime dir) or SQLITE_GRAPHRAG_HOME (project override)
781    // before falling back to /tmp, which can collide when the same name is used
782    // by another user/project on a multi-tenant host. Added in v1.0.35.
783    let path = if cfg!(unix) {
784        let base = std::env::var_os("XDG_RUNTIME_DIR")
785            .or_else(|| std::env::var_os("SQLITE_GRAPHRAG_HOME"))
786            .map(std::path::PathBuf::from)
787            .unwrap_or_else(std::env::temp_dir);
788        base.join(format!("{name}.sock"))
789            .to_string_lossy()
790            .into_owned()
791    } else {
792        format!(r"\\.\pipe\{name}")
793    };
794    path.to_fs_name::<GenericFilePath>()
795}
796
#[cfg(test)]
mod tests {
    use super::*;

    /// Name of the environment variable that the `gliner_variant_*` tests set and unset.
    const GLINER_VARIANT_ENV: &str = "SQLITE_GRAPHRAG_GLINER_VARIANT";

    /// Serializes the tests in this module that mutate `GLINER_VARIANT_ENV`.
    ///
    /// The test harness runs `#[test]` functions on parallel threads by default and the
    /// process environment is shared between them, so without this lock one test can
    /// observe the value written (or removed) by the other.
    static GLINER_ENV_LOCK: std::sync::Mutex<()> = std::sync::Mutex::new(());

    /// Creates a fresh temporary `<tmp>/cache/models` directory.
    ///
    /// The returned `TempDir` must be kept alive by the caller for as long as the path is
    /// used; dropping it removes the directory tree.
    fn temp_models_dir() -> (tempfile::TempDir, PathBuf) {
        let tmp = tempfile::tempdir().unwrap();
        let models_dir = tmp.path().join("cache").join("models");
        std::fs::create_dir_all(&models_dir).unwrap();
        (tmp, models_dir)
    }

    /// Runs `body` with `GLINER_VARIANT_ENV` set to `value` (or unset when `None`).
    ///
    /// Holds `GLINER_ENV_LOCK` for the whole call and puts the previous value back before
    /// returning, so callers should assert on the returned value *after* this function
    /// returns; that way the environment is restored even when the assertion fails.
    fn with_gliner_variant_env<T>(value: Option<&str>, body: impl FnOnce() -> T) -> T {
        // A poisoned lock only means another env test panicked; the guard is still usable.
        let _guard = GLINER_ENV_LOCK
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner);

        let previous = std::env::var_os(GLINER_VARIANT_ENV);
        match value {
            Some(v) => std::env::set_var(GLINER_VARIANT_ENV, v),
            None => std::env::remove_var(GLINER_VARIANT_ENV),
        }

        let outcome = body();

        match previous {
            Some(v) => std::env::set_var(GLINER_VARIANT_ENV, v),
            None => std::env::remove_var(GLINER_VARIANT_ENV),
        }
        outcome
    }

    /// Recording a spawn failure activates the backoff; clearing the state deactivates it.
    #[test]
    fn record_and_clear_spawn_backoff_state() {
        let (_tmp, models_dir) = temp_models_dir();

        assert!(!spawn_backoff_active(&models_dir).unwrap());

        record_spawn_failure(&models_dir, "spawn failed".to_string()).unwrap();
        assert!(spawn_backoff_active(&models_dir).unwrap());

        let state = load_spawn_state(&models_dir).unwrap();
        assert_eq!(state.consecutive_failures, 1);
        assert_eq!(state.last_error.as_deref(), Some("spawn failed"));

        clear_spawn_backoff_state(&models_dir).unwrap();
        assert!(!spawn_backoff_active(&models_dir).unwrap());
    }

    /// The daemon control directory is the parent of the models directory.
    #[test]
    fn daemon_control_dir_uses_models_parent() {
        let base = PathBuf::from("/tmp/sqlite-graphrag-cache-test");
        let models_dir = base.join("models");
        assert_eq!(daemon_control_dir(&models_dir), base);
    }

    /// The three version-state constants must never collide.
    #[test]
    fn version_state_constants_are_distinct() {
        assert_ne!(VERSION_NOT_CHECKED, VERSION_COMPATIBLE);
        assert_ne!(VERSION_NOT_CHECKED, VERSION_RESTART_ATTEMPTED);
        assert_ne!(VERSION_COMPATIBLE, VERSION_RESTART_ATTEMPTED);
    }

    /// With no daemon listening, `wait_for_daemon_exit` returns well under 500 ms.
    #[test]
    fn wait_for_daemon_exit_immediate_when_not_running() {
        let (_tmp, models_dir) = temp_models_dir();

        let start = Instant::now();
        wait_for_daemon_exit(&models_dir).unwrap();
        // Without a daemon, the first ping returns None and the function exits immediately.
        assert!(start.elapsed() < Duration::from_millis(500));
    }

    /// Ten consecutive failures must not push the backoff deadline past the capped maximum.
    #[test]
    fn spawn_backoff_exponent_caps_at_six() {
        let (_tmp, models_dir) = temp_models_dir();

        // Record 10 consecutive failures to force exponent saturation.
        for i in 0..10 {
            record_spawn_failure(&models_dir, format!("failure {i}")).unwrap();
        }

        let state = load_spawn_state(&models_dir).unwrap();
        assert_eq!(state.consecutive_failures, 10);

        // Exponent is clamped at 6, so max base_ms is base * 2^6.
        // Effective backoff range is [base/2, base), where base <= base_ms * 64.
        let max_base =
            (DAEMON_SPAWN_BACKOFF_BASE_MS * (1_u64 << 6)).min(DAEMON_AUTO_START_MAX_BACKOFF_MS);
        // `now` is sampled after the failures were recorded, so the stored deadline
        // must not exceed now + max_base (upper bound with jitter < half).
        let now = now_epoch_ms();
        assert!(state.not_before_epoch_ms <= now + max_base);
    }

    /// The half-jitter formula `half + rand(0..half)` always lands in `[base/2, base)`.
    #[test]
    fn spawn_backoff_half_jitter_in_range() {
        // Verify the half-jitter formula: result = half + fastrand::u64(0..half)
        // produces values in [half, half + half) == [base/2, base).
        let base_ms: u64 = 100;
        let half = base_ms / 2;
        for _ in 0..100 {
            let jitter = fastrand::u64(0..half);
            let result = half + jitter;
            assert!(result >= half, "result {result} below half {half}");
            assert!(result < base_ms, "result {result} not below base {base_ms}");
        }
    }

    /// A plain daemon name converts into a local-socket name without error.
    #[test]
    fn to_local_socket_name_produces_valid_result() {
        let result = to_local_socket_name("sqlite-graphrag-test-daemon");
        assert!(result.is_ok(), "expected Ok, got {result:?}");
        // The name string representation must be non-empty.
        let name = result.unwrap();
        let display = format!("{name:?}");
        assert!(!display.is_empty());
    }

    /// A compare-exchange from NOT_CHECKED to COMPATIBLE succeeds on a fresh state.
    #[test]
    fn version_cas_not_checked_to_compatible() {
        let state = AtomicU8::new(VERSION_NOT_CHECKED);
        let result = state.compare_exchange(
            VERSION_NOT_CHECKED,
            VERSION_COMPATIBLE,
            Ordering::SeqCst,
            Ordering::SeqCst,
        );
        assert!(result.is_ok());
        assert_eq!(state.load(Ordering::SeqCst), VERSION_COMPATIBLE);
    }

    /// Only the first NOT_CHECKED -> RESTART_ATTEMPTED transition may succeed.
    #[test]
    fn version_cas_prevents_double_restart() {
        let state = AtomicU8::new(VERSION_NOT_CHECKED);

        // First CAS: NOT_CHECKED → RESTART_ATTEMPTED succeeds.
        let first = state.compare_exchange(
            VERSION_NOT_CHECKED,
            VERSION_RESTART_ATTEMPTED,
            Ordering::SeqCst,
            Ordering::SeqCst,
        );
        assert!(first.is_ok());

        // Second CAS from NOT_CHECKED must fail — state is already RESTART_ATTEMPTED.
        let second = state.compare_exchange(
            VERSION_NOT_CHECKED,
            VERSION_RESTART_ATTEMPTED,
            Ordering::SeqCst,
            Ordering::SeqCst,
        );
        assert!(second.is_err());
        assert_eq!(state.load(Ordering::SeqCst), VERSION_RESTART_ATTEMPTED);
    }

    /// The serialized `Ok` response carries the status tag, counters and model metadata.
    #[test]
    fn ping_response_includes_model_fields() {
        let resp = DaemonResponse::Ok {
            pid: 42,
            version: "1.0.0".to_string(),
            handled_embed_requests: 7,
            model_name: "multilingual-e5-small".to_string(),
            model_variant: "fp32".to_string(),
        };
        let json = serde_json::to_value(&resp).expect("serialization failed");
        assert_eq!(json["model_name"], "multilingual-e5-small");
        assert_eq!(json["model_variant"], "fp32");
        assert_eq!(json["status"], "ok");
        assert_eq!(json["handled_embed_requests"], 7u64);
    }

    /// With the environment variable unset, the variant falls back to `fp32`.
    #[test]
    fn gliner_variant_defaults_to_fp32() {
        let variant = with_gliner_variant_env(None, gliner_variant_from_env);
        assert_eq!(variant, "fp32");
    }

    /// A value placed in the environment variable is returned as the variant.
    #[test]
    fn gliner_variant_reads_env_var() {
        let variant = with_gliner_variant_env(Some("int8"), gliner_variant_from_env);
        assert_eq!(variant, "int8");
    }

    /// A spawn state written to disk is read back with every field intact.
    #[test]
    fn spawn_state_serialization_roundtrip() {
        let (_tmp, models_dir) = temp_models_dir();

        let original = DaemonSpawnState {
            consecutive_failures: 3,
            not_before_epoch_ms: 9_999_999_999,
            last_error: Some("test error message".to_string()),
        };
        save_spawn_state(&models_dir, &original).unwrap();

        let loaded = load_spawn_state(&models_dir).unwrap();
        assert_eq!(loaded.consecutive_failures, original.consecutive_failures);
        assert_eq!(loaded.not_before_epoch_ms, original.not_before_epoch_ms);
        assert_eq!(loaded.last_error, original.last_error);
    }
}