Skip to main content

sqlite_graphrag/
daemon.rs

1use crate::constants::{
2    DAEMON_AUTO_START_INITIAL_BACKOFF_MS, DAEMON_AUTO_START_MAX_BACKOFF_MS,
3    DAEMON_AUTO_START_MAX_WAIT_MS, DAEMON_IDLE_SHUTDOWN_SECS, DAEMON_PING_TIMEOUT_MS,
4    DAEMON_SPAWN_BACKOFF_BASE_MS, DAEMON_SPAWN_LOCK_WAIT_MS, SQLITE_GRAPHRAG_VERSION,
5};
6use crate::errors::AppError;
7use crate::{embedder, shutdown_requested};
8use fs4::fs_std::FileExt;
9use interprocess::local_socket::{
10    prelude::LocalSocketStream,
11    traits::{Listener as _, Stream as _},
12    GenericFilePath, GenericNamespaced, ListenerNonblockingMode, ListenerOptions, ToFsName,
13    ToNsName,
14};
15use serde::{Deserialize, Serialize};
16use std::fs::{File, OpenOptions};
17use std::io::{BufRead, BufReader, Write};
18use std::path::{Path, PathBuf};
19use std::process::Stdio;
20use std::thread;
21use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
22
23#[derive(Debug, Serialize, Deserialize)]
24#[serde(tag = "request", rename_all = "snake_case")]
25pub enum DaemonRequest {
26    Ping,
27    Shutdown,
28    EmbedPassage {
29        text: String,
30    },
31    EmbedQuery {
32        text: String,
33    },
34    EmbedPassages {
35        texts: Vec<String>,
36        token_counts: Vec<usize>,
37    },
38}
39
40#[derive(Debug, Serialize, Deserialize)]
41#[serde(tag = "status", rename_all = "snake_case")]
42pub enum DaemonResponse {
43    Listening {
44        pid: u32,
45        socket: String,
46        idle_shutdown_secs: u64,
47    },
48    Ok {
49        pid: u32,
50        version: String,
51        handled_embed_requests: u64,
52    },
53    PassageEmbedding {
54        embedding: Vec<f32>,
55        handled_embed_requests: u64,
56    },
57    QueryEmbedding {
58        embedding: Vec<f32>,
59        handled_embed_requests: u64,
60    },
61    PassageEmbeddings {
62        embeddings: Vec<Vec<f32>>,
63        handled_embed_requests: u64,
64    },
65    ShuttingDown {
66        handled_embed_requests: u64,
67    },
68    Error {
69        message: String,
70    },
71}
72
73#[derive(Debug, Default, Serialize, Deserialize)]
74struct DaemonSpawnState {
75    consecutive_failures: u32,
76    not_before_epoch_ms: u64,
77    last_error: Option<String>,
78}
79
80pub fn daemon_label(models_dir: &Path) -> String {
81    let hash = blake3::hash(models_dir.to_string_lossy().as_bytes())
82        .to_hex()
83        .to_string();
84    format!("sqlite-graphrag-daemon-{}", &hash[..16])
85}
86
87pub fn try_ping(models_dir: &Path) -> Result<Option<DaemonResponse>, AppError> {
88    request_if_available(models_dir, &DaemonRequest::Ping)
89}
90
91pub fn try_shutdown(models_dir: &Path) -> Result<Option<DaemonResponse>, AppError> {
92    request_if_available(models_dir, &DaemonRequest::Shutdown)
93}
94
95pub fn embed_passage_or_local(models_dir: &Path, text: &str) -> Result<Vec<f32>, AppError> {
96    match request_or_autostart(
97        models_dir,
98        &DaemonRequest::EmbedPassage {
99            text: text.to_string(),
100        },
101    )? {
102        Some(DaemonResponse::PassageEmbedding { embedding, .. }) => Ok(embedding),
103        Some(DaemonResponse::Error { message }) => Err(AppError::Embedding(message)),
104        Some(other) => Err(AppError::Internal(anyhow::anyhow!(
105            "unexpected daemon response for passage embedding: {other:?}"
106        ))),
107        None => {
108            let embedder = embedder::get_embedder(models_dir)?;
109            embedder::embed_passage(embedder, text)
110        }
111    }
112}
113
114pub fn embed_query_or_local(models_dir: &Path, text: &str) -> Result<Vec<f32>, AppError> {
115    match request_or_autostart(
116        models_dir,
117        &DaemonRequest::EmbedQuery {
118            text: text.to_string(),
119        },
120    )? {
121        Some(DaemonResponse::QueryEmbedding { embedding, .. }) => Ok(embedding),
122        Some(DaemonResponse::Error { message }) => Err(AppError::Embedding(message)),
123        Some(other) => Err(AppError::Internal(anyhow::anyhow!(
124            "unexpected daemon response for query embedding: {other:?}"
125        ))),
126        None => {
127            let embedder = embedder::get_embedder(models_dir)?;
128            embedder::embed_query(embedder, text)
129        }
130    }
131}
132
133pub fn embed_passages_controlled_or_local(
134    models_dir: &Path,
135    texts: &[&str],
136    token_counts: &[usize],
137) -> Result<Vec<Vec<f32>>, AppError> {
138    let request = DaemonRequest::EmbedPassages {
139        texts: texts.iter().map(|t| (*t).to_string()).collect(),
140        token_counts: token_counts.to_vec(),
141    };
142
143    match request_or_autostart(models_dir, &request)? {
144        Some(DaemonResponse::PassageEmbeddings { embeddings, .. }) => Ok(embeddings),
145        Some(DaemonResponse::Error { message }) => Err(AppError::Embedding(message)),
146        Some(other) => Err(AppError::Internal(anyhow::anyhow!(
147            "unexpected daemon response for batch passage embeddings: {other:?}"
148        ))),
149        None => {
150            let embedder = embedder::get_embedder(models_dir)?;
151            embedder::embed_passages_controlled(embedder, texts, token_counts)
152        }
153    }
154}
155
156pub fn run(models_dir: &Path, idle_shutdown_secs: u64) -> Result<(), AppError> {
157    let socket = daemon_label(models_dir);
158    let name = to_local_socket_name(&socket)?;
159    let listener = ListenerOptions::new()
160        .name(name)
161        .nonblocking(ListenerNonblockingMode::Accept)
162        .try_overwrite(true)
163        .create_sync()
164        .map_err(AppError::Io)?;
165
166    // Warm the model once per daemon process.
167    let _ = embedder::get_embedder(models_dir)?;
168
169    crate::output::emit_json(&DaemonResponse::Listening {
170        pid: std::process::id(),
171        socket,
172        idle_shutdown_secs,
173    })?;
174
175    let mut handled_embed_requests = 0_u64;
176    let mut last_activity = Instant::now();
177
178    loop {
179        if shutdown_requested() {
180            break;
181        }
182
183        if !daemon_control_dir(models_dir).exists() {
184            tracing::info!("daemon control directory disappeared; shutting down");
185            break;
186        }
187
188        match listener.accept() {
189            Ok(stream) => {
190                last_activity = Instant::now();
191                let should_exit = handle_client(stream, models_dir, &mut handled_embed_requests)?;
192                if should_exit {
193                    break;
194                }
195            }
196            Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => {
197                if last_activity.elapsed() >= Duration::from_secs(idle_shutdown_secs) {
198                    tracing::info!(
199                        idle_shutdown_secs,
200                        handled_embed_requests,
201                        "daemon idle timeout reached"
202                    );
203                    break;
204                }
205                thread::sleep(Duration::from_millis(50));
206            }
207            Err(err) => return Err(AppError::Io(err)),
208        }
209    }
210
211    Ok(())
212}
213
214fn handle_client(
215    stream: LocalSocketStream,
216    models_dir: &Path,
217    handled_embed_requests: &mut u64,
218) -> Result<bool, AppError> {
219    let mut reader = BufReader::new(stream);
220    let mut line = String::new();
221    reader.read_line(&mut line).map_err(AppError::Io)?;
222
223    if line.trim().is_empty() {
224        write_response(
225            reader.get_mut(),
226            &DaemonResponse::Error {
227                message: "empty daemon request".to_string(),
228            },
229        )?;
230        return Ok(false);
231    }
232
233    let request: DaemonRequest = serde_json::from_str(line.trim()).map_err(AppError::Json)?;
234    let (response, should_exit) = match request {
235        DaemonRequest::Ping => (
236            DaemonResponse::Ok {
237                pid: std::process::id(),
238                version: SQLITE_GRAPHRAG_VERSION.to_string(),
239                handled_embed_requests: *handled_embed_requests,
240            },
241            false,
242        ),
243        DaemonRequest::Shutdown => (
244            DaemonResponse::ShuttingDown {
245                handled_embed_requests: *handled_embed_requests,
246            },
247            true,
248        ),
249        DaemonRequest::EmbedPassage { text } => {
250            let embedder = embedder::get_embedder(models_dir)?;
251            let embedding = embedder::embed_passage(embedder, &text)?;
252            *handled_embed_requests += 1;
253            (
254                DaemonResponse::PassageEmbedding {
255                    embedding,
256                    handled_embed_requests: *handled_embed_requests,
257                },
258                false,
259            )
260        }
261        DaemonRequest::EmbedQuery { text } => {
262            let embedder = embedder::get_embedder(models_dir)?;
263            let embedding = embedder::embed_query(embedder, &text)?;
264            *handled_embed_requests += 1;
265            (
266                DaemonResponse::QueryEmbedding {
267                    embedding,
268                    handled_embed_requests: *handled_embed_requests,
269                },
270                false,
271            )
272        }
273        DaemonRequest::EmbedPassages {
274            texts,
275            token_counts,
276        } => {
277            let embedder = embedder::get_embedder(models_dir)?;
278            let text_refs: Vec<&str> = texts.iter().map(String::as_str).collect();
279            let embeddings =
280                embedder::embed_passages_controlled(embedder, &text_refs, &token_counts)?;
281            *handled_embed_requests += 1;
282            (
283                DaemonResponse::PassageEmbeddings {
284                    embeddings,
285                    handled_embed_requests: *handled_embed_requests,
286                },
287                false,
288            )
289        }
290    };
291
292    write_response(reader.get_mut(), &response)?;
293    Ok(should_exit)
294}
295
296fn write_response(
297    stream: &mut LocalSocketStream,
298    response: &DaemonResponse,
299) -> Result<(), AppError> {
300    serde_json::to_writer(&mut *stream, response).map_err(AppError::Json)?;
301    stream.write_all(b"\n").map_err(AppError::Io)?;
302    stream.flush().map_err(AppError::Io)?;
303    Ok(())
304}
305
306fn request_if_available(
307    models_dir: &Path,
308    request: &DaemonRequest,
309) -> Result<Option<DaemonResponse>, AppError> {
310    let socket = daemon_label(models_dir);
311    let name = match to_local_socket_name(&socket) {
312        Ok(name) => name,
313        Err(err) => return Err(AppError::Io(err)),
314    };
315
316    let mut stream = match LocalSocketStream::connect(name) {
317        Ok(stream) => stream,
318        Err(err)
319            if matches!(
320                err.kind(),
321                std::io::ErrorKind::NotFound
322                    | std::io::ErrorKind::ConnectionRefused
323                    | std::io::ErrorKind::AddrNotAvailable
324                    | std::io::ErrorKind::TimedOut
325            ) =>
326        {
327            return Ok(None);
328        }
329        Err(err) => return Err(AppError::Io(err)),
330    };
331
332    serde_json::to_writer(&mut stream, request).map_err(AppError::Json)?;
333    stream.write_all(b"\n").map_err(AppError::Io)?;
334    stream.flush().map_err(AppError::Io)?;
335
336    let mut reader = BufReader::new(stream);
337    let mut line = String::new();
338    reader.read_line(&mut line).map_err(AppError::Io)?;
339    if line.trim().is_empty() {
340        return Err(AppError::Embedding("daemon returned empty response".into()));
341    }
342
343    let response = serde_json::from_str(line.trim()).map_err(AppError::Json)?;
344    Ok(Some(response))
345}
346
347fn request_or_autostart(
348    models_dir: &Path,
349    request: &DaemonRequest,
350) -> Result<Option<DaemonResponse>, AppError> {
351    if let Some(response) = request_if_available(models_dir, request)? {
352        clear_spawn_backoff_state(models_dir).ok();
353        return Ok(Some(response));
354    }
355
356    if autostart_disabled() {
357        return Ok(None);
358    }
359
360    if !ensure_daemon_running(models_dir)? {
361        return Ok(None);
362    }
363
364    request_if_available(models_dir, request)
365}
366
367fn ensure_daemon_running(models_dir: &Path) -> Result<bool, AppError> {
368    if (try_ping(models_dir)?).is_some() {
369        clear_spawn_backoff_state(models_dir).ok();
370        return Ok(true);
371    }
372
373    if spawn_backoff_active(models_dir)? {
374        tracing::warn!("daemon autostart suppressed by backoff window");
375        return Ok(false);
376    }
377
378    let spawn_lock = match try_acquire_spawn_lock(models_dir)? {
379        Some(lock) => lock,
380        None => return wait_for_daemon_ready(models_dir),
381    };
382
383    if (try_ping(models_dir)?).is_some() {
384        clear_spawn_backoff_state(models_dir).ok();
385        drop(spawn_lock);
386        return Ok(true);
387    }
388
389    let exe = match std::env::current_exe() {
390        Ok(path) => path,
391        Err(err) => {
392            record_spawn_failure(models_dir, format!("current_exe failed: {err}"))?;
393            drop(spawn_lock);
394            return Ok(false);
395        }
396    };
397
398    let mut child = std::process::Command::new(exe);
399    child
400        .arg("daemon")
401        .arg("--idle-shutdown-secs")
402        .arg(DAEMON_IDLE_SHUTDOWN_SECS.to_string())
403        .env("SQLITE_GRAPHRAG_DAEMON_CHILD", "1")
404        .stdin(Stdio::null())
405        .stdout(Stdio::null())
406        .stderr(Stdio::null());
407
408    match child.spawn() {
409        Ok(_) => {
410            let ready = wait_for_daemon_ready(models_dir)?;
411            if ready {
412                clear_spawn_backoff_state(models_dir).ok();
413            } else {
414                record_spawn_failure(
415                    models_dir,
416                    "daemon did not become healthy after autostart".to_string(),
417                )?;
418            }
419            drop(spawn_lock);
420            Ok(ready)
421        }
422        Err(err) => {
423            record_spawn_failure(models_dir, format!("daemon spawn failed: {err}"))?;
424            drop(spawn_lock);
425            Ok(false)
426        }
427    }
428}
429
430fn wait_for_daemon_ready(models_dir: &Path) -> Result<bool, AppError> {
431    let deadline = Instant::now() + Duration::from_millis(DAEMON_AUTO_START_MAX_WAIT_MS);
432    let mut sleep_ms = DAEMON_AUTO_START_INITIAL_BACKOFF_MS.max(DAEMON_PING_TIMEOUT_MS);
433
434    while Instant::now() < deadline {
435        if (try_ping(models_dir)?).is_some() {
436            return Ok(true);
437        }
438        thread::sleep(Duration::from_millis(sleep_ms));
439        sleep_ms = (sleep_ms * 2).min(DAEMON_AUTO_START_MAX_BACKOFF_MS);
440    }
441
442    Ok(false)
443}
444
/// Reports whether this process must not auto-start a daemon.
///
/// Decided from environment variables, each counting as set only when its
/// value is exactly `1`:
///
/// 1. `SQLITE_GRAPHRAG_DAEMON_CHILD` set: always disabled.
/// 2. otherwise `SQLITE_GRAPHRAG_DAEMON_FORCE_AUTOSTART` set: enabled.
/// 3. otherwise disabled only if `SQLITE_GRAPHRAG_DAEMON_DISABLE_AUTOSTART`
///    is set.
fn autostart_disabled() -> bool {
    let flag_is_set = |key: &str| matches!(std::env::var(key).as_deref(), Ok("1"));

    if flag_is_set("SQLITE_GRAPHRAG_DAEMON_CHILD") {
        return true;
    }
    if flag_is_set("SQLITE_GRAPHRAG_DAEMON_FORCE_AUTOSTART") {
        return false;
    }
    flag_is_set("SQLITE_GRAPHRAG_DAEMON_DISABLE_AUTOSTART")
}
450
/// Returns the directory holding the daemon's lock and state files: the
/// parent of `models_dir`, or `models_dir` itself when it has no parent.
///
/// A bare relative path such as `models` has the empty path as its parent,
/// and `Path::exists` is false for the empty path. That would make the
/// daemon believe its control directory had vanished, so the empty path is
/// mapped to `.` (the current directory).
fn daemon_control_dir(models_dir: &Path) -> PathBuf {
    let candidate = models_dir.parent().unwrap_or(models_dir);
    if candidate.as_os_str().is_empty() {
        PathBuf::from(".")
    } else {
        candidate.to_path_buf()
    }
}
457
458fn spawn_lock_path(models_dir: &Path) -> PathBuf {
459    daemon_control_dir(models_dir).join("daemon-spawn.lock")
460}
461
462fn spawn_state_path(models_dir: &Path) -> PathBuf {
463    daemon_control_dir(models_dir).join("daemon-spawn-state.json")
464}
465
466fn try_acquire_spawn_lock(models_dir: &Path) -> Result<Option<File>, AppError> {
467    let path = spawn_lock_path(models_dir);
468    std::fs::create_dir_all(path.parent().unwrap()).map_err(AppError::Io)?;
469    let file = OpenOptions::new()
470        .read(true)
471        .write(true)
472        .create(true)
473        .truncate(false)
474        .open(path)
475        .map_err(AppError::Io)?;
476
477    let deadline = Instant::now() + Duration::from_millis(DAEMON_SPAWN_LOCK_WAIT_MS);
478    loop {
479        match file.try_lock_exclusive() {
480            Ok(()) => return Ok(Some(file)),
481            Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => {
482                if Instant::now() >= deadline {
483                    return Ok(None);
484                }
485                thread::sleep(Duration::from_millis(50));
486            }
487            Err(err) => return Err(AppError::Io(err)),
488        }
489    }
490}
491
492fn spawn_backoff_active(models_dir: &Path) -> Result<bool, AppError> {
493    let state = load_spawn_state(models_dir)?;
494    Ok(now_epoch_ms() < state.not_before_epoch_ms)
495}
496
497fn record_spawn_failure(models_dir: &Path, message: String) -> Result<(), AppError> {
498    let mut state = load_spawn_state(models_dir)?;
499    state.consecutive_failures = state.consecutive_failures.saturating_add(1);
500    let exponent = state.consecutive_failures.saturating_sub(1).min(6);
501    let backoff_ms =
502        (DAEMON_SPAWN_BACKOFF_BASE_MS * (1_u64 << exponent)).min(DAEMON_AUTO_START_MAX_BACKOFF_MS);
503    state.not_before_epoch_ms = now_epoch_ms() + backoff_ms;
504    state.last_error = Some(message);
505    save_spawn_state(models_dir, &state)
506}
507
508fn clear_spawn_backoff_state(models_dir: &Path) -> Result<(), AppError> {
509    let path = spawn_state_path(models_dir);
510    if path.exists() {
511        std::fs::remove_file(path).map_err(AppError::Io)?;
512    }
513    Ok(())
514}
515
516fn load_spawn_state(models_dir: &Path) -> Result<DaemonSpawnState, AppError> {
517    let path = spawn_state_path(models_dir);
518    if !path.exists() {
519        return Ok(DaemonSpawnState::default());
520    }
521
522    let bytes = std::fs::read(path).map_err(AppError::Io)?;
523    serde_json::from_slice(&bytes).map_err(AppError::Json)
524}
525
526fn save_spawn_state(models_dir: &Path, state: &DaemonSpawnState) -> Result<(), AppError> {
527    let path = spawn_state_path(models_dir);
528    std::fs::create_dir_all(path.parent().unwrap()).map_err(AppError::Io)?;
529    let bytes = serde_json::to_vec(state).map_err(AppError::Json)?;
530    std::fs::write(path, bytes).map_err(AppError::Io)
531}
532
/// Current wall-clock time in milliseconds since the Unix epoch.
///
/// Returns 0 when the system clock reads earlier than the epoch. The `u128`
/// millisecond count is converted with a checked conversion that saturates
/// at `u64::MAX` instead of silently truncating.
fn now_epoch_ms() -> u64 {
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default();
    u64::try_from(since_epoch.as_millis()).unwrap_or(u64::MAX)
}
539
540fn to_local_socket_name(name: &str) -> std::io::Result<interprocess::local_socket::Name<'static>> {
541    if let Ok(ns_name) = name.to_string().to_ns_name::<GenericNamespaced>() {
542        return Ok(ns_name);
543    }
544
545    let path = if cfg!(unix) {
546        format!("/tmp/{name}.sock")
547    } else {
548        format!(r"\\.\pipe\{name}")
549    };
550    path.to_fs_name::<GenericFilePath>()
551}
552
#[cfg(test)]
mod tests {
    use super::*;

    /// Recording a spawn failure opens a backoff window and stores the error;
    /// clearing the state closes the window again.
    #[test]
    fn record_and_clear_spawn_backoff_state() {
        let scratch = tempfile::tempdir().unwrap();
        let models_dir = scratch.path().join("cache").join("models");
        std::fs::create_dir_all(&models_dir).unwrap();

        let backoff_active = || spawn_backoff_active(&models_dir).unwrap();
        assert!(!backoff_active());

        record_spawn_failure(&models_dir, "spawn failed".to_string()).unwrap();
        assert!(backoff_active());

        let recorded = load_spawn_state(&models_dir).unwrap();
        assert_eq!(recorded.consecutive_failures, 1);
        assert_eq!(recorded.last_error.as_deref(), Some("spawn failed"));

        clear_spawn_backoff_state(&models_dir).unwrap();
        assert!(!backoff_active());
    }

    /// The control directory is the parent of the models directory.
    #[test]
    fn daemon_control_dir_uses_parent_of_models() {
        let expected = PathBuf::from("/tmp/sqlite-graphrag-cache-test");
        let models_dir = expected.join("models");
        assert_eq!(daemon_control_dir(&models_dir), expected);
    }
}