Skip to main content

sqlite_graphrag/
daemon.rs

1//! IPC daemon: keeps the embedding model warm across CLI invocations.
2//!
3//! Manages the background process lifecycle, Unix-socket IPC protocol, and
4//! auto-start/backoff logic so embeddings are served without cold-start cost.
5
6use crate::constants::{
7    DAEMON_AUTO_START_INITIAL_BACKOFF_MS, DAEMON_AUTO_START_MAX_BACKOFF_MS,
8    DAEMON_AUTO_START_MAX_WAIT_MS, DAEMON_IDLE_SHUTDOWN_SECS, DAEMON_PING_TIMEOUT_MS,
9    DAEMON_SPAWN_BACKOFF_BASE_MS, DAEMON_SPAWN_LOCK_WAIT_MS, DAEMON_VERSION_RESTART_WAIT_MS,
10    SQLITE_GRAPHRAG_VERSION,
11};
12use crate::errors::AppError;
13use crate::{embedder, shutdown_requested};
14use fs4::fs_std::FileExt;
15use interprocess::local_socket::{
16    prelude::LocalSocketStream,
17    traits::{Listener as _, Stream as _},
18    GenericFilePath, GenericNamespaced, ListenerNonblockingMode, ListenerOptions, ToFsName,
19    ToNsName,
20};
21use serde::{Deserialize, Serialize};
22use std::fs::{File, OpenOptions};
23use std::io::{BufRead, BufReader, Write};
24use std::path::{Path, PathBuf};
25use std::process::Stdio;
26use std::sync::atomic::{AtomicU64, AtomicU8, Ordering};
27use std::sync::Arc;
28use std::thread;
29use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
30
/// Version handshake with the daemon has not been performed yet in this process.
const VERSION_NOT_CHECKED: u8 = 0;
/// Handshake done: the running daemon matched this binary, or no daemon was running.
const VERSION_COMPATIBLE: u8 = 1;
/// A version mismatch was seen and the single allowed restart attempt has been made.
const VERSION_RESTART_ATTEMPTED: u8 = 2;

