// sqlite_graphrag/daemon.rs

1use crate::constants::{
2    DAEMON_AUTO_START_INITIAL_BACKOFF_MS, DAEMON_AUTO_START_MAX_BACKOFF_MS,
3    DAEMON_AUTO_START_MAX_WAIT_MS, DAEMON_IDLE_SHUTDOWN_SECS, DAEMON_PING_TIMEOUT_MS,
4    DAEMON_SPAWN_BACKOFF_BASE_MS, DAEMON_SPAWN_LOCK_WAIT_MS, SQLITE_GRAPHRAG_VERSION,
5};
6use crate::errors::AppError;
7use crate::{embedder, shutdown_requested};
8use fs4::fs_std::FileExt;
9use interprocess::local_socket::{
10    prelude::LocalSocketStream,
11    traits::{Listener as _, Stream as _},
12    GenericFilePath, GenericNamespaced, ListenerNonblockingMode, ListenerOptions, ToFsName,
13    ToNsName,
14};
15use serde::{Deserialize, Serialize};
16use std::fs::{File, OpenOptions};
17use std::io::{BufRead, BufReader, Write};
18use std::path::{Path, PathBuf};
19use std::process::Stdio;
20use std::sync::atomic::{AtomicU64, Ordering};
21use std::sync::Arc;
22use std::thread;
23use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
24
/// Wire-format requests accepted by the embedding daemon.
///
/// Serialized as single-line JSON with an internal `"request"` tag in
/// snake_case, e.g. `{"request":"embed_passage","text":"..."}`.
#[derive(Debug, Serialize, Deserialize)]
#[serde(tag = "request", rename_all = "snake_case")]
pub enum DaemonRequest {
    /// Health check; answered with `DaemonResponse::Ok`.
    Ping,
    /// Ask the daemon to reply with `ShuttingDown` and then exit.
    Shutdown,
    /// Embed a single passage text.
    EmbedPassage {
        text: String,
    },
    /// Embed a single query text.
    EmbedQuery {
        text: String,
    },
    /// Embed a batch of passages. `token_counts` accompanies `texts`
    /// (presumably one count per text — confirm against the embedder API).
    EmbedPassages {
        texts: Vec<String>,
        token_counts: Vec<usize>,
    },
}
41
/// Wire-format responses emitted by the embedding daemon.
///
/// Serialized as single-line JSON with an internal `"status"` tag in
/// snake_case. Most variants echo `handled_embed_requests`, the number of
/// embed requests served so far by this daemon process.
#[derive(Debug, Serialize, Deserialize)]
#[serde(tag = "status", rename_all = "snake_case")]
pub enum DaemonResponse {
    /// Startup announcement emitted on stdout once the daemon is
    /// accepting connections.
    Listening {
        pid: u32,
        socket: String,
        idle_shutdown_secs: u64,
    },
    /// Reply to `Ping`.
    Ok {
        pid: u32,
        version: String,
        handled_embed_requests: u64,
    },
    /// Reply to `EmbedPassage`.
    PassageEmbedding {
        embedding: Vec<f32>,
        handled_embed_requests: u64,
    },
    /// Reply to `EmbedQuery`.
    QueryEmbedding {
        embedding: Vec<f32>,
        handled_embed_requests: u64,
    },
    /// Reply to `EmbedPassages`.
    PassageEmbeddings {
        embeddings: Vec<Vec<f32>>,
        handled_embed_requests: u64,
    },
    /// Reply to `Shutdown`, sent just before the daemon exits.
    ShuttingDown {
        handled_embed_requests: u64,
    },
    /// Request-level failure with a human-readable message.
    Error {
        message: String,
    },
}
74
/// Persisted autostart backoff state (stored as `daemon-spawn-state.json`
/// in the daemon control directory).
///
/// Written by `record_spawn_failure`, removed by `clear_spawn_backoff_state`
/// once a daemon answers again.
#[derive(Debug, Default, Serialize, Deserialize)]
struct DaemonSpawnState {
    // Spawn attempts that failed in a row; drives the exponential
    // backoff exponent (capped at 6 in record_spawn_failure).
    consecutive_failures: u32,
    // Epoch milliseconds before which autostart is suppressed.
    not_before_epoch_ms: u64,
    // Last failure message, kept for diagnostics.
    last_error: Option<String>,
}
81
82pub fn daemon_label(models_dir: &Path) -> String {
83    let hash = blake3::hash(models_dir.to_string_lossy().as_bytes())
84        .to_hex()
85        .to_string();
86    format!("sqlite-graphrag-daemon-{}", &hash[..16])
87}
88
89pub fn try_ping(models_dir: &Path) -> Result<Option<DaemonResponse>, AppError> {
90    request_if_available(models_dir, &DaemonRequest::Ping)
91}
92
93pub fn try_shutdown(models_dir: &Path) -> Result<Option<DaemonResponse>, AppError> {
94    request_if_available(models_dir, &DaemonRequest::Shutdown)
95}
96
97pub fn embed_passage_or_local(models_dir: &Path, text: &str) -> Result<Vec<f32>, AppError> {
98    match request_or_autostart(
99        models_dir,
100        &DaemonRequest::EmbedPassage {
101            text: text.to_string(),
102        },
103    )? {
104        Some(DaemonResponse::PassageEmbedding { embedding, .. }) => Ok(embedding),
105        Some(DaemonResponse::Error { message }) => Err(AppError::Embedding(message)),
106        Some(other) => Err(AppError::Internal(anyhow::anyhow!(
107            "resposta inesperada do daemon para embedding de passage: {other:?}"
108        ))),
109        None => {
110            let embedder = embedder::get_embedder(models_dir)?;
111            embedder::embed_passage(embedder, text)
112        }
113    }
114}
115
116pub fn embed_query_or_local(models_dir: &Path, text: &str) -> Result<Vec<f32>, AppError> {
117    match request_or_autostart(
118        models_dir,
119        &DaemonRequest::EmbedQuery {
120            text: text.to_string(),
121        },
122    )? {
123        Some(DaemonResponse::QueryEmbedding { embedding, .. }) => Ok(embedding),
124        Some(DaemonResponse::Error { message }) => Err(AppError::Embedding(message)),
125        Some(other) => Err(AppError::Internal(anyhow::anyhow!(
126            "resposta inesperada do daemon para embedding de query: {other:?}"
127        ))),
128        None => {
129            let embedder = embedder::get_embedder(models_dir)?;
130            embedder::embed_query(embedder, text)
131        }
132    }
133}
134
135pub fn embed_passages_controlled_or_local(
136    models_dir: &Path,
137    texts: &[&str],
138    token_counts: &[usize],
139) -> Result<Vec<Vec<f32>>, AppError> {
140    let request = DaemonRequest::EmbedPassages {
141        texts: texts.iter().map(|t| (*t).to_string()).collect(),
142        token_counts: token_counts.to_vec(),
143    };
144
145    match request_or_autostart(models_dir, &request)? {
146        Some(DaemonResponse::PassageEmbeddings { embeddings, .. }) => Ok(embeddings),
147        Some(DaemonResponse::Error { message }) => Err(AppError::Embedding(message)),
148        Some(other) => Err(AppError::Internal(anyhow::anyhow!(
149            "resposta inesperada do daemon para batch de embeddings de passage: {other:?}"
150        ))),
151        None => {
152            let embedder = embedder::get_embedder(models_dir)?;
153            embedder::embed_passages_controlled(embedder, texts, token_counts)
154        }
155    }
156}
157
/// RAII guard held by the daemon process; its `Drop` impl removes the
/// spawn lock file on graceful shutdown.
struct DaemonSpawnGuard {
    // Models directory this daemon serves; used to locate the lock file.
    models_dir: PathBuf,
}

