Skip to main content

sqlite_graphrag/
daemon.rs

1//! IPC daemon: keeps the embedding model warm across CLI invocations.
2//!
3//! Manages the background process lifecycle, Unix-socket IPC protocol, and
4//! auto-start/backoff logic so embeddings are served without cold-start cost.
5
6use crate::constants::{
7    DAEMON_AUTO_START_INITIAL_BACKOFF_MS, DAEMON_AUTO_START_MAX_BACKOFF_MS,
8    DAEMON_AUTO_START_MAX_WAIT_MS, DAEMON_IDLE_SHUTDOWN_SECS, DAEMON_PING_TIMEOUT_MS,
9    DAEMON_SPAWN_BACKOFF_BASE_MS, DAEMON_SPAWN_LOCK_WAIT_MS, DAEMON_VERSION_RESTART_WAIT_MS,
10    SQLITE_GRAPHRAG_VERSION,
11};
12use crate::errors::AppError;
13use crate::{embedder, shutdown_requested};
14use fs4::fs_std::FileExt;
15use interprocess::local_socket::{
16    prelude::LocalSocketStream,
17    traits::{Listener as _, Stream as _},
18    GenericFilePath, GenericNamespaced, ListenerNonblockingMode, ListenerOptions, ToFsName,
19    ToNsName,
20};
21use serde::{Deserialize, Serialize};
22use std::fs::{File, OpenOptions};
23use std::io::{BufRead, BufReader, Write};
24use std::path::{Path, PathBuf};
25use std::process::Stdio;
26use std::sync::atomic::{AtomicU64, AtomicU8, Ordering};
27use std::sync::Arc;
28use std::thread;
29use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
30
/// No daemon version check has run yet in this process.
const VERSION_NOT_CHECKED: u8 = 0;
/// The check ran and found no daemon, a daemon with the same version, or an
/// unrecognised ping reply; nothing further will be attempted.
const VERSION_COMPATIBLE: u8 = VERSION_NOT_CHECKED + 1;
/// A version mismatch was detected and a restart was attempted; never retried.
const VERSION_RESTART_ATTEMPTED: u8 = VERSION_COMPATIBLE + 1;