/// Per-process version-check state; always holds one of the `VERSION_*` values.
/// Prevents restart loops by allowing at most one mismatch-triggered restart per process.
static DAEMON_VERSION_STATE: AtomicU8 = AtomicU8::new(VERSION_NOT_CHECKED);
37
38#[derive(Debug, Serialize, Deserialize)]
39#[serde(tag = "request", rename_all = "snake_case")]
40pub enum DaemonRequest {
41    Ping,
42    Shutdown,
43    EmbedPassage {
44        text: String,
45    },
46    EmbedQuery {
47        text: String,
48    },
49    EmbedPassages {
50        texts: Vec<String>,
51        token_counts: Vec<usize>,
52    },
53}
54
55#[derive(Debug, Serialize, Deserialize)]
56#[serde(tag = "status", rename_all = "snake_case")]
57pub enum DaemonResponse {
58    Listening {
59        pid: u32,
60        socket: String,
61        idle_shutdown_secs: u64,
62    },
63    Ok {
64        pid: u32,
65        version: String,
66        handled_embed_requests: u64,
67    },
68    PassageEmbedding {
69        embedding: Vec<f32>,
70        handled_embed_requests: u64,
71    },
72    QueryEmbedding {
73        embedding: Vec<f32>,
74        handled_embed_requests: u64,
75    },
76    PassageEmbeddings {
77        embeddings: Vec<Vec<f32>>,
78        handled_embed_requests: u64,
79    },
80    ShuttingDown {
81        handled_embed_requests: u64,
82    },
83    Error {
84        message: String,
85    },
86}
87
88#[derive(Debug, Default, Serialize, Deserialize)]
89struct DaemonSpawnState {
90    consecutive_failures: u32,
91    not_before_epoch_ms: u64,
92    last_error: Option<String>,
93}
94
95pub fn daemon_label(models_dir: &Path) -> String {
96    let hash = blake3::hash(models_dir.to_string_lossy().as_bytes())
97        .to_hex()
98        .to_string();
99    format!("sqlite-graphrag-daemon-{}", &hash[..16])
100}
101
102pub fn try_ping(models_dir: &Path) -> Result<Option<DaemonResponse>, AppError> {
103    request_if_available(models_dir, &DaemonRequest::Ping)
104}
105
106pub fn try_shutdown(models_dir: &Path) -> Result<Option<DaemonResponse>, AppError> {
107    request_if_available(models_dir, &DaemonRequest::Shutdown)
108}
109
110pub fn embed_passage_or_local(models_dir: &Path, text: &str) -> Result<Vec<f32>, AppError> {
111    match request_or_autostart(
112        models_dir,
113        &DaemonRequest::EmbedPassage {
114            text: text.to_string(),
115        },
116        true,
117    )? {
118        Some(DaemonResponse::PassageEmbedding { embedding, .. }) => Ok(embedding),
119        Some(DaemonResponse::Error { message }) => Err(AppError::Embedding(message)),
120        Some(other) => Err(AppError::Internal(anyhow::anyhow!(
121            "unexpected daemon response for passage embedding: {other:?}"
122        ))),
123        None => {
124            let embedder = embedder::get_embedder(models_dir)?;
125            embedder::embed_passage(embedder, text)
126        }
127    }
128}
129
130pub fn embed_query_or_local(
131    models_dir: &Path,
132    text: &str,
133    cli_autostart: bool,
134) -> Result<Vec<f32>, AppError> {
135    match request_or_autostart(
136        models_dir,
137        &DaemonRequest::EmbedQuery {
138            text: text.to_string(),
139        },
140        cli_autostart,
141    )? {
142        Some(DaemonResponse::QueryEmbedding { embedding, .. }) => Ok(embedding),
143        Some(DaemonResponse::Error { message }) => Err(AppError::Embedding(message)),
144        Some(other) => Err(AppError::Internal(anyhow::anyhow!(
145            "unexpected daemon response for query embedding: {other:?}"
146        ))),
147        None => {
148            let embedder = embedder::get_embedder(models_dir)?;
149            embedder::embed_query(embedder, text)
150        }
151    }
152}
153
154pub fn embed_passages_controlled_or_local(
155    models_dir: &Path,
156    texts: &[&str],
157    token_counts: &[usize],
158) -> Result<Vec<Vec<f32>>, AppError> {
159    let request = DaemonRequest::EmbedPassages {
160        texts: texts.iter().map(|t| (*t).to_string()).collect(),
161        token_counts: token_counts.to_vec(),
162    };
163
164    match request_or_autostart(models_dir, &request, true)? {
165        Some(DaemonResponse::PassageEmbeddings { embeddings, .. }) => Ok(embeddings),
166        Some(DaemonResponse::Error { message }) => Err(AppError::Embedding(message)),
167        Some(other) => Err(AppError::Internal(anyhow::anyhow!(
168            "unexpected daemon response for passage embedding batch: {other:?}"
169        ))),
170        None => {
171            let embedder = embedder::get_embedder(models_dir)?;
172            embedder::embed_passages_controlled(embedder, texts, token_counts)
173        }
174    }
175}
176
/// Scope guard held by the running daemon; its `Drop` impl removes the spawn lock
/// file for `models_dir` and logs the shutdown.
struct DaemonSpawnGuard {
    /// Models directory whose control directory contains the spawn lock file.
    models_dir: PathBuf,
}