impl DaemonSpawnGuard {
    /// Create a guard for the daemon serving `models_dir`.
    fn new(models_dir: &Path) -> Self {
        Self {
            models_dir: models_dir.to_path_buf(),
        }
    }
}
169
170impl Drop for DaemonSpawnGuard {
171    fn drop(&mut self) {
172        let lock_path = spawn_lock_path(&self.models_dir);
173        if lock_path.exists() {
174            match std::fs::remove_file(&lock_path) {
175                Ok(()) => {
176                    tracing::debug!(
177                        path = %lock_path.display(),
178                        "lock file de spawn removido ao encerrar daemon graciosamente"
179                    );
180                }
181                Err(err) if err.kind() == std::io::ErrorKind::NotFound => {}
182                Err(err) => {
183                    tracing::warn!(
184                        error = %err,
185                        path = %lock_path.display(),
186                        "falha ao remover lock file de spawn ao encerrar daemon"
187                    );
188                }
189            }
190        }
191        tracing::info!(
192            "daemon encerrado graciosamente; socket será limpo pelo OS ou pelo próximo daemon via try_overwrite"
193        );
194    }
195}
196
197pub fn run(models_dir: &Path, idle_shutdown_secs: u64) -> Result<(), AppError> {
198    // Tokio runtime com 2 worker threads para reduzir threads ociosas do daemon.
199    // O loop de accept permanece síncrono; cada conexão é despachada para spawn_blocking
200    // de forma que embeddings pesados não bloqueiem os workers tokio.
201    let rt = tokio::runtime::Builder::new_multi_thread()
202        .worker_threads(2)
203        .thread_name("daemon-worker")
204        .enable_all()
205        .build()
206        .map_err(AppError::Io)?;
207
208    rt.block_on(run_async(models_dir, idle_shutdown_secs))
209}
210
211async fn run_async(models_dir: &Path, idle_shutdown_secs: u64) -> Result<(), AppError> {
212    let socket = daemon_label(models_dir);
213    let name = to_local_socket_name(&socket)?;
214    let listener = ListenerOptions::new()
215        .name(name)
216        .nonblocking(ListenerNonblockingMode::Accept)
217        .try_overwrite(true)
218        .create_sync()
219        .map_err(AppError::Io)?;
220
221    // Guard que limpa o lock file de spawn em encerramento gracioso.
222    // SIGKILL não dispara Drop; nesse caso try_overwrite(true) acima é o fallback.
223    let _spawn_guard = DaemonSpawnGuard::new(models_dir);
224
225    // Warm the model once per daemon process inside spawn_blocking so the
226    // ONNX session initialisation (CPU-bound, may take several seconds) does
227    // not block a tokio worker thread.
228    let models_dir_warm = models_dir.to_path_buf();
229    tokio::task::spawn_blocking(move || embedder::get_embedder(&models_dir_warm).map(|_| ()))
230        .await
231        .map_err(|e| AppError::Internal(anyhow::anyhow!("model warm-up panicked: {e}")))??;
232
233    crate::output::emit_json(&DaemonResponse::Listening {
234        pid: std::process::id(),
235        socket,
236        idle_shutdown_secs,
237    })?;
238
239    let handled_embed_requests = Arc::new(AtomicU64::new(0));
240    let mut last_activity = Instant::now();
241    let models_dir = models_dir.to_path_buf();
242
243    loop {
244        if shutdown_requested() {
245            break;
246        }
247
248        if !daemon_control_dir(&models_dir).exists() {
249            tracing::info!("daemon control directory disappeared; shutting down");
250            break;
251        }
252
253        match listener.accept() {
254            Ok(stream) => {
255                last_activity = Instant::now();
256                let models_dir_clone = models_dir.clone();
257                let counter = Arc::clone(&handled_embed_requests);
258                let should_exit = tokio::task::spawn_blocking(move || {
259                    handle_client(stream, &models_dir_clone, &counter)
260                })
261                .await
262                .map_err(|e| {
263                    AppError::Internal(anyhow::anyhow!("spawn_blocking panicked: {e}"))
264                })??;
265
266                if should_exit {
267                    break;
268                }
269            }
270            Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => {
271                if last_activity.elapsed() >= Duration::from_secs(idle_shutdown_secs) {
272                    tracing::info!(
273                        idle_shutdown_secs,
274                        handled_embed_requests = handled_embed_requests.load(Ordering::Relaxed),
275                        "daemon idle timeout reached"
276                    );
277                    break;
278                }
279                tokio::time::sleep(Duration::from_millis(50)).await;
280            }
281            Err(err) => return Err(AppError::Io(err)),
282        }
283    }
284
285    Ok(())
286}
287
288fn handle_client(
289    stream: LocalSocketStream,
290    models_dir: &Path,
291    handled_embed_requests: &AtomicU64,
292) -> Result<bool, AppError> {
293    let mut reader = BufReader::new(stream);
294    let mut line = String::new();
295    reader.read_line(&mut line).map_err(AppError::Io)?;
296
297    if line.trim().is_empty() {
298        write_response(
299            reader.get_mut(),
300            &DaemonResponse::Error {
301                message: "requisição vazia ao daemon".to_string(),
302            },
303        )?;
304        return Ok(false);
305    }
306
307    let request: DaemonRequest = serde_json::from_str(line.trim()).map_err(AppError::Json)?;
308    let (response, should_exit) = match request {
309        DaemonRequest::Ping => (
310            DaemonResponse::Ok {
311                pid: std::process::id(),
312                version: SQLITE_GRAPHRAG_VERSION.to_string(),
313                handled_embed_requests: handled_embed_requests.load(Ordering::Relaxed),
314            },
315            false,
316        ),
317        DaemonRequest::Shutdown => (
318            DaemonResponse::ShuttingDown {
319                handled_embed_requests: handled_embed_requests.load(Ordering::Relaxed),
320            },
321            true,
322        ),
323        DaemonRequest::EmbedPassage { text } => {
324            let embedder = embedder::get_embedder(models_dir)?;
325            let embedding = embedder::embed_passage(embedder, &text)?;
326            let count = handled_embed_requests.fetch_add(1, Ordering::Relaxed) + 1;
327            (
328                DaemonResponse::PassageEmbedding {
329                    embedding,
330                    handled_embed_requests: count,
331                },
332                false,
333            )
334        }
335        DaemonRequest::EmbedQuery { text } => {
336            let embedder = embedder::get_embedder(models_dir)?;
337            let embedding = embedder::embed_query(embedder, &text)?;
338            let count = handled_embed_requests.fetch_add(1, Ordering::Relaxed) + 1;
339            (
340                DaemonResponse::QueryEmbedding {
341                    embedding,
342                    handled_embed_requests: count,
343                },
344                false,
345            )
346        }
347        DaemonRequest::EmbedPassages {
348            texts,
349            token_counts,
350        } => {
351            let embedder = embedder::get_embedder(models_dir)?;
352            let text_refs: Vec<&str> = texts.iter().map(String::as_str).collect();
353            let embeddings =
354                embedder::embed_passages_controlled(embedder, &text_refs, &token_counts)?;
355            let count = handled_embed_requests.fetch_add(1, Ordering::Relaxed) + 1;
356            (
357                DaemonResponse::PassageEmbeddings {
358                    embeddings,
359                    handled_embed_requests: count,
360                },
361                false,
362            )
363        }
364    };
365
366    write_response(reader.get_mut(), &response)?;
367    Ok(should_exit)
368}
369
370fn write_response(
371    stream: &mut LocalSocketStream,
372    response: &DaemonResponse,
373) -> Result<(), AppError> {
374    serde_json::to_writer(&mut *stream, response).map_err(AppError::Json)?;
375    stream.write_all(b"\n").map_err(AppError::Io)?;
376    stream.flush().map_err(AppError::Io)?;
377    Ok(())
378}
379
380fn request_if_available(
381    models_dir: &Path,
382    request: &DaemonRequest,
383) -> Result<Option<DaemonResponse>, AppError> {
384    let socket = daemon_label(models_dir);
385    let name = match to_local_socket_name(&socket) {
386        Ok(name) => name,
387        Err(err) => return Err(AppError::Io(err)),
388    };
389
390    let mut stream = match LocalSocketStream::connect(name) {
391        Ok(stream) => stream,
392        Err(err)
393            if matches!(
394                err.kind(),
395                std::io::ErrorKind::NotFound
396                    | std::io::ErrorKind::ConnectionRefused
397                    | std::io::ErrorKind::AddrNotAvailable
398                    | std::io::ErrorKind::TimedOut
399            ) =>
400        {
401            return Ok(None);
402        }
403        Err(err) => return Err(AppError::Io(err)),
404    };
405
406    serde_json::to_writer(&mut stream, request).map_err(AppError::Json)?;
407    stream.write_all(b"\n").map_err(AppError::Io)?;
408    stream.flush().map_err(AppError::Io)?;
409
410    let mut reader = BufReader::new(stream);
411    let mut line = String::new();
412    reader.read_line(&mut line).map_err(AppError::Io)?;
413    if line.trim().is_empty() {
414        return Err(AppError::Embedding("daemon retornou resposta vazia".into()));
415    }
416
417    let response = serde_json::from_str(line.trim()).map_err(AppError::Json)?;
418    Ok(Some(response))
419}
420
421fn request_or_autostart(
422    models_dir: &Path,
423    request: &DaemonRequest,
424) -> Result<Option<DaemonResponse>, AppError> {
425    if let Some(response) = request_if_available(models_dir, request)? {
426        clear_spawn_backoff_state(models_dir).ok();
427        return Ok(Some(response));
428    }
429
430    if autostart_disabled() {
431        return Ok(None);
432    }
433
434    if !ensure_daemon_running(models_dir)? {
435        return Ok(None);
436    }
437
438    request_if_available(models_dir, request)
439}
440
/// Make sure a daemon is serving `models_dir`, spawning one if necessary.
///
/// Returns `Ok(true)` when a daemon answers pings, `Ok(false)` when
/// autostart was suppressed (backoff window) or the spawned daemon never
/// became healthy. A file lock ensures that concurrent callers race to a
/// single spawn; losers wait for the winner's daemon instead of spawning
/// their own.
fn ensure_daemon_running(models_dir: &Path) -> Result<bool, AppError> {
    // Fast path: a daemon is already up.
    if (try_ping(models_dir)?).is_some() {
        clear_spawn_backoff_state(models_dir).ok();
        return Ok(true);
    }

    // Respect the persisted backoff window from previous failed spawns.
    if spawn_backoff_active(models_dir)? {
        tracing::warn!("daemon autostart suppressed by backoff window");
        return Ok(false);
    }

    // Only the lock holder spawns; everyone else just waits for readiness.
    let spawn_lock = match try_acquire_spawn_lock(models_dir)? {
        Some(lock) => lock,
        None => return wait_for_daemon_ready(models_dir),
    };

    // Double-check after acquiring the lock: another process may have
    // spawned a daemon while we were waiting for it.
    if (try_ping(models_dir)?).is_some() {
        clear_spawn_backoff_state(models_dir).ok();
        drop(spawn_lock);
        return Ok(true);
    }

    let exe = match std::env::current_exe() {
        Ok(path) => path,
        Err(err) => {
            record_spawn_failure(models_dir, format!("current_exe failed: {err}"))?;
            drop(spawn_lock);
            return Ok(false);
        }
    };

    // Re-exec ourselves in daemon mode, detached from our stdio.
    // SQLITE_GRAPHRAG_DAEMON_CHILD=1 stops the child from autostarting
    // further daemons (see autostart_disabled).
    let mut child = std::process::Command::new(exe);
    child
        .arg("daemon")
        .arg("--idle-shutdown-secs")
        .arg(DAEMON_IDLE_SHUTDOWN_SECS.to_string())
        .env("SQLITE_GRAPHRAG_DAEMON_CHILD", "1")
        .stdin(Stdio::null())
        .stdout(Stdio::null())
        .stderr(Stdio::null());

    // NOTE(review): the Child handle is dropped without wait(); on Unix the
    // child remains a zombie while this process lives — presumably fine for
    // a short-lived CLI, but worth confirming.
    match child.spawn() {
        Ok(_) => {
            let ready = wait_for_daemon_ready(models_dir)?;
            if ready {
                clear_spawn_backoff_state(models_dir).ok();
            } else {
                record_spawn_failure(
                    models_dir,
                    "daemon did not become healthy after autostart".to_string(),
                )?;
            }
            drop(spawn_lock);
            Ok(ready)
        }
        Err(err) => {
            record_spawn_failure(models_dir, format!("daemon spawn failed: {err}"))?;
            drop(spawn_lock);
            Ok(false)
        }
    }
}
503
504fn wait_for_daemon_ready(models_dir: &Path) -> Result<bool, AppError> {
505    let deadline = Instant::now() + Duration::from_millis(DAEMON_AUTO_START_MAX_WAIT_MS);
506    let mut sleep_ms = DAEMON_AUTO_START_INITIAL_BACKOFF_MS.max(DAEMON_PING_TIMEOUT_MS);
507
508    while Instant::now() < deadline {
509        if (try_ping(models_dir)?).is_some() {
510            return Ok(true);
511        }
512        thread::sleep(Duration::from_millis(sleep_ms));
513        sleep_ms = (sleep_ms * 2).min(DAEMON_AUTO_START_MAX_BACKOFF_MS);
514    }
515
516    Ok(false)
517}
518
/// Decide whether daemon autostart is disabled for this process.
///
/// True when this process *is* the daemon child (to prevent recursive
/// spawning), or when autostart is explicitly disabled and not overridden
/// by the force flag.
fn autostart_disabled() -> bool {
    let flag_set = |key: &str| std::env::var(key).as_deref() == Ok("1");

    if flag_set("SQLITE_GRAPHRAG_DAEMON_CHILD") {
        return true;
    }
    flag_set("SQLITE_GRAPHRAG_DAEMON_DISABLE_AUTOSTART")
        && !flag_set("SQLITE_GRAPHRAG_DAEMON_FORCE_AUTOSTART")
}
524
/// Control directory for daemon coordination files (spawn lock and backoff
/// state): the parent of `models_dir`, or `models_dir` itself when it has
/// no parent (e.g. a filesystem root).
fn daemon_control_dir(models_dir: &Path) -> PathBuf {
    match models_dir.parent() {
        Some(parent) => parent.to_path_buf(),
        None => models_dir.to_path_buf(),
    }
}
531
532fn spawn_lock_path(models_dir: &Path) -> PathBuf {
533    daemon_control_dir(models_dir).join("daemon-spawn.lock")
534}
535
536fn spawn_state_path(models_dir: &Path) -> PathBuf {
537    daemon_control_dir(models_dir).join("daemon-spawn-state.json")
538}
539
540fn try_acquire_spawn_lock(models_dir: &Path) -> Result<Option<File>, AppError> {
541    let path = spawn_lock_path(models_dir);
542    std::fs::create_dir_all(crate::paths::parent_or_err(&path)?).map_err(AppError::Io)?;
543    let file = OpenOptions::new()
544        .read(true)
545        .write(true)
546        .create(true)
547        .truncate(false)
548        .open(path)
549        .map_err(AppError::Io)?;
550
551    let deadline = Instant::now() + Duration::from_millis(DAEMON_SPAWN_LOCK_WAIT_MS);
552    loop {
553        match file.try_lock_exclusive() {
554            Ok(()) => return Ok(Some(file)),
555            Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => {
556                if Instant::now() >= deadline {
557                    return Ok(None);
558                }
559                thread::sleep(Duration::from_millis(50));
560            }
561            Err(err) => return Err(AppError::Io(err)),
562        }
563    }
564}
565
566fn spawn_backoff_active(models_dir: &Path) -> Result<bool, AppError> {
567    let state = load_spawn_state(models_dir)?;
568    Ok(now_epoch_ms() < state.not_before_epoch_ms)
569}
570
571fn record_spawn_failure(models_dir: &Path, message: String) -> Result<(), AppError> {
572    let mut state = load_spawn_state(models_dir)?;
573    state.consecutive_failures = state.consecutive_failures.saturating_add(1);
574    let exponent = state.consecutive_failures.saturating_sub(1).min(6);
575    let backoff_ms =
576        (DAEMON_SPAWN_BACKOFF_BASE_MS * (1_u64 << exponent)).min(DAEMON_AUTO_START_MAX_BACKOFF_MS);
577    state.not_before_epoch_ms = now_epoch_ms() + backoff_ms;
578    state.last_error = Some(message);
579    save_spawn_state(models_dir, &state)
580}
581
582fn clear_spawn_backoff_state(models_dir: &Path) -> Result<(), AppError> {
583    let path = spawn_state_path(models_dir);
584    if path.exists() {
585        std::fs::remove_file(path).map_err(AppError::Io)?;
586    }
587    Ok(())
588}
589
590fn load_spawn_state(models_dir: &Path) -> Result<DaemonSpawnState, AppError> {
591    let path = spawn_state_path(models_dir);
592    if !path.exists() {
593        return Ok(DaemonSpawnState::default());
594    }
595
596    let bytes = std::fs::read(path).map_err(AppError::Io)?;
597    serde_json::from_slice(&bytes).map_err(AppError::Json)
598}
599
600fn save_spawn_state(models_dir: &Path, state: &DaemonSpawnState) -> Result<(), AppError> {
601    let path = spawn_state_path(models_dir);
602    std::fs::create_dir_all(crate::paths::parent_or_err(&path)?).map_err(AppError::Io)?;
603    let bytes = serde_json::to_vec(state).map_err(AppError::Json)?;
604    std::fs::write(path, bytes).map_err(AppError::Io)
605}
606
/// Current wall-clock time in milliseconds since the Unix epoch.
///
/// Returns 0 if the system clock reads before the epoch.
fn now_epoch_ms() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis() as u64,
        Err(_) => 0,
    }
}
613
614fn to_local_socket_name(name: &str) -> std::io::Result<interprocess::local_socket::Name<'static>> {
615    if let Ok(ns_name) = name.to_string().to_ns_name::<GenericNamespaced>() {
616        return Ok(ns_name);
617    }
618
619    let path = if cfg!(unix) {
620        format!("/tmp/{name}.sock")
621    } else {
622        format!(r"\\.\pipe\{name}")
623    };
624    path.to_fs_name::<GenericFilePath>()
625}
626
#[cfg(test)]
mod tests {
    use super::*;

    /// Backoff state round-trip: recording a failure opens the backoff
    /// window and persists the message; clearing closes it again.
    #[test]
    fn record_and_clear_spawn_backoff_state() {
        let tmp = tempfile::tempdir().unwrap();
        let models_dir = tmp.path().join("cache").join("models");
        std::fs::create_dir_all(&models_dir).unwrap();

        // No state file yet: no backoff.
        assert!(!spawn_backoff_active(&models_dir).unwrap());

        record_spawn_failure(&models_dir, "spawn failed".to_string()).unwrap();
        assert!(spawn_backoff_active(&models_dir).unwrap());

        let state = load_spawn_state(&models_dir).unwrap();
        assert_eq!(state.consecutive_failures, 1);
        assert_eq!(state.last_error.as_deref(), Some("spawn failed"));

        clear_spawn_backoff_state(&models_dir).unwrap();
        assert!(!spawn_backoff_active(&models_dir).unwrap());
    }

    /// The daemon control directory is the parent of the models dir.
    #[test]
    fn daemon_control_dir_usa_pai_de_models() {
        let base = PathBuf::from("/tmp/sqlite-graphrag-cache-test");
        let models_dir = base.join("models");
        assert_eq!(daemon_control_dir(&models_dir), base);
    }
}