// sqlite_graphrag/daemon.rs
//! IPC daemon: keeps the embedding model warm across CLI invocations.
//!
//! Manages the background process lifecycle, Unix-socket IPC protocol, and
//! auto-start/backoff logic so embeddings are served without cold-start cost.
5
use crate::constants::{
    DAEMON_AUTO_START_INITIAL_BACKOFF_MS, DAEMON_AUTO_START_MAX_BACKOFF_MS,
    DAEMON_AUTO_START_MAX_WAIT_MS, DAEMON_IDLE_SHUTDOWN_SECS, DAEMON_PING_TIMEOUT_MS,
    DAEMON_SPAWN_BACKOFF_BASE_MS, DAEMON_SPAWN_LOCK_WAIT_MS, SQLITE_GRAPHRAG_VERSION,
};
use crate::errors::AppError;
use crate::{embedder, shutdown_requested};
use fs4::fs_std::FileExt;
use interprocess::local_socket::{
    prelude::LocalSocketStream,
    traits::{Listener as _, Stream as _},
    GenericFilePath, GenericNamespaced, ListenerNonblockingMode, ListenerOptions, ToFsName,
    ToNsName,
};
use serde::{Deserialize, Serialize};
use std::fs::{File, OpenOptions};
use std::io::{BufRead, BufReader, Write};
use std::path::{Path, PathBuf};
use std::process::Stdio;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
29
/// Requests accepted by the daemon: one JSON object per line over the local
/// socket, externally tagged by the `request` field in snake_case.
#[derive(Debug, Serialize, Deserialize)]
#[serde(tag = "request", rename_all = "snake_case")]
pub enum DaemonRequest {
    /// Health check; answered with [`DaemonResponse::Ok`].
    Ping,
    /// Ask the daemon to reply and then exit.
    Shutdown,
    /// Embed a single passage text.
    EmbedPassage {
        text: String,
    },
    /// Embed a single query text.
    EmbedQuery {
        text: String,
    },
    /// Embed a batch of passages; `token_counts` parallels `texts` and is
    /// forwarded to `embedder::embed_passages_controlled`.
    EmbedPassages {
        texts: Vec<String>,
        token_counts: Vec<usize>,
    },
}
46
/// Replies emitted by the daemon: one JSON object per line, externally tagged
/// by the `status` field in snake_case.
#[derive(Debug, Serialize, Deserialize)]
#[serde(tag = "status", rename_all = "snake_case")]
pub enum DaemonResponse {
    /// Startup announcement emitted on stdout once the socket is bound.
    Listening {
        pid: u32,
        socket: String,
        idle_shutdown_secs: u64,
    },
    /// Successful `Ping` reply.
    Ok {
        pid: u32,
        version: String,
        handled_embed_requests: u64,
    },
    /// Result of `EmbedPassage`.
    PassageEmbedding {
        embedding: Vec<f32>,
        handled_embed_requests: u64,
    },
    /// Result of `EmbedQuery`.
    QueryEmbedding {
        embedding: Vec<f32>,
        handled_embed_requests: u64,
    },
    /// Result of `EmbedPassages`: one embedding per input text.
    PassageEmbeddings {
        embeddings: Vec<Vec<f32>>,
        handled_embed_requests: u64,
    },
    /// Acknowledgement sent just before the daemon exits on `Shutdown`.
    ShuttingDown {
        handled_embed_requests: u64,
    },
    /// Daemon-side failure description.
    Error {
        message: String,
    },
}
79
/// Auto-start backoff state, persisted as JSON at `spawn_state_path`.
#[derive(Debug, Default, Serialize, Deserialize)]
struct DaemonSpawnState {
    /// Consecutive failed spawn attempts; drives the exponential backoff.
    consecutive_failures: u32,
    /// Epoch milliseconds before which auto-start is suppressed.
    not_before_epoch_ms: u64,
    /// Human-readable description of the most recent failure.
    last_error: Option<String>,
}
86
87pub fn daemon_label(models_dir: &Path) -> String {
88    let hash = blake3::hash(models_dir.to_string_lossy().as_bytes())
89        .to_hex()
90        .to_string();
91    format!("sqlite-graphrag-daemon-{}", &hash[..16])
92}
93
/// Pings a running daemon; returns `Ok(None)` when none is reachable.
pub fn try_ping(models_dir: &Path) -> Result<Option<DaemonResponse>, AppError> {
    request_if_available(models_dir, &DaemonRequest::Ping)
}
97
/// Asks a running daemon to shut down; returns `Ok(None)` when none is reachable.
pub fn try_shutdown(models_dir: &Path) -> Result<Option<DaemonResponse>, AppError> {
    request_if_available(models_dir, &DaemonRequest::Shutdown)
}
101
102pub fn embed_passage_or_local(models_dir: &Path, text: &str) -> Result<Vec<f32>, AppError> {
103    match request_or_autostart(
104        models_dir,
105        &DaemonRequest::EmbedPassage {
106            text: text.to_string(),
107        },
108    )? {
109        Some(DaemonResponse::PassageEmbedding { embedding, .. }) => Ok(embedding),
110        Some(DaemonResponse::Error { message }) => Err(AppError::Embedding(message)),
111        Some(other) => Err(AppError::Internal(anyhow::anyhow!(
112            "resposta inesperada do daemon para embedding de passage: {other:?}"
113        ))),
114        None => {
115            let embedder = embedder::get_embedder(models_dir)?;
116            embedder::embed_passage(embedder, text)
117        }
118    }
119}
120
121pub fn embed_query_or_local(models_dir: &Path, text: &str) -> Result<Vec<f32>, AppError> {
122    match request_or_autostart(
123        models_dir,
124        &DaemonRequest::EmbedQuery {
125            text: text.to_string(),
126        },
127    )? {
128        Some(DaemonResponse::QueryEmbedding { embedding, .. }) => Ok(embedding),
129        Some(DaemonResponse::Error { message }) => Err(AppError::Embedding(message)),
130        Some(other) => Err(AppError::Internal(anyhow::anyhow!(
131            "resposta inesperada do daemon para embedding de query: {other:?}"
132        ))),
133        None => {
134            let embedder = embedder::get_embedder(models_dir)?;
135            embedder::embed_query(embedder, text)
136        }
137    }
138}
139
140pub fn embed_passages_controlled_or_local(
141    models_dir: &Path,
142    texts: &[&str],
143    token_counts: &[usize],
144) -> Result<Vec<Vec<f32>>, AppError> {
145    let request = DaemonRequest::EmbedPassages {
146        texts: texts.iter().map(|t| (*t).to_string()).collect(),
147        token_counts: token_counts.to_vec(),
148    };
149
150    match request_or_autostart(models_dir, &request)? {
151        Some(DaemonResponse::PassageEmbeddings { embeddings, .. }) => Ok(embeddings),
152        Some(DaemonResponse::Error { message }) => Err(AppError::Embedding(message)),
153        Some(other) => Err(AppError::Internal(anyhow::anyhow!(
154            "resposta inesperada do daemon para batch de embeddings de passage: {other:?}"
155        ))),
156        None => {
157            let embedder = embedder::get_embedder(models_dir)?;
158            embedder::embed_passages_controlled(embedder, texts, token_counts)
159        }
160    }
161}
162
163struct DaemonSpawnGuard {
164    models_dir: PathBuf,
165}
166
167impl DaemonSpawnGuard {
168    fn new(models_dir: &Path) -> Self {
169        Self {
170            models_dir: models_dir.to_path_buf(),
171        }
172    }
173}
174
175impl Drop for DaemonSpawnGuard {
176    fn drop(&mut self) {
177        let lock_path = spawn_lock_path(&self.models_dir);
178        if lock_path.exists() {
179            match std::fs::remove_file(&lock_path) {
180                Ok(()) => {
181                    tracing::debug!(
182                        path = %lock_path.display(),
183                        "lock file de spawn removido ao encerrar daemon graciosamente"
184                    );
185                }
186                Err(err) if err.kind() == std::io::ErrorKind::NotFound => {}
187                Err(err) => {
188                    tracing::warn!(
189                        error = %err,
190                        path = %lock_path.display(),
191                        "falha ao remover lock file de spawn ao encerrar daemon"
192                    );
193                }
194            }
195        }
196        tracing::info!(
197            "daemon encerrado graciosamente; socket será limpo pelo OS ou pelo próximo daemon via try_overwrite"
198        );
199    }
200}
201
202pub fn run(models_dir: &Path, idle_shutdown_secs: u64) -> Result<(), AppError> {
203    // Tokio runtime com 2 worker threads para reduzir threads ociosas do daemon.
204    // O loop de accept permanece síncrono; cada conexão é despachada para spawn_blocking
205    // de forma que embeddings pesados não bloqueiem os workers tokio.
206    let rt = tokio::runtime::Builder::new_multi_thread()
207        .worker_threads(2)
208        .thread_name("daemon-worker")
209        .enable_all()
210        .build()
211        .map_err(AppError::Io)?;
212
213    rt.block_on(run_async(models_dir, idle_shutdown_secs))
214}
215
/// Core daemon loop: binds the socket, warms the model, announces readiness,
/// then accepts requests until shutdown is requested, the idle timeout fires,
/// or the control directory disappears.
async fn run_async(models_dir: &Path, idle_shutdown_secs: u64) -> Result<(), AppError> {
    let socket = daemon_label(models_dir);
    let name = to_local_socket_name(&socket)?;
    // Non-blocking accept lets the loop interleave shutdown/idle checks;
    // try_overwrite reclaims a stale socket left behind by a killed daemon.
    let listener = ListenerOptions::new()
        .name(name)
        .nonblocking(ListenerNonblockingMode::Accept)
        .try_overwrite(true)
        .create_sync()
        .map_err(AppError::Io)?;

    // Guard that removes the spawn lock file on graceful shutdown.
    // SIGKILL does not run Drop; in that case try_overwrite(true) above is the fallback.
    let _spawn_guard = DaemonSpawnGuard::new(models_dir);

    // Warm the model once per daemon process inside spawn_blocking so the
    // ONNX session initialisation (CPU-bound, may take several seconds) does
    // not block a tokio worker thread.
    let models_dir_warm = models_dir.to_path_buf();
    tokio::task::spawn_blocking(move || embedder::get_embedder(&models_dir_warm).map(|_| ()))
        .await
        .map_err(|e| AppError::Internal(anyhow::anyhow!("model warm-up panicked: {e}")))??;

    // Readiness announcement: the spawning CLI can watch for this JSON line.
    crate::output::emit_json(&DaemonResponse::Listening {
        pid: std::process::id(),
        socket,
        idle_shutdown_secs,
    })?;

    let handled_embed_requests = Arc::new(AtomicU64::new(0));
    let mut last_activity = Instant::now();
    let models_dir = models_dir.to_path_buf();

    loop {
        if shutdown_requested() {
            break;
        }

        // If the control directory was deleted out from under us, exit rather
        // than keep serving for a removed installation.
        if !daemon_control_dir(&models_dir).exists() {
            tracing::info!("daemon control directory disappeared; shutting down");
            break;
        }

        match listener.accept() {
            Ok(stream) => {
                last_activity = Instant::now();
                let models_dir_clone = models_dir.clone();
                let counter = Arc::clone(&handled_embed_requests);
                // Serve the connection on the blocking pool; awaiting here
                // means connections are handled one at a time.
                let should_exit = tokio::task::spawn_blocking(move || {
                    handle_client(stream, &models_dir_clone, &counter)
                })
                .await
                .map_err(|e| {
                    AppError::Internal(anyhow::anyhow!("spawn_blocking panicked: {e}"))
                })??;

                if should_exit {
                    break;
                }
            }
            Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => {
                // No pending connection: check the idle deadline, then poll again.
                if last_activity.elapsed() >= Duration::from_secs(idle_shutdown_secs) {
                    tracing::info!(
                        idle_shutdown_secs,
                        handled_embed_requests = handled_embed_requests.load(Ordering::Relaxed),
                        "daemon idle timeout reached"
                    );
                    break;
                }
                tokio::time::sleep(Duration::from_millis(50)).await;
            }
            Err(err) => return Err(AppError::Io(err)),
        }
    }

    Ok(())
}
292
/// Serves one client connection: reads a single newline-delimited JSON
/// request, executes it, and writes one JSON response line back.
///
/// Returns `Ok(true)` only for `Shutdown`, signalling the accept loop to exit.
/// The shared counter tracks embed requests only (not pings/shutdowns).
fn handle_client(
    stream: LocalSocketStream,
    models_dir: &Path,
    handled_embed_requests: &AtomicU64,
) -> Result<bool, AppError> {
    let mut reader = BufReader::new(stream);
    let mut line = String::new();
    reader.read_line(&mut line).map_err(AppError::Io)?;

    // A blank line (including immediate EOF) gets an explicit error reply
    // instead of a JSON parse failure, and keeps the daemon running.
    if line.trim().is_empty() {
        write_response(
            reader.get_mut(),
            &DaemonResponse::Error {
                message: "requisição vazia ao daemon".to_string(),
            },
        )?;
        return Ok(false);
    }

    let request: DaemonRequest = serde_json::from_str(line.trim()).map_err(AppError::Json)?;
    let (response, should_exit) = match request {
        DaemonRequest::Ping => (
            DaemonResponse::Ok {
                pid: std::process::id(),
                version: SQLITE_GRAPHRAG_VERSION.to_string(),
                handled_embed_requests: handled_embed_requests.load(Ordering::Relaxed),
            },
            false,
        ),
        DaemonRequest::Shutdown => (
            DaemonResponse::ShuttingDown {
                handled_embed_requests: handled_embed_requests.load(Ordering::Relaxed),
            },
            true,
        ),
        DaemonRequest::EmbedPassage { text } => {
            let embedder = embedder::get_embedder(models_dir)?;
            let embedding = embedder::embed_passage(embedder, &text)?;
            // fetch_add returns the previous value; +1 reports the count
            // including this request.
            let count = handled_embed_requests.fetch_add(1, Ordering::Relaxed) + 1;
            (
                DaemonResponse::PassageEmbedding {
                    embedding,
                    handled_embed_requests: count,
                },
                false,
            )
        }
        DaemonRequest::EmbedQuery { text } => {
            let embedder = embedder::get_embedder(models_dir)?;
            let embedding = embedder::embed_query(embedder, &text)?;
            let count = handled_embed_requests.fetch_add(1, Ordering::Relaxed) + 1;
            (
                DaemonResponse::QueryEmbedding {
                    embedding,
                    handled_embed_requests: count,
                },
                false,
            )
        }
        DaemonRequest::EmbedPassages {
            texts,
            token_counts,
        } => {
            let embedder = embedder::get_embedder(models_dir)?;
            let text_refs: Vec<&str> = texts.iter().map(String::as_str).collect();
            let embeddings =
                embedder::embed_passages_controlled(embedder, &text_refs, &token_counts)?;
            // The whole batch counts as one handled embed request.
            let count = handled_embed_requests.fetch_add(1, Ordering::Relaxed) + 1;
            (
                DaemonResponse::PassageEmbeddings {
                    embeddings,
                    handled_embed_requests: count,
                },
                false,
            )
        }
    };

    write_response(reader.get_mut(), &response)?;
    Ok(should_exit)
}
374
375fn write_response(
376    stream: &mut LocalSocketStream,
377    response: &DaemonResponse,
378) -> Result<(), AppError> {
379    serde_json::to_writer(&mut *stream, response).map_err(AppError::Json)?;
380    stream.write_all(b"\n").map_err(AppError::Io)?;
381    stream.flush().map_err(AppError::Io)?;
382    Ok(())
383}
384
385fn request_if_available(
386    models_dir: &Path,
387    request: &DaemonRequest,
388) -> Result<Option<DaemonResponse>, AppError> {
389    let socket = daemon_label(models_dir);
390    let name = match to_local_socket_name(&socket) {
391        Ok(name) => name,
392        Err(err) => return Err(AppError::Io(err)),
393    };
394
395    let mut stream = match LocalSocketStream::connect(name) {
396        Ok(stream) => stream,
397        Err(err)
398            if matches!(
399                err.kind(),
400                std::io::ErrorKind::NotFound
401                    | std::io::ErrorKind::ConnectionRefused
402                    | std::io::ErrorKind::AddrNotAvailable
403                    | std::io::ErrorKind::TimedOut
404            ) =>
405        {
406            return Ok(None);
407        }
408        Err(err) => return Err(AppError::Io(err)),
409    };
410
411    serde_json::to_writer(&mut stream, request).map_err(AppError::Json)?;
412    stream.write_all(b"\n").map_err(AppError::Io)?;
413    stream.flush().map_err(AppError::Io)?;
414
415    let mut reader = BufReader::new(stream);
416    let mut line = String::new();
417    reader.read_line(&mut line).map_err(AppError::Io)?;
418    if line.trim().is_empty() {
419        return Err(AppError::Embedding("daemon retornou resposta vazia".into()));
420    }
421
422    let response = serde_json::from_str(line.trim()).map_err(AppError::Json)?;
423    Ok(Some(response))
424}
425
426fn request_or_autostart(
427    models_dir: &Path,
428    request: &DaemonRequest,
429) -> Result<Option<DaemonResponse>, AppError> {
430    if let Some(response) = request_if_available(models_dir, request)? {
431        clear_spawn_backoff_state(models_dir).ok();
432        return Ok(Some(response));
433    }
434
435    if autostart_disabled() {
436        return Ok(None);
437    }
438
439    if !ensure_daemon_running(models_dir)? {
440        return Ok(None);
441    }
442
443    request_if_available(models_dir, request)
444}
445
/// Makes sure a daemon is reachable, spawning one if necessary.
///
/// Returns `Ok(true)` when a daemon answers a ping (pre-existing or freshly
/// spawned), `Ok(false)` when autostart is suppressed by the backoff window,
/// the spawn lock could not be won and no daemon appeared, or the spawned
/// child never became healthy.
fn ensure_daemon_running(models_dir: &Path) -> Result<bool, AppError> {
    if (try_ping(models_dir)?).is_some() {
        clear_spawn_backoff_state(models_dir).ok();
        return Ok(true);
    }

    // Respect the persisted backoff window to avoid spawn storms after
    // repeated failures.
    if spawn_backoff_active(models_dir)? {
        tracing::warn!("daemon autostart suppressed by backoff window");
        return Ok(false);
    }

    // Only one process spawns at a time. Losing the lock means someone else
    // is spawning right now, so just wait for their daemon to come up.
    let spawn_lock = match try_acquire_spawn_lock(models_dir)? {
        Some(lock) => lock,
        None => return wait_for_daemon_ready(models_dir),
    };

    // Re-ping while holding the lock: another process may have finished
    // spawning between the first ping and lock acquisition.
    if (try_ping(models_dir)?).is_some() {
        clear_spawn_backoff_state(models_dir).ok();
        drop(spawn_lock);
        return Ok(true);
    }

    let exe = match std::env::current_exe() {
        Ok(path) => path,
        Err(err) => {
            record_spawn_failure(models_dir, format!("current_exe failed: {err}"))?;
            drop(spawn_lock);
            return Ok(false);
        }
    };

    // Detach the child fully (null stdio). The env var marks it as a daemon
    // child so autostart_disabled() stops it spawning further daemons.
    let mut child = std::process::Command::new(exe);
    child
        .arg("daemon")
        .arg("--idle-shutdown-secs")
        .arg(DAEMON_IDLE_SHUTDOWN_SECS.to_string())
        .env("SQLITE_GRAPHRAG_DAEMON_CHILD", "1")
        .stdin(Stdio::null())
        .stdout(Stdio::null())
        .stderr(Stdio::null());

    match child.spawn() {
        Ok(_) => {
            let ready = wait_for_daemon_ready(models_dir)?;
            if ready {
                clear_spawn_backoff_state(models_dir).ok();
            } else {
                record_spawn_failure(
                    models_dir,
                    "daemon did not become healthy after autostart".to_string(),
                )?;
            }
            drop(spawn_lock);
            Ok(ready)
        }
        Err(err) => {
            record_spawn_failure(models_dir, format!("daemon spawn failed: {err}"))?;
            drop(spawn_lock);
            Ok(false)
        }
    }
}
508
509fn wait_for_daemon_ready(models_dir: &Path) -> Result<bool, AppError> {
510    let deadline = Instant::now() + Duration::from_millis(DAEMON_AUTO_START_MAX_WAIT_MS);
511    let mut sleep_ms = DAEMON_AUTO_START_INITIAL_BACKOFF_MS.max(DAEMON_PING_TIMEOUT_MS);
512
513    while Instant::now() < deadline {
514        if (try_ping(models_dir)?).is_some() {
515            return Ok(true);
516        }
517        thread::sleep(Duration::from_millis(sleep_ms));
518        sleep_ms = (sleep_ms * 2).min(DAEMON_AUTO_START_MAX_BACKOFF_MS);
519    }
520
521    Ok(false)
522}
523
/// Whether daemon auto-start is disabled for this process.
///
/// Disabled when this process *is* a daemon child (so a daemon never spawns
/// another daemon), or when the user opted out via
/// `SQLITE_GRAPHRAG_DAEMON_DISABLE_AUTOSTART=1` without forcing it back on
/// with `SQLITE_GRAPHRAG_DAEMON_FORCE_AUTOSTART=1`.
fn autostart_disabled() -> bool {
    let is_daemon_child = std::env::var("SQLITE_GRAPHRAG_DAEMON_CHILD").as_deref() == Ok("1");
    let forced = std::env::var("SQLITE_GRAPHRAG_DAEMON_FORCE_AUTOSTART").as_deref() == Ok("1");
    let opted_out = std::env::var("SQLITE_GRAPHRAG_DAEMON_DISABLE_AUTOSTART").as_deref() == Ok("1");
    is_daemon_child || (!forced && opted_out)
}
529
/// Directory holding daemon control artifacts (spawn lock + backoff state).
///
/// Uses the parent of `models_dir`; falls back to `models_dir` itself when
/// it has no parent (e.g. a filesystem root).
fn daemon_control_dir(models_dir: &Path) -> PathBuf {
    match models_dir.parent() {
        Some(parent) => parent.to_path_buf(),
        None => models_dir.to_path_buf(),
    }
}

