Skip to main content

sqlite_graphrag/
daemon.rs

1use crate::constants::{
2    DAEMON_AUTO_START_INITIAL_BACKOFF_MS, DAEMON_AUTO_START_MAX_BACKOFF_MS,
3    DAEMON_AUTO_START_MAX_WAIT_MS, DAEMON_IDLE_SHUTDOWN_SECS, DAEMON_PING_TIMEOUT_MS,
4    DAEMON_SPAWN_BACKOFF_BASE_MS, DAEMON_SPAWN_LOCK_WAIT_MS, SQLITE_GRAPHRAG_VERSION,
5};
6use crate::errors::AppError;
7use crate::{embedder, shutdown_requested};
8use fs4::fs_std::FileExt;
9use interprocess::local_socket::{
10    prelude::LocalSocketStream,
11    traits::{Listener as _, Stream as _},
12    GenericFilePath, GenericNamespaced, ListenerNonblockingMode, ListenerOptions, ToFsName,
13    ToNsName,
14};
15use serde::{Deserialize, Serialize};
16use std::fs::{File, OpenOptions};
17use std::io::{BufRead, BufReader, Write};
18use std::path::{Path, PathBuf};
19use std::process::Stdio;
20use std::thread;
21use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
22
23#[derive(Debug, Serialize, Deserialize)]
24#[serde(tag = "request", rename_all = "snake_case")]
25pub enum DaemonRequest {
26    Ping,
27    Shutdown,
28    EmbedPassage {
29        text: String,
30    },
31    EmbedQuery {
32        text: String,
33    },
34    EmbedPassages {
35        texts: Vec<String>,
36        token_counts: Vec<usize>,
37    },
38}
39
40#[derive(Debug, Serialize, Deserialize)]
41#[serde(tag = "status", rename_all = "snake_case")]
42pub enum DaemonResponse {
43    Listening {
44        pid: u32,
45        socket: String,
46        idle_shutdown_secs: u64,
47    },
48    Ok {
49        pid: u32,
50        version: String,
51        handled_embed_requests: u64,
52    },
53    PassageEmbedding {
54        embedding: Vec<f32>,
55        handled_embed_requests: u64,
56    },
57    QueryEmbedding {
58        embedding: Vec<f32>,
59        handled_embed_requests: u64,
60    },
61    PassageEmbeddings {
62        embeddings: Vec<Vec<f32>>,
63        handled_embed_requests: u64,
64    },
65    ShuttingDown {
66        handled_embed_requests: u64,
67    },
68    Error {
69        message: String,
70    },
71}
72
73#[derive(Debug, Default, Serialize, Deserialize)]
74struct DaemonSpawnState {
75    consecutive_failures: u32,
76    not_before_epoch_ms: u64,
77    last_error: Option<String>,
78}
79
80pub fn daemon_label(models_dir: &Path) -> String {
81    let hash = blake3::hash(models_dir.to_string_lossy().as_bytes())
82        .to_hex()
83        .to_string();
84    format!("sqlite-graphrag-daemon-{}", &hash[..16])
85}
86
87pub fn try_ping(models_dir: &Path) -> Result<Option<DaemonResponse>, AppError> {
88    request_if_available(models_dir, &DaemonRequest::Ping)
89}
90
91pub fn try_shutdown(models_dir: &Path) -> Result<Option<DaemonResponse>, AppError> {
92    request_if_available(models_dir, &DaemonRequest::Shutdown)
93}
94
95pub fn embed_passage_or_local(models_dir: &Path, text: &str) -> Result<Vec<f32>, AppError> {
96    match request_or_autostart(
97        models_dir,
98        &DaemonRequest::EmbedPassage {
99            text: text.to_string(),
100        },
101    )? {
102        Some(DaemonResponse::PassageEmbedding { embedding, .. }) => Ok(embedding),
103        Some(DaemonResponse::Error { message }) => Err(AppError::Embedding(message)),
104        Some(other) => Err(AppError::Internal(anyhow::anyhow!(
105            "resposta inesperada do daemon para embedding de passage: {other:?}"
106        ))),
107        None => {
108            let embedder = embedder::get_embedder(models_dir)?;
109            embedder::embed_passage(embedder, text)
110        }
111    }
112}
113
114pub fn embed_query_or_local(models_dir: &Path, text: &str) -> Result<Vec<f32>, AppError> {
115    match request_or_autostart(
116        models_dir,
117        &DaemonRequest::EmbedQuery {
118            text: text.to_string(),
119        },
120    )? {
121        Some(DaemonResponse::QueryEmbedding { embedding, .. }) => Ok(embedding),
122        Some(DaemonResponse::Error { message }) => Err(AppError::Embedding(message)),
123        Some(other) => Err(AppError::Internal(anyhow::anyhow!(
124            "resposta inesperada do daemon para embedding de query: {other:?}"
125        ))),
126        None => {
127            let embedder = embedder::get_embedder(models_dir)?;
128            embedder::embed_query(embedder, text)
129        }
130    }
131}
132
133pub fn embed_passages_controlled_or_local(
134    models_dir: &Path,
135    texts: &[&str],
136    token_counts: &[usize],
137) -> Result<Vec<Vec<f32>>, AppError> {
138    let request = DaemonRequest::EmbedPassages {
139        texts: texts.iter().map(|t| (*t).to_string()).collect(),
140        token_counts: token_counts.to_vec(),
141    };
142
143    match request_or_autostart(models_dir, &request)? {
144        Some(DaemonResponse::PassageEmbeddings { embeddings, .. }) => Ok(embeddings),
145        Some(DaemonResponse::Error { message }) => Err(AppError::Embedding(message)),
146        Some(other) => Err(AppError::Internal(anyhow::anyhow!(
147            "resposta inesperada do daemon para batch de embeddings de passage: {other:?}"
148        ))),
149        None => {
150            let embedder = embedder::get_embedder(models_dir)?;
151            embedder::embed_passages_controlled(embedder, texts, token_counts)
152        }
153    }
154}
155
/// Scope guard held by the running daemon; its `Drop` removes the spawn lock
/// file associated with `models_dir`.
struct DaemonSpawnGuard {
    /// Models directory the guarded daemon serves.
    models_dir: PathBuf,
}