impl DaemonSpawnGuard {
    /// Captures an owned copy of `models_dir` for use at drop time.
    fn new(models_dir: &Path) -> Self {
        let models_dir = PathBuf::from(models_dir);
        Self { models_dir }
    }
}
188
189impl Drop for DaemonSpawnGuard {
190    fn drop(&mut self) {
191        let lock_path = spawn_lock_path(&self.models_dir);
192        if lock_path.exists() {
193            match std::fs::remove_file(&lock_path) {
194                Ok(()) => {
195                    tracing::debug!(
196                        path = %lock_path.display(),
197                        "spawn lock file removed during graceful daemon shutdown"
198                    );
199                }
200                Err(err) if err.kind() == std::io::ErrorKind::NotFound => {}
201                Err(err) => {
202                    tracing::warn!(
203                        error = %err,
204                        path = %lock_path.display(),
205                        "failed to remove spawn lock file while shutting down daemon"
206                    );
207                }
208            }
209        }
210        tracing::info!(
211            "daemon shut down gracefully; socket will be cleaned up by OS or by the next daemon via try_overwrite"
212        );
213    }
214}
215
216pub fn run(models_dir: &Path, idle_shutdown_secs: u64) -> Result<(), AppError> {
217    // Scale worker threads to available parallelism so embedding tasks saturate CPU cores.
218    // Clamped to [2, 8] to avoid excessive threads on high-core machines.
219    let permits = std::thread::available_parallelism()
220        .map(|n| n.get())
221        .unwrap_or(2)
222        .clamp(2, 8);
223    let rt = tokio::runtime::Builder::new_multi_thread()
224        .worker_threads(permits)
225        .thread_name("daemon-worker")
226        .enable_all()
227        .build()
228        .map_err(AppError::Io)?;
229
230    rt.block_on(run_async(models_dir, idle_shutdown_secs, permits))
231}
232
233async fn run_async(
234    models_dir: &Path,
235    idle_shutdown_secs: u64,
236    permits: usize,
237) -> Result<(), AppError> {
238    let socket = daemon_label(models_dir);
239    let name = to_local_socket_name(&socket)?;
240    let listener = ListenerOptions::new()
241        .name(name)
242        .nonblocking(ListenerNonblockingMode::Accept)
243        .try_overwrite(true)
244        .create_sync()
245        .map_err(AppError::Io)?;
246
247    // Guard that cleans up the spawn lock file on graceful shutdown.
248    // SIGKILL does not trigger Drop; in that case try_overwrite(true) above is the fallback.
249    let _spawn_guard = DaemonSpawnGuard::new(models_dir);
250
251    // Warm the model once per daemon process inside spawn_blocking so the
252    // ONNX session initialisation (CPU-bound, may take several seconds) does
253    // not block a tokio worker thread.
254    let models_dir_warm = models_dir.to_path_buf();
255    tokio::task::spawn_blocking(move || embedder::get_embedder(&models_dir_warm).map(|_| ()))
256        .await
257        .map_err(|e| AppError::Internal(anyhow::anyhow!("model warm-up panicked: {e}")))??;
258
259    crate::output::emit_json(&DaemonResponse::Listening {
260        pid: std::process::id(),
261        socket,
262        idle_shutdown_secs,
263    })?;
264
265    let handled_embed_requests = Arc::new(AtomicU64::new(0));
266    let mut last_activity = Instant::now();
267    let models_dir = models_dir.to_path_buf();
268    // Bound concurrent spawn_blocking tasks to the same thread count as the runtime.
269    let permit_pool = Arc::new(tokio::sync::Semaphore::new(permits));
270
271    loop {
272        if shutdown_requested() {
273            break;
274        }
275
276        if !daemon_control_dir(&models_dir).exists() {
277            tracing::info!("daemon control directory disappeared; shutting down");
278            break;
279        }
280
281        match listener.accept() {
282            Ok(stream) => {
283                last_activity = Instant::now();
284                let models_dir_clone = models_dir.clone();
285                let counter = Arc::clone(&handled_embed_requests);
286                let permit =
287                    permit_pool.clone().acquire_owned().await.map_err(|e| {
288                        AppError::Internal(anyhow::anyhow!("semaphore closed: {e}"))
289                    })?;
290                let should_exit = tokio::task::spawn_blocking(move || {
291                    let _permit = permit; // hold until end of scope
292                    handle_client(stream, &models_dir_clone, &counter)
293                })
294                .await
295                .map_err(|e| {
296                    AppError::Internal(anyhow::anyhow!("spawn_blocking panicked: {e}"))
297                })??;
298
299                if should_exit {
300                    break;
301                }
302            }
303            Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => {
304                if last_activity.elapsed() >= Duration::from_secs(idle_shutdown_secs) {
305                    tracing::info!(
306                        idle_shutdown_secs,
307                        handled_embed_requests = handled_embed_requests.load(Ordering::Relaxed),
308                        "daemon idle timeout reached"
309                    );
310                    break;
311                }
312                tokio::time::sleep(Duration::from_millis(50)).await;
313            }
314            Err(err) => return Err(AppError::Io(err)),
315        }
316    }
317
318    Ok(())
319}
320
321fn handle_client(
322    stream: LocalSocketStream,
323    models_dir: &Path,
324    handled_embed_requests: &AtomicU64,
325) -> Result<bool, AppError> {
326    let mut reader = BufReader::new(stream);
327    let mut line = String::new();
328    reader.read_line(&mut line).map_err(AppError::Io)?;
329
330    if line.trim().is_empty() {
331        write_response(
332            reader.get_mut(),
333            &DaemonResponse::Error {
334                message: "empty request to daemon".to_string(),
335            },
336        )?;
337        return Ok(false);
338    }
339
340    let request: DaemonRequest = serde_json::from_str(line.trim()).map_err(AppError::Json)?;
341    let (response, should_exit) = match request {
342        DaemonRequest::Ping => (
343            DaemonResponse::Ok {
344                pid: std::process::id(),
345                version: SQLITE_GRAPHRAG_VERSION.to_string(),
346                handled_embed_requests: handled_embed_requests.load(Ordering::Relaxed),
347            },
348            false,
349        ),
350        DaemonRequest::Shutdown => (
351            DaemonResponse::ShuttingDown {
352                handled_embed_requests: handled_embed_requests.load(Ordering::Relaxed),
353            },
354            true,
355        ),
356        DaemonRequest::EmbedPassage { text } => {
357            let embedder = embedder::get_embedder(models_dir)?;
358            let embedding = embedder::embed_passage(embedder, &text)?;
359            let count = handled_embed_requests.fetch_add(1, Ordering::Relaxed) + 1;
360            (
361                DaemonResponse::PassageEmbedding {
362                    embedding,
363                    handled_embed_requests: count,
364                },
365                false,
366            )
367        }
368        DaemonRequest::EmbedQuery { text } => {
369            let embedder = embedder::get_embedder(models_dir)?;
370            let embedding = embedder::embed_query(embedder, &text)?;
371            let count = handled_embed_requests.fetch_add(1, Ordering::Relaxed) + 1;
372            (
373                DaemonResponse::QueryEmbedding {
374                    embedding,
375                    handled_embed_requests: count,
376                },
377                false,
378            )
379        }
380        DaemonRequest::EmbedPassages {
381            texts,
382            token_counts,
383        } => {
384            let embedder = embedder::get_embedder(models_dir)?;
385            let text_refs: Vec<&str> = texts.iter().map(String::as_str).collect();
386            let embeddings =
387                embedder::embed_passages_controlled(embedder, &text_refs, &token_counts)?;
388            let count = handled_embed_requests.fetch_add(1, Ordering::Relaxed) + 1;
389            (
390                DaemonResponse::PassageEmbeddings {
391                    embeddings,
392                    handled_embed_requests: count,
393                },
394                false,
395            )
396        }
397    };
398
399    write_response(reader.get_mut(), &response)?;
400    Ok(should_exit)
401}
402
403fn write_response(
404    stream: &mut LocalSocketStream,
405    response: &DaemonResponse,
406) -> Result<(), AppError> {
407    serde_json::to_writer(&mut *stream, response).map_err(AppError::Json)?;
408    stream.write_all(b"\n").map_err(AppError::Io)?;
409    stream.flush().map_err(AppError::Io)?;
410    Ok(())
411}
412
413fn request_if_available(
414    models_dir: &Path,
415    request: &DaemonRequest,
416) -> Result<Option<DaemonResponse>, AppError> {
417    let socket = daemon_label(models_dir);
418    let name = match to_local_socket_name(&socket) {
419        Ok(name) => name,
420        Err(err) => return Err(AppError::Io(err)),
421    };
422
423    let mut stream = match LocalSocketStream::connect(name) {
424        Ok(stream) => stream,
425        Err(err)
426            if matches!(
427                err.kind(),
428                std::io::ErrorKind::NotFound
429                    | std::io::ErrorKind::ConnectionRefused
430                    | std::io::ErrorKind::AddrNotAvailable
431                    | std::io::ErrorKind::TimedOut
432            ) =>
433        {
434            return Ok(None);
435        }
436        Err(err) => return Err(AppError::Io(err)),
437    };
438
439    serde_json::to_writer(&mut stream, request).map_err(AppError::Json)?;
440    stream.write_all(b"\n").map_err(AppError::Io)?;
441    stream.flush().map_err(AppError::Io)?;
442
443    let mut reader = BufReader::new(stream);
444    let mut line = String::new();
445    reader.read_line(&mut line).map_err(AppError::Io)?;
446    if line.trim().is_empty() {
447        return Err(AppError::Embedding(
448            "daemon returned an empty response".into(),
449        ));
450    }
451
452    let response = serde_json::from_str(line.trim()).map_err(AppError::Json)?;
453    Ok(Some(response))
454}
455
456fn should_autostart(cli_flag: bool) -> bool {
457    if !cli_flag {
458        return false; // explicit CLI override wins
459    }
460    !autostart_disabled_by_env()
461}
462
463/// Checks whether a running daemon has a different version from the current CLI binary.
464/// If a mismatch is detected, shuts down the stale daemon, waits for it to exit, and
465/// re-spawns a fresh one. The `VERSION_RESTART_ATTEMPTED` state prevents infinite loops:
466/// this function is a no-op after the first attempt regardless of outcome.
467fn maybe_restart_for_version_mismatch(models_dir: &Path) -> Result<(), AppError> {
468    if DAEMON_VERSION_STATE
469        .compare_exchange(
470            VERSION_NOT_CHECKED,
471            VERSION_COMPATIBLE,
472            Ordering::SeqCst,
473            Ordering::SeqCst,
474        )
475        .is_err()
476    {
477        // Already checked (compatible) or already attempted a restart — skip.
478        return Ok(());
479    }
480
481    let response = match try_ping(models_dir)? {
482        Some(r) => r,
483        None => return Ok(()), // no daemon running, nothing to check
484    };
485
486    let daemon_version = match &response {
487        DaemonResponse::Ok { version, .. } => version.as_str(),
488        _ => return Ok(()), // unexpected response shape, skip
489    };
490
491    if daemon_version == SQLITE_GRAPHRAG_VERSION {
492        return Ok(()); // versions match, state already set to COMPATIBLE
493    }
494
495    // Mismatch detected — mark as restart-attempted so we never loop.
496    DAEMON_VERSION_STATE.store(VERSION_RESTART_ATTEMPTED, Ordering::SeqCst);
497
498    tracing::warn!(
499        daemon_version = %daemon_version,
500        cli_version = SQLITE_GRAPHRAG_VERSION,
501        "daemon version mismatch detected; auto-restarting daemon"
502    );
503
504    // Send shutdown request.
505    try_shutdown(models_dir)?;
506
507    // Wait for the stale daemon to exit.
508    wait_for_daemon_exit(models_dir)?;
509
510    // Re-spawn the daemon via the existing mechanism.
511    ensure_daemon_running(models_dir)?;
512
513    Ok(())
514}
515
516/// Polls until the daemon stops responding to pings, with exponential backoff.
517/// Starts at 50 ms, doubles each iteration, caps at 500 ms per sleep.
518/// Returns `Ok(())` once the daemon is gone or the timeout is reached.
519fn wait_for_daemon_exit(models_dir: &Path) -> Result<(), AppError> {
520    let deadline = Instant::now() + Duration::from_millis(DAEMON_VERSION_RESTART_WAIT_MS);
521    let mut sleep_ms: u64 = 50;
522
523    while Instant::now() < deadline {
524        if try_ping(models_dir)?.is_none() {
525            tracing::debug!("stale daemon exited after version-mismatch shutdown");
526            return Ok(());
527        }
528        thread::sleep(Duration::from_millis(sleep_ms));
529        sleep_ms = (sleep_ms * 2).min(500);
530    }
531
532    tracing::warn!(
533        timeout_ms = DAEMON_VERSION_RESTART_WAIT_MS,
534        "timed out waiting for stale daemon to exit after version-mismatch shutdown"
535    );
536    Ok(())
537}
538
539fn request_or_autostart(
540    models_dir: &Path,
541    request: &DaemonRequest,
542    cli_autostart: bool,
543) -> Result<Option<DaemonResponse>, AppError> {
544    if DAEMON_VERSION_STATE.load(Ordering::SeqCst) == VERSION_NOT_CHECKED {
545        maybe_restart_for_version_mismatch(models_dir)?;
546    }
547
548    if let Some(response) = request_if_available(models_dir, request)? {
549        clear_spawn_backoff_state(models_dir).ok();
550        return Ok(Some(response));
551    }
552
553    if !should_autostart(cli_autostart) {
554        return Ok(None);
555    }
556
557    if !ensure_daemon_running(models_dir)? {
558        return Ok(None);
559    }
560
561    request_if_available(models_dir, request)
562}
563
564fn ensure_daemon_running(models_dir: &Path) -> Result<bool, AppError> {
565    if (try_ping(models_dir)?).is_some() {
566        clear_spawn_backoff_state(models_dir).ok();
567        return Ok(true);
568    }
569
570    if spawn_backoff_active(models_dir)? {
571        tracing::warn!("daemon autostart suppressed by backoff window");
572        return Ok(false);
573    }
574
575    let spawn_lock = match try_acquire_spawn_lock(models_dir)? {
576        Some(lock) => lock,
577        None => return wait_for_daemon_ready(models_dir),
578    };
579
580    if (try_ping(models_dir)?).is_some() {
581        clear_spawn_backoff_state(models_dir).ok();
582        drop(spawn_lock);
583        return Ok(true);
584    }
585
586    let exe = match std::env::current_exe() {
587        Ok(path) => path,
588        Err(err) => {
589            record_spawn_failure(models_dir, format!("current_exe failed: {err}"))?;
590            drop(spawn_lock);
591            return Ok(false);
592        }
593    };
594
595    let mut child = std::process::Command::new(exe);
596    child
597        .arg("daemon")
598        .arg("--idle-shutdown-secs")
599        .arg(DAEMON_IDLE_SHUTDOWN_SECS.to_string())
600        .env("SQLITE_GRAPHRAG_DAEMON_CHILD", "1")
601        .env_remove("LD_PRELOAD")
602        .env_remove("LD_LIBRARY_PATH")
603        .env_remove("LD_AUDIT")
604        .env_remove("DYLD_INSERT_LIBRARIES")
605        .env_remove("DYLD_LIBRARY_PATH")
606        .stdin(Stdio::null())
607        .stdout(Stdio::null())
608        .stderr(Stdio::null());
609
610    match child.spawn() {
611        Ok(child_handle) => {
612            // SAFETY: deliberate orphan daemon detach. The Child handle is intentionally
613            // dropped without a corresponding `.wait()` call because the daemon owns its
614            // own lifecycle: `Stdio::null()` is set on stdin/stdout/stderr (above) so the
615            // child does not inherit terminal handles, the spawn lock file at
616            // `<models_dir>/.daemon.spawn.lock` prevents concurrent spawns, and the
617            // daemon shuts itself down via `DAEMON_IDLE_SHUTDOWN_SECS` (or an explicit
618            // `daemon stop`/SIGTERM). Keeping the handle here would block the parent
619            // CLI in the foreground until the daemon exited, defeating the autostart
620            // contract that callers expect.
621            // See: docs_rules/rules_rust_processos_externos.md section "Child detach justificado"
622            //      AND docs/adr/0001-daemon-warmup-exception.md (authorized exception to no-daemon rule)
623            let pid = child_handle.id();
624            drop(child_handle);
625            tracing::debug!(
626                pid,
627                "daemon detached; lifecycle managed via spawn lock + readiness file"
628            );
629            let ready = wait_for_daemon_ready(models_dir)?;
630            if ready {
631                clear_spawn_backoff_state(models_dir).ok();
632            } else {
633                record_spawn_failure(
634                    models_dir,
635                    "daemon did not become healthy after autostart".to_string(),
636                )?;
637            }
638            drop(spawn_lock);
639            Ok(ready)
640        }
641        Err(err) => {
642            record_spawn_failure(models_dir, format!("daemon spawn failed: {err}"))?;
643            drop(spawn_lock);
644            Ok(false)
645        }
646    }
647}
648
649fn wait_for_daemon_ready(models_dir: &Path) -> Result<bool, AppError> {
650    let deadline = Instant::now() + Duration::from_millis(DAEMON_AUTO_START_MAX_WAIT_MS);
651    let mut sleep_ms = DAEMON_AUTO_START_INITIAL_BACKOFF_MS.max(DAEMON_PING_TIMEOUT_MS);
652
653    while Instant::now() < deadline {
654        if (try_ping(models_dir)?).is_some() {
655            return Ok(true);
656        }
657        thread::sleep(Duration::from_millis(sleep_ms));
658        sleep_ms = (sleep_ms * 2).min(DAEMON_AUTO_START_MAX_BACKOFF_MS);
659    }
660
661    Ok(false)
662}
663
/// Reports whether the environment forbids autostarting a daemon.
///
/// Returns `true` in these cases:
/// - inside a spawned daemon (`SQLITE_GRAPHRAG_DAEMON_CHILD=1`), always;
/// - otherwise, when `SQLITE_GRAPHRAG_DAEMON_DISABLE_AUTOSTART=1`, unless
///   `SQLITE_GRAPHRAG_DAEMON_FORCE_AUTOSTART=1` overrides it.
fn autostart_disabled_by_env() -> bool {
    let is_one = |key: &str| std::env::var(key).as_deref() == Ok("1");

    if is_one("SQLITE_GRAPHRAG_DAEMON_CHILD") {
        // A daemon started by autostart must never spawn another daemon.
        return true;
    }

    let forced = is_one("SQLITE_GRAPHRAG_DAEMON_FORCE_AUTOSTART");
    !forced && is_one("SQLITE_GRAPHRAG_DAEMON_DISABLE_AUTOSTART")
}
669
/// Directory that holds the daemon's control files: the parent of `models_dir`.
///
/// `Path::parent` returns an empty path for a single-component relative path such
/// as `models`. An empty path never `exists()`, so the daemon loop would treat the
/// control directory as deleted and shut down at once. In that case the current
/// directory (`.`) is used instead. When there is no parent at all (e.g. `/`),
/// `models_dir` itself is used.
fn daemon_control_dir(models_dir: &Path) -> PathBuf {
    match models_dir.parent() {
        Some(parent) if parent.as_os_str().is_empty() => PathBuf::from("."),
        Some(parent) => parent.to_path_buf(),
        None => models_dir.to_path_buf(),
    }
}
676
677fn spawn_lock_path(models_dir: &Path) -> PathBuf {
678    daemon_control_dir(models_dir).join("daemon-spawn.lock")
679}
680
681fn spawn_state_path(models_dir: &Path) -> PathBuf {
682    daemon_control_dir(models_dir).join("daemon-spawn-state.json")
683}
684
685fn try_acquire_spawn_lock(models_dir: &Path) -> Result<Option<File>, AppError> {
686    let path = spawn_lock_path(models_dir);
687    std::fs::create_dir_all(crate::paths::parent_or_err(&path)?).map_err(AppError::Io)?;
688    let file = OpenOptions::new()
689        .read(true)
690        .write(true)
691        .create(true)
692        .truncate(false)
693        .open(path)
694        .map_err(AppError::Io)?;
695
696    let deadline = Instant::now() + Duration::from_millis(DAEMON_SPAWN_LOCK_WAIT_MS);
697    loop {
698        match file.try_lock_exclusive() {
699            Ok(()) => return Ok(Some(file)),
700            Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => {
701                if Instant::now() >= deadline {
702                    return Ok(None);
703                }
704                thread::sleep(Duration::from_millis(50));
705            }
706            Err(err) => return Err(AppError::Io(err)),
707        }
708    }
709}
710
711fn spawn_backoff_active(models_dir: &Path) -> Result<bool, AppError> {
712    let state = load_spawn_state(models_dir)?;
713    Ok(now_epoch_ms() < state.not_before_epoch_ms)
714}
715
716fn record_spawn_failure(models_dir: &Path, message: String) -> Result<(), AppError> {
717    let mut state = load_spawn_state(models_dir)?;
718    state.consecutive_failures = state.consecutive_failures.saturating_add(1);
719    let exponent = state.consecutive_failures.saturating_sub(1).min(6);
720    let base_ms =
721        (DAEMON_SPAWN_BACKOFF_BASE_MS * (1_u64 << exponent)).min(DAEMON_AUTO_START_MAX_BACKOFF_MS);
722    // v1.0.36 (L2) + v1.0.43 (H7): half-jitter via fastrand (replaces SystemTime nanoseconds
723    // which violated rules_rust_retry_com_backoff.md). Effective backoff range: [base/2, base).
724    let half = base_ms / 2;
725    let jitter = if half == 0 { 0 } else { fastrand::u64(0..half) };
726    let backoff_ms = half + jitter;
727    state.not_before_epoch_ms = now_epoch_ms() + backoff_ms;
728    state.last_error = Some(message);
729    save_spawn_state(models_dir, &state)
730}
731
732fn clear_spawn_backoff_state(models_dir: &Path) -> Result<(), AppError> {
733    let path = spawn_state_path(models_dir);
734    if path.exists() {
735        std::fs::remove_file(path).map_err(AppError::Io)?;
736    }
737    Ok(())
738}
739
740fn load_spawn_state(models_dir: &Path) -> Result<DaemonSpawnState, AppError> {
741    let path = spawn_state_path(models_dir);
742    if !path.exists() {
743        return Ok(DaemonSpawnState::default());
744    }
745
746    let bytes = std::fs::read(path).map_err(AppError::Io)?;
747    serde_json::from_slice(&bytes).map_err(AppError::Json)
748}
749
750fn save_spawn_state(models_dir: &Path, state: &DaemonSpawnState) -> Result<(), AppError> {
751    let path = spawn_state_path(models_dir);
752    std::fs::create_dir_all(crate::paths::parent_or_err(&path)?).map_err(AppError::Io)?;
753    let bytes = serde_json::to_vec(state).map_err(AppError::Json)?;
754    std::fs::write(path, bytes).map_err(AppError::Io)
755}
756
/// Current wall-clock time as milliseconds since the Unix epoch.
///
/// A clock set before 1970 yields 0 rather than an error.
fn now_epoch_ms() -> u64 {
    let since_epoch = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed,
        Err(_) => Duration::ZERO,
    };
    since_epoch.as_millis() as u64
}
763
764fn to_local_socket_name(name: &str) -> std::io::Result<interprocess::local_socket::Name<'static>> {
765    if let Ok(ns_name) = name.to_string().to_ns_name::<GenericNamespaced>() {
766        return Ok(ns_name);
767    }
768
769    // Fallback when abstract namespaces are unavailable. Honours XDG_RUNTIME_DIR
770    // (Linux user-private runtime dir) or SQLITE_GRAPHRAG_HOME (project override)
771    // before falling back to /tmp, which can collide when the same name is used
772    // by another user/project on a multi-tenant host. Added in v1.0.35.
773    let path = if cfg!(unix) {
774        let base = std::env::var_os("XDG_RUNTIME_DIR")
775            .or_else(|| std::env::var_os("SQLITE_GRAPHRAG_HOME"))
776            .map(std::path::PathBuf::from)
777            .unwrap_or_else(std::env::temp_dir);
778        base.join(format!("{name}.sock"))
779            .to_string_lossy()
780            .into_owned()
781    } else {
782        format!(r"\\.\pipe\{name}")
783    };
784    path.to_fs_name::<GenericFilePath>()
785}
786
#[cfg(test)]
mod tests {
    use super::*;