/// Per-process state of the daemon version check (one of the `VERSION_*`
/// values). It ensures the check — and any restart it triggers — runs at most
/// once per process, which prevents restart loops.
static DAEMON_VERSION_STATE: AtomicU8 = AtomicU8::new(VERSION_NOT_CHECKED);
37
/// A request sent from a CLI process to the daemon over the local socket.
///
/// Wire format: one JSON object per line, internally tagged by the `"request"`
/// key with snake_case variant names, e.g. `{"request":"embed_query","text":"..."}`.
/// The variant and field names are part of the IPC protocol; renaming them
/// breaks compatibility between CLI and daemon builds.
#[derive(Debug, Serialize, Deserialize)]
#[serde(tag = "request", rename_all = "snake_case")]
pub enum DaemonRequest {
    /// Health/version probe; answered with `DaemonResponse::Ok`.
    Ping,
    /// Ask the daemon to exit after replying with `DaemonResponse::ShuttingDown`.
    Shutdown,
    /// Embed one passage; answered with `DaemonResponse::PassageEmbedding`.
    EmbedPassage {
        text: String,
    },
    /// Embed one query; answered with `DaemonResponse::QueryEmbedding`.
    EmbedQuery {
        text: String,
    },
    /// Embed a batch of passages; answered with `DaemonResponse::PassageEmbeddings`.
    EmbedPassages {
        texts: Vec<String>,
        // Forwarded unchanged to `embedder::embed_passages_controlled`;
        // presumably one entry per text — TODO confirm against the embedder.
        token_counts: Vec<usize>,
    },
}
54
/// A message emitted by the daemon.
///
/// Wire format: one JSON object per line, internally tagged by the `"status"`
/// key with snake_case variant names, e.g. `{"status":"query_embedding",...}`.
/// The variant and field names are part of the IPC protocol.
#[derive(Debug, Serialize, Deserialize)]
#[serde(tag = "status", rename_all = "snake_case")]
pub enum DaemonResponse {
    /// Emitted once via `crate::output::emit_json` after the socket is bound and
    /// the model is warm; this is not a reply to a socket request.
    Listening {
        pid: u32,
        socket: String,
        idle_shutdown_secs: u64,
    },
    /// Reply to `DaemonRequest::Ping`.
    Ok {
        pid: u32,
        /// Daemon build version; compared against the CLI version to detect stale daemons.
        version: String,
        handled_embed_requests: u64,
        model_name: String,
        /// Taken from `SQLITE_GRAPHRAG_GLINER_VARIANT` in the daemon's environment.
        model_variant: String,
    },
    /// Reply to `DaemonRequest::EmbedPassage`.
    PassageEmbedding {
        embedding: Vec<f32>,
        /// Embed requests served by this daemon so far, including this one.
        handled_embed_requests: u64,
    },
    /// Reply to `DaemonRequest::EmbedQuery`.
    QueryEmbedding {
        embedding: Vec<f32>,
        /// Embed requests served by this daemon so far, including this one.
        handled_embed_requests: u64,
    },
    /// Reply to `DaemonRequest::EmbedPassages`.
    PassageEmbeddings {
        embeddings: Vec<Vec<f32>>,
        /// Embed requests served by this daemon so far, including this one (a batch counts once).
        handled_embed_requests: u64,
    },
    /// Reply to `DaemonRequest::Shutdown`; the daemon stops accepting connections afterwards.
    ShuttingDown {
        handled_embed_requests: u64,
    },
    /// The request could not be served; clients surface `message` as `AppError::Embedding`.
    Error {
        message: String,
    },
}
84    },
85    Error {
86        message: String,
87    },
88}
89
90#[derive(Debug, Default, Serialize, Deserialize)]
91struct DaemonSpawnState {
92    consecutive_failures: u32,
93    not_before_epoch_ms: u64,
94    last_error: Option<String>,
95}
96
97pub fn daemon_label(models_dir: &Path) -> String {
98    let hash = blake3::hash(models_dir.to_string_lossy().as_bytes())
99        .to_hex()
100        .to_string();
101    format!("sqlite-graphrag-daemon-{}", &hash[..16])
102}
103
104pub fn try_ping(models_dir: &Path) -> Result<Option<DaemonResponse>, AppError> {
105    request_if_available(models_dir, &DaemonRequest::Ping)
106}
107
108pub fn try_shutdown(models_dir: &Path) -> Result<Option<DaemonResponse>, AppError> {
109    request_if_available(models_dir, &DaemonRequest::Shutdown)
110}
111
112pub fn embed_passage_or_local(models_dir: &Path, text: &str) -> Result<Vec<f32>, AppError> {
113    match request_or_autostart(
114        models_dir,
115        &DaemonRequest::EmbedPassage {
116            text: text.to_string(),
117        },
118        true,
119    )? {
120        Some(DaemonResponse::PassageEmbedding { embedding, .. }) => Ok(embedding),
121        Some(DaemonResponse::Error { message }) => Err(AppError::Embedding(message)),
122        Some(other) => Err(AppError::Internal(anyhow::anyhow!(
123            "unexpected daemon response for passage embedding: {other:?}"
124        ))),
125        None => {
126            let embedder = embedder::get_embedder(models_dir)?;
127            embedder::embed_passage(embedder, text)
128        }
129    }
130}
131
132pub fn embed_query_or_local(
133    models_dir: &Path,
134    text: &str,
135    cli_autostart: bool,
136) -> Result<Vec<f32>, AppError> {
137    match request_or_autostart(
138        models_dir,
139        &DaemonRequest::EmbedQuery {
140            text: text.to_string(),
141        },
142        cli_autostart,
143    )? {
144        Some(DaemonResponse::QueryEmbedding { embedding, .. }) => Ok(embedding),
145        Some(DaemonResponse::Error { message }) => Err(AppError::Embedding(message)),
146        Some(other) => Err(AppError::Internal(anyhow::anyhow!(
147            "unexpected daemon response for query embedding: {other:?}"
148        ))),
149        None => {
150            let embedder = embedder::get_embedder(models_dir)?;
151            embedder::embed_query(embedder, text)
152        }
153    }
154}
155
156pub fn embed_passages_controlled_or_local(
157    models_dir: &Path,
158    texts: &[&str],
159    token_counts: &[usize],
160) -> Result<Vec<Vec<f32>>, AppError> {
161    let request = DaemonRequest::EmbedPassages {
162        texts: texts.iter().map(|t| (*t).to_string()).collect(),
163        token_counts: token_counts.to_vec(),
164    };
165
166    match request_or_autostart(models_dir, &request, true)? {
167        Some(DaemonResponse::PassageEmbeddings { embeddings, .. }) => Ok(embeddings),
168        Some(DaemonResponse::Error { message }) => Err(AppError::Embedding(message)),
169        Some(other) => Err(AppError::Internal(anyhow::anyhow!(
170            "unexpected daemon response for passage embedding batch: {other:?}"
171        ))),
172        None => {
173            let embedder = embedder::get_embedder(models_dir)?;
174            embedder::embed_passages_controlled(embedder, texts, token_counts)
175        }
176    }
177}
178
179struct DaemonSpawnGuard {
180    models_dir: PathBuf,
181}
182
183impl DaemonSpawnGuard {
184    fn new(models_dir: &Path) -> Self {
185        Self {
186            models_dir: models_dir.to_path_buf(),
187        }
188    }
189}
190
191impl Drop for DaemonSpawnGuard {
192    fn drop(&mut self) {
193        let lock_path = spawn_lock_path(&self.models_dir);
194        if lock_path.exists() {
195            match std::fs::remove_file(&lock_path) {
196                Ok(()) => {
197                    tracing::debug!(
198                        target: "daemon",
199                        path = %lock_path.display(),
200                        "spawn lock file removed during graceful daemon shutdown"
201                    );
202                }
203                Err(err) if err.kind() == std::io::ErrorKind::NotFound => {}
204                Err(err) => {
205                    tracing::warn!(
206                        target: "daemon",
207                        error = %err,
208                        path = %lock_path.display(),
209                        "failed to remove spawn lock file while shutting down daemon"
210                    );
211                }
212            }
213        }
214        let pid_path = pid_file_path(&self.models_dir);
215        let _ = std::fs::remove_file(&pid_path);
216
217        tracing::info!(
218            target: "daemon",
219            "daemon shut down gracefully; socket will be cleaned up by OS or by the next daemon via try_overwrite"
220        );
221    }
222}
223
224pub fn run(
225    models_dir: &Path,
226    idle_shutdown_secs: u64,
227    shutdown_timeout_secs: u64,
228) -> Result<(), AppError> {
229    // Scale worker threads to available parallelism so embedding tasks saturate CPU cores.
230    // Clamped to [2, 8] to avoid excessive threads on high-core machines.
231    let permits = std::thread::available_parallelism()
232        .map(|n| n.get())
233        .unwrap_or(2)
234        .clamp(2, 8);
235    let rt = tokio::runtime::Builder::new_multi_thread()
236        .worker_threads(permits)
237        .thread_name("daemon-worker")
238        .enable_all()
239        .build()
240        .map_err(AppError::Io)?;
241
242    let result = rt.block_on(run_async(models_dir, idle_shutdown_secs, permits));
243    rt.shutdown_timeout(std::time::Duration::from_secs(shutdown_timeout_secs));
244    result
245}
246
247#[tracing::instrument(skip_all, fields(idle_secs = idle_shutdown_secs, permits))]
248async fn run_async(
249    models_dir: &Path,
250    idle_shutdown_secs: u64,
251    permits: usize,
252) -> Result<(), AppError> {
253    let socket = daemon_label(models_dir);
254    let name = to_local_socket_name(&socket)?;
255    let listener = ListenerOptions::new()
256        .name(name)
257        .nonblocking(ListenerNonblockingMode::Accept)
258        .try_overwrite(true)
259        .create_sync()
260        .map_err(AppError::Io)?;
261
262    // Guard that cleans up the spawn lock file on graceful shutdown.
263    // SIGKILL does not trigger Drop; in that case try_overwrite(true) above is the fallback.
264    let _spawn_guard = DaemonSpawnGuard::new(models_dir);
265
266    // Warm the model once per daemon process inside spawn_blocking so the
267    // ONNX session initialisation (CPU-bound, may take several seconds) does
268    // not block a tokio worker thread.
269    let models_dir_warm = models_dir.to_path_buf();
270    tokio::task::spawn_blocking(move || embedder::get_embedder(&models_dir_warm).map(|_| ()))
271        .await
272        .map_err(|e| AppError::Internal(anyhow::anyhow!("model warm-up panicked: {e}")))??;
273
274    let pid_path = pid_file_path(models_dir);
275    let _ = std::fs::write(&pid_path, std::process::id().to_string());
276
277    crate::output::emit_json(&DaemonResponse::Listening {
278        pid: std::process::id(),
279        socket,
280        idle_shutdown_secs,
281    })?;
282
283    let handled_embed_requests = Arc::new(AtomicU64::new(0));
284    let mut last_activity = Instant::now();
285    let models_dir = models_dir.to_path_buf();
286    // Bound concurrent spawn_blocking tasks to the same thread count as the runtime.
287    let permit_pool = Arc::new(tokio::sync::Semaphore::new(permits));
288
289    let token = crate::cancel_token();
290    loop {
291        if shutdown_requested() || token.is_cancelled() {
292            break;
293        }
294
295        if !daemon_control_dir(&models_dir).exists() {
296            tracing::info!(target: "daemon", "daemon control directory disappeared; shutting down");
297            break;
298        }
299
300        match listener.accept() {
301            Ok(stream) => {
302                last_activity = Instant::now();
303                let models_dir_clone = models_dir.clone();
304                let counter = Arc::clone(&handled_embed_requests);
305                let permit =
306                    permit_pool.clone().acquire_owned().await.map_err(|e| {
307                        AppError::Internal(anyhow::anyhow!("semaphore closed: {e}"))
308                    })?;
309                let should_exit = tokio::task::spawn_blocking(move || {
310                    let _permit = permit; // hold until end of scope
311                    handle_client(stream, &models_dir_clone, &counter)
312                })
313                .await
314                .map_err(|e| {
315                    AppError::Internal(anyhow::anyhow!("spawn_blocking panicked: {e}"))
316                })??;
317
318                if should_exit {
319                    break;
320                }
321            }
322            Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => {
323                if last_activity.elapsed() >= Duration::from_secs(idle_shutdown_secs) {
324                    tracing::info!(
325                        target: "daemon",
326                        idle_shutdown_secs,
327                        handled_embed_requests = handled_embed_requests.load(Ordering::Relaxed),
328                        "daemon idle timeout reached"
329                    );
330                    break;
331                }
332                tokio::select! {
333                    () = tokio::time::sleep(Duration::from_millis(50)) => {}
334                    () = token.cancelled() => { break; }
335                }
336            }
337            Err(err) => return Err(AppError::Io(err)),
338        }
339    }
340
341    Ok(())
342}
343
344fn handle_client(
345    stream: LocalSocketStream,
346    models_dir: &Path,
347    handled_embed_requests: &AtomicU64,
348) -> Result<bool, AppError> {
349    let mut reader = BufReader::new(stream);
350    let mut line = String::new();
351    reader.read_line(&mut line).map_err(AppError::Io)?;
352
353    if line.trim().is_empty() {
354        write_response(
355            reader.get_mut(),
356            &DaemonResponse::Error {
357                message: "empty request to daemon".to_string(),
358            },
359        )?;
360        return Ok(false);
361    }
362
363    let request: DaemonRequest = serde_json::from_str(line.trim()).map_err(AppError::Json)?;
364    let (response, should_exit) = match request {
365        DaemonRequest::Ping => (
366            DaemonResponse::Ok {
367                pid: std::process::id(),
368                version: SQLITE_GRAPHRAG_VERSION.to_string(),
369                handled_embed_requests: handled_embed_requests.load(Ordering::Relaxed),
370                model_name: crate::constants::FASTEMBED_MODEL_DEFAULT.to_string(),
371                model_variant: gliner_variant_from_env(),
372            },
373            false,
374        ),
375        DaemonRequest::Shutdown => (
376            DaemonResponse::ShuttingDown {
377                handled_embed_requests: handled_embed_requests.load(Ordering::Relaxed),
378            },
379            true,
380        ),
381        DaemonRequest::EmbedPassage { text } => {
382            let embedder = embedder::get_embedder(models_dir)?;
383            let embedding = embedder::embed_passage(embedder, &text)?;
384            let count = handled_embed_requests.fetch_add(1, Ordering::Relaxed) + 1;
385            (
386                DaemonResponse::PassageEmbedding {
387                    embedding,
388                    handled_embed_requests: count,
389                },
390                false,
391            )
392        }
393        DaemonRequest::EmbedQuery { text } => {
394            let embedder = embedder::get_embedder(models_dir)?;
395            let embedding = embedder::embed_query(embedder, &text)?;
396            let count = handled_embed_requests.fetch_add(1, Ordering::Relaxed) + 1;
397            (
398                DaemonResponse::QueryEmbedding {
399                    embedding,
400                    handled_embed_requests: count,
401                },
402                false,
403            )
404        }
405        DaemonRequest::EmbedPassages {
406            texts,
407            token_counts,
408        } => {
409            let embedder = embedder::get_embedder(models_dir)?;
410            let text_refs: Vec<&str> = texts.iter().map(String::as_str).collect();
411            let embeddings =
412                embedder::embed_passages_controlled(embedder, &text_refs, &token_counts)?;
413            let count = handled_embed_requests.fetch_add(1, Ordering::Relaxed) + 1;
414            (
415                DaemonResponse::PassageEmbeddings {
416                    embeddings,
417                    handled_embed_requests: count,
418                },
419                false,
420            )
421        }
422    };
423
424    write_response(reader.get_mut(), &response)?;
425    Ok(should_exit)
426}
427
428fn write_response(
429    stream: &mut LocalSocketStream,
430    response: &DaemonResponse,
431) -> Result<(), AppError> {
432    serde_json::to_writer(&mut *stream, response).map_err(AppError::Json)?;
433    stream.write_all(b"\n").map_err(AppError::Io)?;
434    stream.flush().map_err(AppError::Io)?;
435    Ok(())
436}
437
438fn request_if_available(
439    models_dir: &Path,
440    request: &DaemonRequest,
441) -> Result<Option<DaemonResponse>, AppError> {
442    let socket = daemon_label(models_dir);
443    let name = match to_local_socket_name(&socket) {
444        Ok(name) => name,
445        Err(err) => return Err(AppError::Io(err)),
446    };
447
448    let mut stream = match LocalSocketStream::connect(name) {
449        Ok(stream) => stream,
450        Err(err)
451            if matches!(
452                err.kind(),
453                std::io::ErrorKind::NotFound
454                    | std::io::ErrorKind::ConnectionRefused
455                    | std::io::ErrorKind::AddrNotAvailable
456                    | std::io::ErrorKind::TimedOut
457            ) =>
458        {
459            return Ok(None);
460        }
461        Err(err) => return Err(AppError::Io(err)),
462    };
463
464    serde_json::to_writer(&mut stream, request).map_err(AppError::Json)?;
465    stream.write_all(b"\n").map_err(AppError::Io)?;
466    stream.flush().map_err(AppError::Io)?;
467
468    let mut reader = BufReader::new(stream);
469    let mut line = String::new();
470    reader.read_line(&mut line).map_err(AppError::Io)?;
471    if line.trim().is_empty() {
472        return Err(AppError::Embedding(
473            "daemon returned an empty response".into(),
474        ));
475    }
476
477    let response = serde_json::from_str(line.trim()).map_err(AppError::Json)?;
478    Ok(Some(response))
479}
480
481fn should_autostart(cli_flag: bool) -> bool {
482    if !cli_flag {
483        return false; // explicit CLI override wins
484    }
485    !autostart_disabled_by_env()
486}
487
488/// Checks whether a running daemon has a different version from the current CLI binary.
489/// If a mismatch is detected, shuts down the stale daemon, waits for it to exit, and
490/// re-spawns a fresh one. The `VERSION_RESTART_ATTEMPTED` state prevents infinite loops:
491/// this function is a no-op after the first attempt regardless of outcome.
492fn maybe_restart_for_version_mismatch(models_dir: &Path) -> Result<(), AppError> {
493    // ORDERING: Acquire on success synchronizes-with the Release store at line ~505.
494    // Relaxed on failure: no dependent memory is read on the CAS failure path.
495    if DAEMON_VERSION_STATE
496        .compare_exchange(
497            VERSION_NOT_CHECKED,
498            VERSION_COMPATIBLE,
499            Ordering::Acquire,
500            Ordering::Relaxed,
501        )
502        .is_err()
503    {
504        // Already checked (compatible) or already attempted a restart — skip.
505        return Ok(());
506    }
507
508    let response = match try_ping(models_dir)? {
509        Some(r) => r,
510        None => return Ok(()), // no daemon running, nothing to check
511    };
512
513    let daemon_version = match &response {
514        DaemonResponse::Ok { version, .. } => version.as_str(),
515        _ => return Ok(()), // unexpected response shape, skip
516    };
517
518    if daemon_version == SQLITE_GRAPHRAG_VERSION {
519        return Ok(()); // versions match, state already set to COMPATIBLE
520    }
521
522    // Mismatch detected — mark as restart-attempted so we never loop.
523    // ORDERING: Release pairs with the Acquire in compare_exchange and load.
524    DAEMON_VERSION_STATE.store(VERSION_RESTART_ATTEMPTED, Ordering::Release);
525
526    tracing::warn!(
527        target: "daemon",
528        daemon_version = %daemon_version,
529        cli_version = SQLITE_GRAPHRAG_VERSION,
530        "daemon version mismatch detected; auto-restarting daemon"
531    );
532
533    // Send shutdown request.
534    try_shutdown(models_dir)?;
535
536    // Wait for the stale daemon to exit.
537    wait_for_daemon_exit(models_dir)?;
538
539    // Re-spawn the daemon via the existing mechanism.
540    ensure_daemon_running(models_dir)?;
541
542    Ok(())
543}
544
545/// Polls until the daemon stops responding to pings, with exponential backoff.
546/// Starts at 50 ms, doubles each iteration, caps at 500 ms per sleep.
547/// Returns `Ok(())` once the daemon is gone or the timeout is reached.
548#[cold]
549#[inline(never)]
550fn wait_for_daemon_exit(models_dir: &Path) -> Result<(), AppError> {
551    let deadline = Instant::now() + Duration::from_millis(DAEMON_VERSION_RESTART_WAIT_MS);
552    let mut sleep_ms: u64 = 50;
553
554    while Instant::now() < deadline {
555        if try_ping(models_dir)?.is_none() {
556            tracing::debug!(target: "daemon", "stale daemon exited after version-mismatch shutdown");
557            return Ok(());
558        }
559        thread::sleep(Duration::from_millis(sleep_ms));
560        sleep_ms = (sleep_ms * 2).min(500);
561    }
562
563    tracing::warn!(
564        target: "daemon",
565        timeout_ms = DAEMON_VERSION_RESTART_WAIT_MS,
566        "timed out waiting for stale daemon to exit after version-mismatch shutdown"
567    );
568    Ok(())
569}
570
571fn request_or_autostart(
572    models_dir: &Path,
573    request: &DaemonRequest,
574    cli_autostart: bool,
575) -> Result<Option<DaemonResponse>, AppError> {
576    // ORDERING: Acquire pairs with the Release store in maybe_restart_for_version_mismatch.
577    if DAEMON_VERSION_STATE.load(Ordering::Acquire) == VERSION_NOT_CHECKED {
578        maybe_restart_for_version_mismatch(models_dir)?;
579    }
580
581    if let Some(response) = request_if_available(models_dir, request)? {
582        clear_spawn_backoff_state(models_dir).ok();
583        return Ok(Some(response));
584    }
585
586    if !should_autostart(cli_autostart) {
587        return Ok(None);
588    }
589
590    if !ensure_daemon_running(models_dir)? {
591        return Ok(None);
592    }
593
594    request_if_available(models_dir, request)
595}
596
/// Makes sure a daemon for `models_dir` answers pings, spawning one if needed.
///
/// Returns `Ok(true)` once a daemon answers a ping. Returns `Ok(false)` when:
/// - autostart is suppressed by the spawn backoff window;
/// - another process holds the spawn lock and no daemon became ready while waiting;
/// - locating the executable, spawning, or readiness failed.
///
/// Spawn failures are recorded in the backoff state file; a successful ping
/// clears it.
///
/// # Errors
/// Propagates errors from pinging, reading or writing the backoff state, and
/// opening or locking the spawn lock file.
fn ensure_daemon_running(models_dir: &Path) -> Result<bool, AppError> {
    // Fast path: a daemon is already serving this models directory.
    if (try_ping(models_dir)?).is_some() {
        clear_spawn_backoff_state(models_dir).ok();
        return Ok(true);
    }

    // Recent spawn failures put autostart on hold until the recorded deadline.
    if spawn_backoff_active(models_dir)? {
        tracing::warn!(target: "daemon", "daemon autostart suppressed by backoff window");
        return Ok(false);
    }

    // Only one process spawns at a time. If the lock is held elsewhere, wait
    // for that process's daemon to come up instead of spawning a second one.
    let spawn_lock = match try_acquire_spawn_lock(models_dir)? {
        Some(lock) => lock,
        None => return wait_for_daemon_ready(models_dir),
    };

    // Re-check under the lock: another process may have finished spawning
    // between the first ping and acquiring the lock.
    if (try_ping(models_dir)?).is_some() {
        clear_spawn_backoff_state(models_dir).ok();
        drop(spawn_lock);
        return Ok(true);
    }

    let exe = match std::env::current_exe() {
        Ok(path) => path,
        Err(err) => {
            record_spawn_failure(models_dir, &format!("current_exe failed: {err}"))?;
            drop(spawn_lock);
            return Ok(false);
        }
    };

    // Re-run this binary as `daemon`.
    // - SQLITE_GRAPHRAG_DAEMON_CHILD=1 makes autostart_disabled_by_env() true
    //   inside the child, so the daemon never autostarts daemons itself.
    // - Dynamic-loader injection variables (LD_*/DYLD_*) are not passed on.
    // - All stdio goes to null, so the child holds no terminal handles.
    let mut child = std::process::Command::new(exe);
    child
        .arg("daemon")
        .arg("--idle-shutdown-secs")
        .arg(DAEMON_IDLE_SHUTDOWN_SECS.to_string())
        .env("SQLITE_GRAPHRAG_DAEMON_CHILD", "1")
        .env_remove("LD_PRELOAD")
        .env_remove("LD_LIBRARY_PATH")
        .env_remove("LD_AUDIT")
        .env_remove("DYLD_INSERT_LIBRARIES")
        .env_remove("DYLD_LIBRARY_PATH")
        .stdin(Stdio::null())
        .stdout(Stdio::null())
        .stderr(Stdio::null());

    match crate::commands::claude_runner::spawn_with_memory_limit(&mut child) {
        Ok(child_handle) => {
            // NOTE: deliberate orphan daemon detach. The Child handle is intentionally
            // dropped without a corresponding `.wait()` call because the daemon owns its
            // own lifecycle:
            // - `Stdio::null()` is set on stdin/stdout/stderr (above), so the child does
            //   not inherit terminal handles.
            // - The spawn lock file (`daemon-spawn.lock` in the daemon control directory,
            //   i.e. the parent of `models_dir`) prevents concurrent spawns.
            // - The daemon shuts itself down after `DAEMON_IDLE_SHUTDOWN_SECS`, or on an
            //   explicit `daemon stop`/SIGTERM.
            // Keeping the handle here would block the parent CLI in the foreground until
            // the daemon exited, defeating the autostart contract that callers expect.
            // Readiness is detected by pinging (`wait_for_daemon_ready`); no readiness
            // file is involved, despite the wording of the debug message below.
            // See: docs_rules/rules_rust_processos_externos.md section "Child detach justificado"
            //      AND docs/adr/0001-daemon-warmup-exception.md (authorized exception to no-daemon rule)
            let pid = child_handle.id();
            drop(child_handle);
            tracing::debug!(
                target: "daemon",
                pid,
                "daemon detached; lifecycle managed via spawn lock + readiness file"
            );
            // Keep holding the spawn lock while waiting, so other CLIs wait for this daemon.
            let ready = wait_for_daemon_ready(models_dir)?;
            if ready {
                clear_spawn_backoff_state(models_dir).ok();
            } else {
                record_spawn_failure(models_dir, "daemon did not become healthy after autostart")?;
            }
            drop(spawn_lock);
            Ok(ready)
        }
        Err(err) => {
            record_spawn_failure(models_dir, &format!("daemon spawn failed: {err}"))?;
            drop(spawn_lock);
            Ok(false)
        }
    }
}
679
680fn wait_for_daemon_ready(models_dir: &Path) -> Result<bool, AppError> {
681    let deadline = Instant::now() + Duration::from_millis(DAEMON_AUTO_START_MAX_WAIT_MS);
682    let mut sleep_ms = DAEMON_AUTO_START_INITIAL_BACKOFF_MS.max(DAEMON_PING_TIMEOUT_MS);
683
684    while Instant::now() < deadline {
685        if (try_ping(models_dir)?).is_some() {
686            return Ok(true);
687        }
688        thread::sleep(Duration::from_millis(sleep_ms));
689        sleep_ms = (sleep_ms * 2).min(DAEMON_AUTO_START_MAX_BACKOFF_MS);
690    }
691
692    Ok(false)
693}
694
/// Reports whether the environment forbids autostarting a daemon.
///
/// Autostart is disabled when either:
/// - `SQLITE_GRAPHRAG_DAEMON_CHILD=1` — this process is itself a daemon, and
///   nothing can override this; or
/// - `SQLITE_GRAPHRAG_DAEMON_DISABLE_AUTOSTART=1` and
///   `SQLITE_GRAPHRAG_DAEMON_FORCE_AUTOSTART` is not `1` (force wins over disable).
///
/// Only the exact value `"1"` counts as set.
fn autostart_disabled_by_env() -> bool {
    let flag_is_set = |key: &str| std::env::var(key).as_deref() == Ok("1");

    if flag_is_set("SQLITE_GRAPHRAG_DAEMON_CHILD") {
        return true;
    }

    let forced = flag_is_set("SQLITE_GRAPHRAG_DAEMON_FORCE_AUTOSTART");
    let disabled = flag_is_set("SQLITE_GRAPHRAG_DAEMON_DISABLE_AUTOSTART");
    !forced && disabled
}
700
/// Directory that holds the daemon's control files: spawn lock, backoff state and PID file.
///
/// This is the parent of `models_dir`, or `models_dir` itself when it has no
/// parent (e.g. `/`). `Path::parent` returns an empty path for a bare relative
/// name such as `models`. That empty path is mapped to `.`, because an empty
/// path never `exists()`, which would make the daemon's serve loop shut down
/// immediately.
fn daemon_control_dir(models_dir: &Path) -> PathBuf {
    let dir = models_dir.parent().unwrap_or(models_dir);
    if dir.as_os_str().is_empty() {
        PathBuf::from(".")
    } else {
        dir.to_path_buf()
    }
}
707
708fn spawn_lock_path(models_dir: &Path) -> PathBuf {
709    daemon_control_dir(models_dir).join("daemon-spawn.lock")
710}
711
712fn spawn_state_path(models_dir: &Path) -> PathBuf {
713    daemon_control_dir(models_dir).join("daemon-spawn-state.json")
714}
715
716fn pid_file_path(models_dir: &Path) -> PathBuf {
717    daemon_control_dir(models_dir).join("daemon.pid")
718}
719
720fn try_acquire_spawn_lock(models_dir: &Path) -> Result<Option<File>, AppError> {
721    let path = spawn_lock_path(models_dir);
722    std::fs::create_dir_all(crate::paths::parent_or_err(&path)?).map_err(AppError::Io)?;
723    let file = OpenOptions::new()
724        .read(true)
725        .write(true)
726        .create(true)
727        .truncate(false)
728        .open(path)
729        .map_err(AppError::Io)?;
730
731    let deadline = Instant::now() + Duration::from_millis(DAEMON_SPAWN_LOCK_WAIT_MS);
732    loop {
733        match file.try_lock_exclusive() {
734            Ok(()) => return Ok(Some(file)),
735            Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => {
736                if Instant::now() >= deadline {
737                    return Ok(None);
738                }
739                thread::sleep(Duration::from_millis(50));
740            }
741            Err(err) => return Err(AppError::Io(err)),
742        }
743    }
744}
745
746fn spawn_backoff_active(models_dir: &Path) -> Result<bool, AppError> {
747    let state = load_spawn_state(models_dir)?;
748    Ok(now_epoch_ms() < state.not_before_epoch_ms)
749}
750
751#[cold]
752#[inline(never)]
753fn record_spawn_failure(models_dir: &Path, message: &str) -> Result<(), AppError> {
754    let mut state = load_spawn_state(models_dir)?;
755    state.consecutive_failures = state.consecutive_failures.saturating_add(1);
756    let exponent = state.consecutive_failures.saturating_sub(1).min(6);
757    let base_ms =
758        (DAEMON_SPAWN_BACKOFF_BASE_MS * (1_u64 << exponent)).min(DAEMON_AUTO_START_MAX_BACKOFF_MS);
759    // v1.0.36 (L2) + v1.0.43 (H7): half-jitter via fastrand (replaces SystemTime nanoseconds
760    // which violated rules_rust_retry_com_backoff.md). Effective backoff range: [base/2, base).
761    let half = base_ms / 2;
762    let jitter = if half == 0 { 0 } else { fastrand::u64(0..half) };
763    let backoff_ms = half + jitter;
764    state.not_before_epoch_ms = now_epoch_ms() + backoff_ms;
765    state.last_error = Some(message.to_string());
766    save_spawn_state(models_dir, &state)
767}
768
769fn clear_spawn_backoff_state(models_dir: &Path) -> Result<(), AppError> {
770    let path = spawn_state_path(models_dir);
771    if path.exists() {
772        std::fs::remove_file(path).map_err(AppError::Io)?;
773    }
774    Ok(())
775}
776
777fn load_spawn_state(models_dir: &Path) -> Result<DaemonSpawnState, AppError> {
778    let path = spawn_state_path(models_dir);
779    if !path.exists() {
780        return Ok(DaemonSpawnState::default());
781    }
782
783    let bytes = std::fs::read(path).map_err(AppError::Io)?;
784    serde_json::from_slice(&bytes).map_err(AppError::Json)
785}
786
787fn save_spawn_state(models_dir: &Path, state: &DaemonSpawnState) -> Result<(), AppError> {
788    let path = spawn_state_path(models_dir);
789    std::fs::create_dir_all(crate::paths::parent_or_err(&path)?).map_err(AppError::Io)?;
790    let bytes = serde_json::to_vec(state).map_err(AppError::Json)?;
791    std::fs::write(path, bytes).map_err(AppError::Io)
792}
793
/// Model variant reported in ping replies, read from `SQLITE_GRAPHRAG_GLINER_VARIANT`.
///
/// Falls back to `"fp32"` when the variable is unset or not valid Unicode.
fn gliner_variant_from_env() -> String {
    match std::env::var("SQLITE_GRAPHRAG_GLINER_VARIANT") {
        Ok(variant) => variant,
        Err(_) => "fp32".to_string(),
    }
}
799
/// Current wall-clock time in milliseconds since the Unix epoch.
///
/// Returns 0 if the clock reports a time before the epoch. The `u128`
/// millisecond count saturates at `u64::MAX` instead of being silently
/// truncated by a bare `as` cast.
fn now_epoch_ms() -> u64 {
    let elapsed = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or(Duration::ZERO);
    // After the `min`, the value fits in u64, so the cast is lossless.
    elapsed.as_millis().min(u128::from(u64::MAX)) as u64
}
806
807fn to_local_socket_name(name: &str) -> std::io::Result<interprocess::local_socket::Name<'static>> {
808    if let Ok(ns_name) = name.to_string().to_ns_name::<GenericNamespaced>() {
809        return Ok(ns_name);
810    }
811
812    // Fallback when abstract namespaces are unavailable. Honours XDG_RUNTIME_DIR
813    // (Linux user-private runtime dir) or SQLITE_GRAPHRAG_HOME (project override)
814    // before falling back to /tmp, which can collide when the same name is used
815    // by another user/project on a multi-tenant host. Added in v1.0.35.
816    let path = if cfg!(unix) {
817        let base = std::env::var_os("XDG_RUNTIME_DIR")
818            .or_else(|| std::env::var_os("SQLITE_GRAPHRAG_HOME"))
819            .map(std::path::PathBuf::from)
820            .unwrap_or_else(std::env::temp_dir);
821        base.join(format!("{name}.sock"))
822            .to_string_lossy()
823            .into_owned()
824    } else {
825        format!(r"\\.\pipe\{name}")
826    };
827    path.to_fs_name::<GenericFilePath>()
828}
829
830#[cfg(test)]
831mod tests {
832    use super::*;
833
834    #[test]
835    fn record_and_clear_spawn_backoff_state() {
836        let tmp = tempfile::tempdir().unwrap();
837        let models_dir = tmp.path().join("cache").join("models");
838        std::fs::create_dir_all(&models_dir).unwrap();
839
840        assert!(!spawn_backoff_active(&models_dir).unwrap());
841
842        record_spawn_failure(&models_dir, "spawn failed").unwrap();
843        assert!(spawn_backoff_active(&models_dir).unwrap());
844
845        let state = load_spawn_state(&models_dir).unwrap();
846        assert_eq!(state.consecutive_failures, 1);
847        assert_eq!(state.last_error.as_deref(), Some("spawn failed"));
848
849        clear_spawn_backoff_state(&models_dir).unwrap();
850        assert!(!spawn_backoff_active(&models_dir).unwrap());
851    }
852
853    #[test]
854    fn daemon_control_dir_uses_models_parent() {
855        let base = PathBuf::from("/tmp/sqlite-graphrag-cache-test");
856        let models_dir = base.join("models");
857        assert_eq!(daemon_control_dir(&models_dir), base);
858    }
859
860    #[test]
861    fn version_state_constants_are_distinct() {
862        assert_ne!(VERSION_NOT_CHECKED, VERSION_COMPATIBLE);
863        assert_ne!(VERSION_NOT_CHECKED, VERSION_RESTART_ATTEMPTED);
864        assert_ne!(VERSION_COMPATIBLE, VERSION_RESTART_ATTEMPTED);
865    }
866
867    #[test]
868    fn wait_for_daemon_exit_immediate_when_not_running() {
869        let tmp = tempfile::tempdir().unwrap();
870        let models_dir = tmp.path().join("cache").join("models");
871        std::fs::create_dir_all(&models_dir).unwrap();
872
873        let start = Instant::now();
874        wait_for_daemon_exit(&models_dir).unwrap();
875        // Without a daemon, the first ping returns None and the function exits immediately.
876        assert!(start.elapsed() < Duration::from_millis(500));
877    }
878
879    #[test]
880    fn spawn_backoff_exponent_caps_at_six() {
881        let tmp = tempfile::tempdir().unwrap();
882        let models_dir = tmp.path().join("cache").join("models");
883        std::fs::create_dir_all(&models_dir).unwrap();
884
885        // Record 10 consecutive failures to force exponent saturation.
886        for i in 0..10 {
887            record_spawn_failure(&models_dir, &format!("failure {i}")).unwrap();
888        }
889
890        let state = load_spawn_state(&models_dir).unwrap();
891        assert_eq!(state.consecutive_failures, 10);
892
893        // Exponent is clamped at 6, so max base_ms is base * 2^6.
894        // Effective backoff range is [base/2, base), where base <= base_ms * 64.
895        let max_base =
896            (DAEMON_SPAWN_BACKOFF_BASE_MS * (1_u64 << 6)).min(DAEMON_AUTO_START_MAX_BACKOFF_MS);
897        // The not_before_epoch_ms must not exceed now + max_base (upper bound with jitter < half).
898        let now = now_epoch_ms();
899        assert!(state.not_before_epoch_ms <= now + max_base);
900    }
901
902    #[test]
903    fn spawn_backoff_half_jitter_in_range() {
904        // Verify the half-jitter formula: result = half + fastrand::u64(0..half)
905        // produces values in [half, half + half) == [base/2, base).
906        let base_ms: u64 = 100;
907        let half = base_ms / 2;
908        for _ in 0..100 {
909            let jitter = fastrand::u64(0..half);
910            let result = half + jitter;
911            assert!(result >= half, "result {result} below half {half}");
912            assert!(result < base_ms, "result {result} not below base {base_ms}");
913        }
914    }
915
916    #[test]
917    fn to_local_socket_name_produces_valid_result() {
918        let result = to_local_socket_name("sqlite-graphrag-test-daemon");
919        assert!(result.is_ok(), "expected Ok, got {result:?}");
920        // The name string representation must be non-empty.
921        let name = result.unwrap();
922        let display = format!("{name:?}");
923        assert!(!display.is_empty());
924    }
925
926    #[test]
927    fn version_cas_not_checked_to_compatible() {
928        let state = AtomicU8::new(VERSION_NOT_CHECKED);
929        let result = state.compare_exchange(
930            VERSION_NOT_CHECKED,
931            VERSION_COMPATIBLE,
932            Ordering::SeqCst,
933            Ordering::SeqCst,
934        );
935        assert!(result.is_ok());
936        assert_eq!(state.load(Ordering::SeqCst), VERSION_COMPATIBLE);
937    }
938
939    #[test]
940    fn version_cas_prevents_double_restart() {
941        let state = AtomicU8::new(VERSION_NOT_CHECKED);
942
943        // First CAS: NOT_CHECKED → RESTART_ATTEMPTED succeeds.
944        let first = state.compare_exchange(
945            VERSION_NOT_CHECKED,
946            VERSION_RESTART_ATTEMPTED,
947            Ordering::SeqCst,
948            Ordering::SeqCst,
949        );
950        assert!(first.is_ok());
951
952        // Second CAS from NOT_CHECKED must fail — state is already RESTART_ATTEMPTED.
953        let second = state.compare_exchange(
954            VERSION_NOT_CHECKED,
955            VERSION_RESTART_ATTEMPTED,
956            Ordering::SeqCst,
957            Ordering::SeqCst,
958        );
959        assert!(second.is_err());
960        assert_eq!(state.load(Ordering::SeqCst), VERSION_RESTART_ATTEMPTED);
961    }
962
963    #[test]
964    fn ping_response_includes_model_fields() {
965        let resp = DaemonResponse::Ok {
966            pid: 42,
967            version: "1.0.0".to_string(),
968            handled_embed_requests: 7,
969            model_name: "multilingual-e5-small".to_string(),
970            model_variant: "fp32".to_string(),
971        };
972        let json = serde_json::to_value(&resp).expect("serialization failed");
973        assert_eq!(json["model_name"], "multilingual-e5-small");
974        assert_eq!(json["model_variant"], "fp32");
975        assert_eq!(json["status"], "ok");
976        assert_eq!(json["handled_embed_requests"], 7u64);
977    }
978
979    #[test]
980    fn gliner_variant_defaults_to_fp32() {
981        // Ensure the default is fp32 when env var is not set.
982        std::env::remove_var("SQLITE_GRAPHRAG_GLINER_VARIANT");
983        let variant = gliner_variant_from_env();
984        assert_eq!(variant, "fp32");
985    }
986
987    #[test]
988    fn gliner_variant_reads_env_var() {
989        std::env::set_var("SQLITE_GRAPHRAG_GLINER_VARIANT", "int8");
990        let variant = gliner_variant_from_env();
991        std::env::remove_var("SQLITE_GRAPHRAG_GLINER_VARIANT");
992        assert_eq!(variant, "int8");
993    }
994
995    #[test]
996    fn spawn_state_serialization_roundtrip() {
997        let tmp = tempfile::tempdir().unwrap();
998        let models_dir = tmp.path().join("cache").join("models");
999        std::fs::create_dir_all(&models_dir).unwrap();
1000
1001        let original = DaemonSpawnState {
1002            consecutive_failures: 3,
1003            not_before_epoch_ms: 9_999_999_999,
1004            last_error: Some("test error message".to_string()),
1005        };
1006        save_spawn_state(&models_dir, &original).unwrap();
1007
1008        let loaded = load_spawn_state(&models_dir).unwrap();
1009        assert_eq!(loaded.consecutive_failures, original.consecutive_failures);
1010        assert_eq!(loaded.not_before_epoch_ms, original.not_before_epoch_ms);
1011        assert_eq!(loaded.last_error, original.last_error);
1012    }
1013}