impl DaemonSpawnGuard {
    /// Creates a guard that remembers an owned copy of `models_dir`.
    fn new(models_dir: &Path) -> Self {
        let models_dir = PathBuf::from(models_dir);
        Self { models_dir }
    }
}
167
168impl Drop for DaemonSpawnGuard {
169    fn drop(&mut self) {
170        let lock_path = spawn_lock_path(&self.models_dir);
171        if lock_path.exists() {
172            match std::fs::remove_file(&lock_path) {
173                Ok(()) => {
174                    tracing::debug!(
175                        path = %lock_path.display(),
176                        "lock file de spawn removido ao encerrar daemon graciosamente"
177                    );
178                }
179                Err(err) if err.kind() == std::io::ErrorKind::NotFound => {}
180                Err(err) => {
181                    tracing::warn!(
182                        error = %err,
183                        path = %lock_path.display(),
184                        "falha ao remover lock file de spawn ao encerrar daemon"
185                    );
186                }
187            }
188        }
189        tracing::info!(
190            "daemon encerrado graciosamente; socket será limpo pelo OS ou pelo próximo daemon via try_overwrite"
191        );
192    }
193}
194
195pub fn run(models_dir: &Path, idle_shutdown_secs: u64) -> Result<(), AppError> {
196    // Tokio runtime com 2 worker threads para reduzir threads ociosas do daemon.
197    // O loop de accept permanece síncrono; cada conexão é despachada para spawn_blocking
198    // de forma que embeddings pesados não bloqueiem os workers tokio.
199    let rt = tokio::runtime::Builder::new_multi_thread()
200        .worker_threads(2)
201        .thread_name("daemon-worker")
202        .enable_all()
203        .build()
204        .map_err(AppError::Io)?;
205
206    rt.block_on(run_async(models_dir, idle_shutdown_secs))
207}
208
209async fn run_async(models_dir: &Path, idle_shutdown_secs: u64) -> Result<(), AppError> {
210    let socket = daemon_label(models_dir);
211    let name = to_local_socket_name(&socket)?;
212    let listener = ListenerOptions::new()
213        .name(name)
214        .nonblocking(ListenerNonblockingMode::Accept)
215        .try_overwrite(true)
216        .create_sync()
217        .map_err(AppError::Io)?;
218
219    // Guard que limpa o lock file de spawn em encerramento gracioso.
220    // SIGKILL não dispara Drop; nesse caso try_overwrite(true) acima é o fallback.
221    let _spawn_guard = DaemonSpawnGuard::new(models_dir);
222
223    // Warm the model once per daemon process.
224    let _ = embedder::get_embedder(models_dir)?;
225
226    crate::output::emit_json(&DaemonResponse::Listening {
227        pid: std::process::id(),
228        socket,
229        idle_shutdown_secs,
230    })?;
231
232    let mut handled_embed_requests = 0_u64;
233    let mut last_activity = Instant::now();
234    let models_dir = models_dir.to_path_buf();
235
236    loop {
237        if shutdown_requested() {
238            break;
239        }
240
241        if !daemon_control_dir(&models_dir).exists() {
242            tracing::info!("daemon control directory disappeared; shutting down");
243            break;
244        }
245
246        match listener.accept() {
247            Ok(stream) => {
248                last_activity = Instant::now();
249                let models_dir_clone = models_dir.clone();
250                let result = tokio::task::spawn_blocking(move || {
251                    let mut counter = 0_u64;
252                    handle_client(stream, &models_dir_clone, &mut counter)
253                        .map(|should_exit| (should_exit, counter))
254                })
255                .await
256                .map_err(|e| AppError::Internal(anyhow::anyhow!("spawn_blocking panicked: {e}")))?;
257
258                let (should_exit, delta) = result?;
259                handled_embed_requests += delta;
260
261                if should_exit {
262                    break;
263                }
264            }
265            Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => {
266                if last_activity.elapsed() >= Duration::from_secs(idle_shutdown_secs) {
267                    tracing::info!(
268                        idle_shutdown_secs,
269                        handled_embed_requests,
270                        "daemon idle timeout reached"
271                    );
272                    break;
273                }
274                tokio::time::sleep(Duration::from_millis(50)).await;
275            }
276            Err(err) => return Err(AppError::Io(err)),
277        }
278    }
279
280    Ok(())
281}
282
283fn handle_client(
284    stream: LocalSocketStream,
285    models_dir: &Path,
286    handled_embed_requests: &mut u64,
287) -> Result<bool, AppError> {
288    let mut reader = BufReader::new(stream);
289    let mut line = String::new();
290    reader.read_line(&mut line).map_err(AppError::Io)?;
291
292    if line.trim().is_empty() {
293        write_response(
294            reader.get_mut(),
295            &DaemonResponse::Error {
296                message: "requisição vazia ao daemon".to_string(),
297            },
298        )?;
299        return Ok(false);
300    }
301
302    let request: DaemonRequest = serde_json::from_str(line.trim()).map_err(AppError::Json)?;
303    let (response, should_exit) = match request {
304        DaemonRequest::Ping => (
305            DaemonResponse::Ok {
306                pid: std::process::id(),
307                version: SQLITE_GRAPHRAG_VERSION.to_string(),
308                handled_embed_requests: *handled_embed_requests,
309            },
310            false,
311        ),
312        DaemonRequest::Shutdown => (
313            DaemonResponse::ShuttingDown {
314                handled_embed_requests: *handled_embed_requests,
315            },
316            true,
317        ),
318        DaemonRequest::EmbedPassage { text } => {
319            let embedder = embedder::get_embedder(models_dir)?;
320            let embedding = embedder::embed_passage(embedder, &text)?;
321            *handled_embed_requests += 1;
322            (
323                DaemonResponse::PassageEmbedding {
324                    embedding,
325                    handled_embed_requests: *handled_embed_requests,
326                },
327                false,
328            )
329        }
330        DaemonRequest::EmbedQuery { text } => {
331            let embedder = embedder::get_embedder(models_dir)?;
332            let embedding = embedder::embed_query(embedder, &text)?;
333            *handled_embed_requests += 1;
334            (
335                DaemonResponse::QueryEmbedding {
336                    embedding,
337                    handled_embed_requests: *handled_embed_requests,
338                },
339                false,
340            )
341        }
342        DaemonRequest::EmbedPassages {
343            texts,
344            token_counts,
345        } => {
346            let embedder = embedder::get_embedder(models_dir)?;
347            let text_refs: Vec<&str> = texts.iter().map(String::as_str).collect();
348            let embeddings =
349                embedder::embed_passages_controlled(embedder, &text_refs, &token_counts)?;
350            *handled_embed_requests += 1;
351            (
352                DaemonResponse::PassageEmbeddings {
353                    embeddings,
354                    handled_embed_requests: *handled_embed_requests,
355                },
356                false,
357            )
358        }
359    };
360
361    write_response(reader.get_mut(), &response)?;
362    Ok(should_exit)
363}
364
365fn write_response(
366    stream: &mut LocalSocketStream,
367    response: &DaemonResponse,
368) -> Result<(), AppError> {
369    serde_json::to_writer(&mut *stream, response).map_err(AppError::Json)?;
370    stream.write_all(b"\n").map_err(AppError::Io)?;
371    stream.flush().map_err(AppError::Io)?;
372    Ok(())
373}
374
375fn request_if_available(
376    models_dir: &Path,
377    request: &DaemonRequest,
378) -> Result<Option<DaemonResponse>, AppError> {
379    let socket = daemon_label(models_dir);
380    let name = match to_local_socket_name(&socket) {
381        Ok(name) => name,
382        Err(err) => return Err(AppError::Io(err)),
383    };
384
385    let mut stream = match LocalSocketStream::connect(name) {
386        Ok(stream) => stream,
387        Err(err)
388            if matches!(
389                err.kind(),
390                std::io::ErrorKind::NotFound
391                    | std::io::ErrorKind::ConnectionRefused
392                    | std::io::ErrorKind::AddrNotAvailable
393                    | std::io::ErrorKind::TimedOut
394            ) =>
395        {
396            return Ok(None);
397        }
398        Err(err) => return Err(AppError::Io(err)),
399    };
400
401    serde_json::to_writer(&mut stream, request).map_err(AppError::Json)?;
402    stream.write_all(b"\n").map_err(AppError::Io)?;
403    stream.flush().map_err(AppError::Io)?;
404
405    let mut reader = BufReader::new(stream);
406    let mut line = String::new();
407    reader.read_line(&mut line).map_err(AppError::Io)?;
408    if line.trim().is_empty() {
409        return Err(AppError::Embedding("daemon retornou resposta vazia".into()));
410    }
411
412    let response = serde_json::from_str(line.trim()).map_err(AppError::Json)?;
413    Ok(Some(response))
414}
415
416fn request_or_autostart(
417    models_dir: &Path,
418    request: &DaemonRequest,
419) -> Result<Option<DaemonResponse>, AppError> {
420    if let Some(response) = request_if_available(models_dir, request)? {
421        clear_spawn_backoff_state(models_dir).ok();
422        return Ok(Some(response));
423    }
424
425    if autostart_disabled() {
426        return Ok(None);
427    }
428
429    if !ensure_daemon_running(models_dir)? {
430        return Ok(None);
431    }
432
433    request_if_available(models_dir, request)
434}
435
436fn ensure_daemon_running(models_dir: &Path) -> Result<bool, AppError> {
437    if (try_ping(models_dir)?).is_some() {
438        clear_spawn_backoff_state(models_dir).ok();
439        return Ok(true);
440    }
441
442    if spawn_backoff_active(models_dir)? {
443        tracing::warn!("daemon autostart suppressed by backoff window");
444        return Ok(false);
445    }
446
447    let spawn_lock = match try_acquire_spawn_lock(models_dir)? {
448        Some(lock) => lock,
449        None => return wait_for_daemon_ready(models_dir),
450    };
451
452    if (try_ping(models_dir)?).is_some() {
453        clear_spawn_backoff_state(models_dir).ok();
454        drop(spawn_lock);
455        return Ok(true);
456    }
457
458    let exe = match std::env::current_exe() {
459        Ok(path) => path,
460        Err(err) => {
461            record_spawn_failure(models_dir, format!("current_exe failed: {err}"))?;
462            drop(spawn_lock);
463            return Ok(false);
464        }
465    };
466
467    let mut child = std::process::Command::new(exe);
468    child
469        .arg("daemon")
470        .arg("--idle-shutdown-secs")
471        .arg(DAEMON_IDLE_SHUTDOWN_SECS.to_string())
472        .env("SQLITE_GRAPHRAG_DAEMON_CHILD", "1")
473        .stdin(Stdio::null())
474        .stdout(Stdio::null())
475        .stderr(Stdio::null());
476
477    match child.spawn() {
478        Ok(_) => {
479            let ready = wait_for_daemon_ready(models_dir)?;
480            if ready {
481                clear_spawn_backoff_state(models_dir).ok();
482            } else {
483                record_spawn_failure(
484                    models_dir,
485                    "daemon did not become healthy after autostart".to_string(),
486                )?;
487            }
488            drop(spawn_lock);
489            Ok(ready)
490        }
491        Err(err) => {
492            record_spawn_failure(models_dir, format!("daemon spawn failed: {err}"))?;
493            drop(spawn_lock);
494            Ok(false)
495        }
496    }
497}
498
499fn wait_for_daemon_ready(models_dir: &Path) -> Result<bool, AppError> {
500    let deadline = Instant::now() + Duration::from_millis(DAEMON_AUTO_START_MAX_WAIT_MS);
501    let mut sleep_ms = DAEMON_AUTO_START_INITIAL_BACKOFF_MS.max(DAEMON_PING_TIMEOUT_MS);
502
503    while Instant::now() < deadline {
504        if (try_ping(models_dir)?).is_some() {
505            return Ok(true);
506        }
507        thread::sleep(Duration::from_millis(sleep_ms));
508        sleep_ms = (sleep_ms * 2).min(DAEMON_AUTO_START_MAX_BACKOFF_MS);
509    }
510
511    Ok(false)
512}
513
/// Tells whether this process must not autostart a daemon.
///
/// Autostart is disabled when `SQLITE_GRAPHRAG_DAEMON_CHILD` is `1`, or when
/// `SQLITE_GRAPHRAG_DAEMON_DISABLE_AUTOSTART` is `1` and
/// `SQLITE_GRAPHRAG_DAEMON_FORCE_AUTOSTART` is not `1`.
fn autostart_disabled() -> bool {
    let is_set = |name: &str| std::env::var(name).as_deref() == Ok("1");

    if is_set("SQLITE_GRAPHRAG_DAEMON_CHILD") {
        return true;
    }
    if is_set("SQLITE_GRAPHRAG_DAEMON_FORCE_AUTOSTART") {
        return false;
    }
    is_set("SQLITE_GRAPHRAG_DAEMON_DISABLE_AUTOSTART")
}
519
/// Returns the directory holding the daemon control files (spawn lock and
/// spawn state): the parent of `models_dir`.
///
/// When `models_dir` has no parent (for example a filesystem root), the path
/// itself is returned. When `models_dir` is a single relative component,
/// `Path::parent` yields the empty path; that is mapped to `"."` because an
/// empty path never satisfies `exists()`, which the daemon loop uses to decide
/// whether its control directory is still present.
fn daemon_control_dir(models_dir: &Path) -> PathBuf {
    match models_dir.parent() {
        Some(parent) if parent.as_os_str().is_empty() => PathBuf::from("."),
        Some(parent) => parent.to_path_buf(),
        None => models_dir.to_path_buf(),
    }
}
526
527fn spawn_lock_path(models_dir: &Path) -> PathBuf {
528    daemon_control_dir(models_dir).join("daemon-spawn.lock")
529}
530
531fn spawn_state_path(models_dir: &Path) -> PathBuf {
532    daemon_control_dir(models_dir).join("daemon-spawn-state.json")
533}
534
535fn try_acquire_spawn_lock(models_dir: &Path) -> Result<Option<File>, AppError> {
536    let path = spawn_lock_path(models_dir);
537    std::fs::create_dir_all(crate::paths::parent_or_err(&path)?).map_err(AppError::Io)?;
538    let file = OpenOptions::new()
539        .read(true)
540        .write(true)
541        .create(true)
542        .truncate(false)
543        .open(path)
544        .map_err(AppError::Io)?;
545
546    let deadline = Instant::now() + Duration::from_millis(DAEMON_SPAWN_LOCK_WAIT_MS);
547    loop {
548        match file.try_lock_exclusive() {
549            Ok(()) => return Ok(Some(file)),
550            Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => {
551                if Instant::now() >= deadline {
552                    return Ok(None);
553                }
554                thread::sleep(Duration::from_millis(50));
555            }
556            Err(err) => return Err(AppError::Io(err)),
557        }
558    }
559}
560
561fn spawn_backoff_active(models_dir: &Path) -> Result<bool, AppError> {
562    let state = load_spawn_state(models_dir)?;
563    Ok(now_epoch_ms() < state.not_before_epoch_ms)
564}
565
566fn record_spawn_failure(models_dir: &Path, message: String) -> Result<(), AppError> {
567    let mut state = load_spawn_state(models_dir)?;
568    state.consecutive_failures = state.consecutive_failures.saturating_add(1);
569    let exponent = state.consecutive_failures.saturating_sub(1).min(6);
570    let backoff_ms =
571        (DAEMON_SPAWN_BACKOFF_BASE_MS * (1_u64 << exponent)).min(DAEMON_AUTO_START_MAX_BACKOFF_MS);
572    state.not_before_epoch_ms = now_epoch_ms() + backoff_ms;
573    state.last_error = Some(message);
574    save_spawn_state(models_dir, &state)
575}
576
577fn clear_spawn_backoff_state(models_dir: &Path) -> Result<(), AppError> {
578    let path = spawn_state_path(models_dir);
579    if path.exists() {
580        std::fs::remove_file(path).map_err(AppError::Io)?;
581    }
582    Ok(())
583}
584
585fn load_spawn_state(models_dir: &Path) -> Result<DaemonSpawnState, AppError> {
586    let path = spawn_state_path(models_dir);
587    if !path.exists() {
588        return Ok(DaemonSpawnState::default());
589    }
590
591    let bytes = std::fs::read(path).map_err(AppError::Io)?;
592    serde_json::from_slice(&bytes).map_err(AppError::Json)
593}
594
595fn save_spawn_state(models_dir: &Path, state: &DaemonSpawnState) -> Result<(), AppError> {
596    let path = spawn_state_path(models_dir);
597    std::fs::create_dir_all(crate::paths::parent_or_err(&path)?).map_err(AppError::Io)?;
598    let bytes = serde_json::to_vec(state).map_err(AppError::Json)?;
599    std::fs::write(path, bytes).map_err(AppError::Io)
600}
601
/// Current wall-clock time as milliseconds since the Unix epoch.
///
/// A clock set before the epoch is reported as `0`.
fn now_epoch_ms() -> u64 {
    let since_epoch = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed,
        Err(_) => Duration::from_secs(0),
    };
    since_epoch.as_millis() as u64
}
608
609fn to_local_socket_name(name: &str) -> std::io::Result<interprocess::local_socket::Name<'static>> {
610    if let Ok(ns_name) = name.to_string().to_ns_name::<GenericNamespaced>() {
611        return Ok(ns_name);
612    }
613
614    let path = if cfg!(unix) {
615        format!("/tmp/{name}.sock")
616    } else {
617        format!(r"\\.\pipe\{name}")
618    };
619    path.to_fs_name::<GenericFilePath>()
620}
621
#[cfg(test)]
mod tests {
    use super::*;

    /// Recording a failure opens the backoff window and stores the message;
    /// clearing the state closes the window again.
    #[test]
    fn record_and_clear_spawn_backoff_state() {
        let scratch = tempfile::tempdir().unwrap();
        let models_dir = scratch.path().join("cache").join("models");
        std::fs::create_dir_all(&models_dir).unwrap();

        let backoff_active = || spawn_backoff_active(&models_dir).unwrap();
        assert!(!backoff_active());

        record_spawn_failure(&models_dir, "spawn failed".to_string()).unwrap();
        assert!(backoff_active());

        let persisted = load_spawn_state(&models_dir).unwrap();
        assert_eq!(persisted.consecutive_failures, 1);
        assert_eq!(persisted.last_error.as_deref(), Some("spawn failed"));

        clear_spawn_backoff_state(&models_dir).unwrap();
        assert!(!backoff_active());
    }

    /// The control directory is the parent of the models directory.
    #[test]
    fn daemon_control_dir_usa_pai_de_models() {
        let expected = PathBuf::from("/tmp/sqlite-graphrag-cache-test");
        let models_dir = expected.join("models");
        assert_eq!(daemon_control_dir(&models_dir), expected);
    }
}