/// Path of the advisory lock file taken while spawning a daemon.
fn spawn_lock_path(models_dir: &Path) -> PathBuf {
    daemon_control_dir(models_dir).join("daemon-spawn.lock")
}

/// Path of the JSON file persisting auto-start backoff state.
fn spawn_state_path(models_dir: &Path) -> PathBuf {
    daemon_control_dir(models_dir).join("daemon-spawn-state.json")
}
544
545fn try_acquire_spawn_lock(models_dir: &Path) -> Result<Option<File>, AppError> {
546    let path = spawn_lock_path(models_dir);
547    std::fs::create_dir_all(crate::paths::parent_or_err(&path)?).map_err(AppError::Io)?;
548    let file = OpenOptions::new()
549        .read(true)
550        .write(true)
551        .create(true)
552        .truncate(false)
553        .open(path)
554        .map_err(AppError::Io)?;
555
556    let deadline = Instant::now() + Duration::from_millis(DAEMON_SPAWN_LOCK_WAIT_MS);
557    loop {
558        match file.try_lock_exclusive() {
559            Ok(()) => return Ok(Some(file)),
560            Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => {
561                if Instant::now() >= deadline {
562                    return Ok(None);
563                }
564                thread::sleep(Duration::from_millis(50));
565            }
566            Err(err) => return Err(AppError::Io(err)),
567        }
568    }
569}
570
571fn spawn_backoff_active(models_dir: &Path) -> Result<bool, AppError> {
572    let state = load_spawn_state(models_dir)?;
573    Ok(now_epoch_ms() < state.not_before_epoch_ms)
574}
575
576fn record_spawn_failure(models_dir: &Path, message: String) -> Result<(), AppError> {
577    let mut state = load_spawn_state(models_dir)?;
578    state.consecutive_failures = state.consecutive_failures.saturating_add(1);
579    let exponent = state.consecutive_failures.saturating_sub(1).min(6);
580    let backoff_ms =
581        (DAEMON_SPAWN_BACKOFF_BASE_MS * (1_u64 << exponent)).min(DAEMON_AUTO_START_MAX_BACKOFF_MS);
582    state.not_before_epoch_ms = now_epoch_ms() + backoff_ms;
583    state.last_error = Some(message);
584    save_spawn_state(models_dir, &state)
585}
586
587fn clear_spawn_backoff_state(models_dir: &Path) -> Result<(), AppError> {
588    let path = spawn_state_path(models_dir);
589    if path.exists() {
590        std::fs::remove_file(path).map_err(AppError::Io)?;
591    }
592    Ok(())
593}
594
595fn load_spawn_state(models_dir: &Path) -> Result<DaemonSpawnState, AppError> {
596    let path = spawn_state_path(models_dir);
597    if !path.exists() {
598        return Ok(DaemonSpawnState::default());
599    }
600
601    let bytes = std::fs::read(path).map_err(AppError::Io)?;
602    serde_json::from_slice(&bytes).map_err(AppError::Json)
603}
604
605fn save_spawn_state(models_dir: &Path, state: &DaemonSpawnState) -> Result<(), AppError> {
606    let path = spawn_state_path(models_dir);
607    std::fs::create_dir_all(crate::paths::parent_or_err(&path)?).map_err(AppError::Io)?;
608    let bytes = serde_json::to_vec(state).map_err(AppError::Json)?;
609    std::fs::write(path, bytes).map_err(AppError::Io)
610}
611
/// Current wall-clock time as milliseconds since the Unix epoch.
///
/// A clock set before the epoch yields 0 instead of panicking.
fn now_epoch_ms() -> u64 {
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default();
    since_epoch.as_millis() as u64
}
618
/// Resolves a daemon label to a platform local-socket name.
///
/// Prefers the OS's namespaced socket form where supported, falling back to a
/// filesystem-based name otherwise. The `to_string()` gives the name an owned
/// backing buffer so the returned `Name` can carry the `'static` lifetime.
fn to_local_socket_name(name: &str) -> std::io::Result<interprocess::local_socket::Name<'static>> {
    if let Ok(ns_name) = name.to_string().to_ns_name::<GenericNamespaced>() {
        return Ok(ns_name);
    }

    // Filesystem fallback: /tmp socket path on Unix, named-pipe path elsewhere.
    let path = if cfg!(unix) {
        format!("/tmp/{name}.sock")
    } else {
        format!(r"\\.\pipe\{name}")
    };
    path.to_fs_name::<GenericFilePath>()
}
631
#[cfg(test)]
mod tests {
    use super::*;

    /// Backoff state round-trip: recording a failure activates the window
    /// and persists the details; clearing it deactivates the window.
    #[test]
    fn record_and_clear_spawn_backoff_state() {
        let tmp = tempfile::tempdir().unwrap();
        let models_dir = tmp.path().join("cache").join("models");
        std::fs::create_dir_all(&models_dir).unwrap();

        assert!(!spawn_backoff_active(&models_dir).unwrap());

        record_spawn_failure(&models_dir, "spawn failed".to_string()).unwrap();
        assert!(spawn_backoff_active(&models_dir).unwrap());

        let state = load_spawn_state(&models_dir).unwrap();
        assert_eq!(state.consecutive_failures, 1);
        assert_eq!(state.last_error.as_deref(), Some("spawn failed"));

        clear_spawn_backoff_state(&models_dir).unwrap();
        assert!(!spawn_backoff_active(&models_dir).unwrap());
    }

    /// The control directory is the parent of the models directory.
    #[test]
    fn daemon_control_dir_usa_pai_de_models() {
        let base = PathBuf::from("/tmp/sqlite-graphrag-cache-test");
        let models_dir = base.join("models");
        assert_eq!(daemon_control_dir(&models_dir), base);
    }
}
661}