    /// Creates `<tempdir>/cache/models` on disk. The returned `TempDir` must stay
    /// alive for the whole test, or the directory is deleted.
    fn scratch_models_dir() -> (tempfile::TempDir, PathBuf) {
        let root = tempfile::tempdir().unwrap();
        let models_dir = root.path().join("cache").join("models");
        std::fs::create_dir_all(&models_dir).unwrap();
        (root, models_dir)
    }

    #[test]
    fn record_and_clear_spawn_backoff_state() {
        let (_root, models_dir) = scratch_models_dir();

        assert!(!spawn_backoff_active(&models_dir).unwrap());

        record_spawn_failure(&models_dir, "spawn failed".to_string()).unwrap();
        assert!(spawn_backoff_active(&models_dir).unwrap());

        let DaemonSpawnState {
            consecutive_failures,
            last_error,
            ..
        } = load_spawn_state(&models_dir).unwrap();
        assert_eq!(consecutive_failures, 1);
        assert_eq!(last_error.as_deref(), Some("spawn failed"));

        clear_spawn_backoff_state(&models_dir).unwrap();
        assert!(!spawn_backoff_active(&models_dir).unwrap());
    }

    #[test]
    fn daemon_control_dir_uses_models_parent() {
        let cache_root = PathBuf::from("/tmp/sqlite-graphrag-cache-test");
        assert_eq!(daemon_control_dir(&cache_root.join("models")), cache_root);
    }

    #[test]
    fn version_state_constants_are_distinct() {
        let states = [
            VERSION_NOT_CHECKED,
            VERSION_COMPATIBLE,
            VERSION_RESTART_ATTEMPTED,
        ];
        for (index, first) in states.iter().enumerate() {
            for second in &states[index + 1..] {
                assert_ne!(first, second);
            }
        }
    }

    #[test]
    fn wait_for_daemon_exit_immediate_when_not_running() {
        let (_root, models_dir) = scratch_models_dir();

        let started = Instant::now();
        wait_for_daemon_exit(&models_dir).unwrap();
        // With no daemon listening the first ping returns None, so this returns at once.
        assert!(started.elapsed() < Duration::from_millis(500));
    }
}