1use crate::constants::{
7 DAEMON_AUTO_START_INITIAL_BACKOFF_MS, DAEMON_AUTO_START_MAX_BACKOFF_MS,
8 DAEMON_AUTO_START_MAX_WAIT_MS, DAEMON_IDLE_SHUTDOWN_SECS, DAEMON_PING_TIMEOUT_MS,
9 DAEMON_SPAWN_BACKOFF_BASE_MS, DAEMON_SPAWN_LOCK_WAIT_MS, SQLITE_GRAPHRAG_VERSION,
10};
11use crate::errors::AppError;
12use crate::{embedder, shutdown_requested};
13use fs4::fs_std::FileExt;
14use interprocess::local_socket::{
15 prelude::LocalSocketStream,
16 traits::{Listener as _, Stream as _},
17 GenericFilePath, GenericNamespaced, ListenerNonblockingMode, ListenerOptions, ToFsName,
18 ToNsName,
19};
20use serde::{Deserialize, Serialize};
21use std::fs::{File, OpenOptions};
22use std::io::{BufRead, BufReader, Write};
23use std::path::{Path, PathBuf};
24use std::process::Stdio;
25use std::sync::atomic::{AtomicU64, Ordering};
26use std::sync::Arc;
27use std::thread;
28use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
29
/// Wire request sent to the daemon over the local socket, one JSON object
/// per line.
///
/// Serialized with an internal `"request"` tag in snake_case, e.g.
/// `{"request":"embed_passage","text":"..."}`.
#[derive(Debug, Serialize, Deserialize)]
#[serde(tag = "request", rename_all = "snake_case")]
pub enum DaemonRequest {
    /// Liveness/identity check; answered with an `Ok` response.
    Ping,
    /// Ask the daemon to stop serving after replying.
    Shutdown,
    /// Embed a single passage text.
    EmbedPassage {
        text: String,
    },
    /// Embed a single query text.
    EmbedQuery {
        text: String,
    },
    /// Embed a batch of passages.
    EmbedPassages {
        texts: Vec<String>,
        // Parallel to `texts`; passed straight through to
        // `embedder::embed_passages_controlled`.
        token_counts: Vec<usize>,
    },
}
46
/// Wire response returned by the daemon, one JSON object per line.
///
/// Serialized with an internal `"status"` tag in snake_case.
#[derive(Debug, Serialize, Deserialize)]
#[serde(tag = "status", rename_all = "snake_case")]
pub enum DaemonResponse {
    /// Emitted once on the daemon's stdout when the socket is ready
    /// (see `run_async`), not sent over the socket itself.
    Listening {
        pid: u32,
        socket: String,
        idle_shutdown_secs: u64,
    },
    /// Reply to `Ping`.
    Ok {
        pid: u32,
        version: String,
        handled_embed_requests: u64,
    },
    /// Reply to `EmbedPassage`.
    PassageEmbedding {
        embedding: Vec<f32>,
        handled_embed_requests: u64,
    },
    /// Reply to `EmbedQuery`.
    QueryEmbedding {
        embedding: Vec<f32>,
        handled_embed_requests: u64,
    },
    /// Reply to `EmbedPassages`; one embedding per input text.
    PassageEmbeddings {
        embeddings: Vec<Vec<f32>>,
        handled_embed_requests: u64,
    },
    /// Reply to `Shutdown`, sent just before the daemon exits its loop.
    ShuttingDown {
        handled_embed_requests: u64,
    },
    /// Request-level failure; clients map this to an embedding error.
    Error {
        message: String,
    },
}
79
/// Autostart backoff state persisted as JSON at `spawn_state_path`.
#[derive(Debug, Default, Serialize, Deserialize)]
struct DaemonSpawnState {
    // Failed spawn attempts in a row; cleared when a daemon answers a ping.
    consecutive_failures: u32,
    // Epoch milliseconds before which autostart is suppressed.
    not_before_epoch_ms: u64,
    // Human-readable description of the most recent spawn failure.
    last_error: Option<String>,
}
86
87pub fn daemon_label(models_dir: &Path) -> String {
88 let hash = blake3::hash(models_dir.to_string_lossy().as_bytes())
89 .to_hex()
90 .to_string();
91 format!("sqlite-graphrag-daemon-{}", &hash[..16])
92}
93
/// Send a `Ping` to the daemon for `models_dir`, if one is listening.
///
/// Returns `Ok(None)` when no daemon is reachable; never spawns one.
pub fn try_ping(models_dir: &Path) -> Result<Option<DaemonResponse>, AppError> {
    request_if_available(models_dir, &DaemonRequest::Ping)
}
97
/// Ask a running daemon for `models_dir` to shut down.
///
/// Returns `Ok(None)` when no daemon is reachable; never spawns one.
pub fn try_shutdown(models_dir: &Path) -> Result<Option<DaemonResponse>, AppError> {
    request_if_available(models_dir, &DaemonRequest::Shutdown)
}
101
102pub fn embed_passage_or_local(models_dir: &Path, text: &str) -> Result<Vec<f32>, AppError> {
103 match request_or_autostart(
104 models_dir,
105 &DaemonRequest::EmbedPassage {
106 text: text.to_string(),
107 },
108 true,
109 )? {
110 Some(DaemonResponse::PassageEmbedding { embedding, .. }) => Ok(embedding),
111 Some(DaemonResponse::Error { message }) => Err(AppError::Embedding(message)),
112 Some(other) => Err(AppError::Internal(anyhow::anyhow!(
113 "unexpected daemon response for passage embedding: {other:?}"
114 ))),
115 None => {
116 let embedder = embedder::get_embedder(models_dir)?;
117 embedder::embed_passage(embedder, text)
118 }
119 }
120}
121
122pub fn embed_query_or_local(
123 models_dir: &Path,
124 text: &str,
125 cli_autostart: bool,
126) -> Result<Vec<f32>, AppError> {
127 match request_or_autostart(
128 models_dir,
129 &DaemonRequest::EmbedQuery {
130 text: text.to_string(),
131 },
132 cli_autostart,
133 )? {
134 Some(DaemonResponse::QueryEmbedding { embedding, .. }) => Ok(embedding),
135 Some(DaemonResponse::Error { message }) => Err(AppError::Embedding(message)),
136 Some(other) => Err(AppError::Internal(anyhow::anyhow!(
137 "unexpected daemon response for query embedding: {other:?}"
138 ))),
139 None => {
140 let embedder = embedder::get_embedder(models_dir)?;
141 embedder::embed_query(embedder, text)
142 }
143 }
144}
145
146pub fn embed_passages_controlled_or_local(
147 models_dir: &Path,
148 texts: &[&str],
149 token_counts: &[usize],
150) -> Result<Vec<Vec<f32>>, AppError> {
151 let request = DaemonRequest::EmbedPassages {
152 texts: texts.iter().map(|t| (*t).to_string()).collect(),
153 token_counts: token_counts.to_vec(),
154 };
155
156 match request_or_autostart(models_dir, &request, true)? {
157 Some(DaemonResponse::PassageEmbeddings { embeddings, .. }) => Ok(embeddings),
158 Some(DaemonResponse::Error { message }) => Err(AppError::Embedding(message)),
159 Some(other) => Err(AppError::Internal(anyhow::anyhow!(
160 "unexpected daemon response for passage embedding batch: {other:?}"
161 ))),
162 None => {
163 let embedder = embedder::get_embedder(models_dir)?;
164 embedder::embed_passages_controlled(embedder, texts, token_counts)
165 }
166 }
167}
168
/// Guard held by a running daemon for its whole lifetime; on drop it
/// best-effort removes the spawn lock file so later autostart attempts are
/// not blocked by a stale file.
struct DaemonSpawnGuard {
    // Models directory this daemon serves; used to locate the lock path.
    models_dir: PathBuf,
}

impl DaemonSpawnGuard {
    /// Create a guard for the daemon serving `models_dir`.
    fn new(models_dir: &Path) -> Self {
        Self {
            models_dir: models_dir.to_path_buf(),
        }
    }
}
180
181impl Drop for DaemonSpawnGuard {
182 fn drop(&mut self) {
183 let lock_path = spawn_lock_path(&self.models_dir);
184 if lock_path.exists() {
185 match std::fs::remove_file(&lock_path) {
186 Ok(()) => {
187 tracing::debug!(
188 path = %lock_path.display(),
189 "spawn lock file removed during graceful daemon shutdown"
190 );
191 }
192 Err(err) if err.kind() == std::io::ErrorKind::NotFound => {}
193 Err(err) => {
194 tracing::warn!(
195 error = %err,
196 path = %lock_path.display(),
197 "failed to remove spawn lock file while shutting down daemon"
198 );
199 }
200 }
201 }
202 tracing::info!(
203 "daemon shut down gracefully; socket will be cleaned up by OS or by the next daemon via try_overwrite"
204 );
205 }
206}
207
208pub fn run(models_dir: &Path, idle_shutdown_secs: u64) -> Result<(), AppError> {
209 let rt = tokio::runtime::Builder::new_multi_thread()
213 .worker_threads(2)
214 .thread_name("daemon-worker")
215 .enable_all()
216 .build()
217 .map_err(AppError::Io)?;
218
219 rt.block_on(run_async(models_dir, idle_shutdown_secs))
220}
221
/// Async body of the daemon: bind the local socket, warm the embedding model,
/// announce readiness on stdout, then serve one client at a time until
/// shutdown is requested, the control directory disappears, a client sends
/// `Shutdown`, or the idle timeout elapses.
async fn run_async(models_dir: &Path, idle_shutdown_secs: u64) -> Result<(), AppError> {
    let socket = daemon_label(models_dir);
    let name = to_local_socket_name(&socket)?;
    // Non-blocking accept so the loop can interleave idle/shutdown checks;
    // try_overwrite reclaims a stale socket left behind by a crashed daemon.
    let listener = ListenerOptions::new()
        .name(name)
        .nonblocking(ListenerNonblockingMode::Accept)
        .try_overwrite(true)
        .create_sync()
        .map_err(AppError::Io)?;

    // Held for the daemon's lifetime; its Drop removes the spawn lock file.
    let _spawn_guard = DaemonSpawnGuard::new(models_dir);

    // Load the model before announcing readiness so the first client request
    // does not pay the warm-up cost (and so a broken model fails fast here).
    let models_dir_warm = models_dir.to_path_buf();
    tokio::task::spawn_blocking(move || embedder::get_embedder(&models_dir_warm).map(|_| ()))
        .await
        .map_err(|e| AppError::Internal(anyhow::anyhow!("model warm-up panicked: {e}")))??;

    // Spawning processes wait for this JSON line on stdout to know we're up.
    crate::output::emit_json(&DaemonResponse::Listening {
        pid: std::process::id(),
        socket,
        idle_shutdown_secs,
    })?;

    let handled_embed_requests = Arc::new(AtomicU64::new(0));
    let mut last_activity = Instant::now();
    let models_dir = models_dir.to_path_buf();

    loop {
        if shutdown_requested() {
            break;
        }

        // The control directory doubles as a liveness token: if it is gone,
        // the installation was removed and the daemon should not linger.
        if !daemon_control_dir(&models_dir).exists() {
            tracing::info!("daemon control directory disappeared; shutting down");
            break;
        }

        match listener.accept() {
            Ok(stream) => {
                last_activity = Instant::now();
                let models_dir_clone = models_dir.clone();
                let counter = Arc::clone(&handled_embed_requests);
                // handle_client does blocking socket I/O and embedding work,
                // so it runs on the blocking thread pool.
                let should_exit = tokio::task::spawn_blocking(move || {
                    handle_client(stream, &models_dir_clone, &counter)
                })
                .await
                .map_err(|e| {
                    AppError::Internal(anyhow::anyhow!("spawn_blocking panicked: {e}"))
                })??;

                // `Shutdown` request handled: leave the accept loop.
                if should_exit {
                    break;
                }
            }
            Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => {
                // No pending connection: enforce the idle timeout, then poll
                // again after a short sleep to stay responsive.
                if last_activity.elapsed() >= Duration::from_secs(idle_shutdown_secs) {
                    tracing::info!(
                        idle_shutdown_secs,
                        handled_embed_requests = handled_embed_requests.load(Ordering::Relaxed),
                        "daemon idle timeout reached"
                    );
                    break;
                }
                tokio::time::sleep(Duration::from_millis(50)).await;
            }
            Err(err) => return Err(AppError::Io(err)),
        }
    }

    Ok(())
}
298
299fn handle_client(
300 stream: LocalSocketStream,
301 models_dir: &Path,
302 handled_embed_requests: &AtomicU64,
303) -> Result<bool, AppError> {
304 let mut reader = BufReader::new(stream);
305 let mut line = String::new();
306 reader.read_line(&mut line).map_err(AppError::Io)?;
307
308 if line.trim().is_empty() {
309 write_response(
310 reader.get_mut(),
311 &DaemonResponse::Error {
312 message: "empty request to daemon".to_string(),
313 },
314 )?;
315 return Ok(false);
316 }
317
318 let request: DaemonRequest = serde_json::from_str(line.trim()).map_err(AppError::Json)?;
319 let (response, should_exit) = match request {
320 DaemonRequest::Ping => (
321 DaemonResponse::Ok {
322 pid: std::process::id(),
323 version: SQLITE_GRAPHRAG_VERSION.to_string(),
324 handled_embed_requests: handled_embed_requests.load(Ordering::Relaxed),
325 },
326 false,
327 ),
328 DaemonRequest::Shutdown => (
329 DaemonResponse::ShuttingDown {
330 handled_embed_requests: handled_embed_requests.load(Ordering::Relaxed),
331 },
332 true,
333 ),
334 DaemonRequest::EmbedPassage { text } => {
335 let embedder = embedder::get_embedder(models_dir)?;
336 let embedding = embedder::embed_passage(embedder, &text)?;
337 let count = handled_embed_requests.fetch_add(1, Ordering::Relaxed) + 1;
338 (
339 DaemonResponse::PassageEmbedding {
340 embedding,
341 handled_embed_requests: count,
342 },
343 false,
344 )
345 }
346 DaemonRequest::EmbedQuery { text } => {
347 let embedder = embedder::get_embedder(models_dir)?;
348 let embedding = embedder::embed_query(embedder, &text)?;
349 let count = handled_embed_requests.fetch_add(1, Ordering::Relaxed) + 1;
350 (
351 DaemonResponse::QueryEmbedding {
352 embedding,
353 handled_embed_requests: count,
354 },
355 false,
356 )
357 }
358 DaemonRequest::EmbedPassages {
359 texts,
360 token_counts,
361 } => {
362 let embedder = embedder::get_embedder(models_dir)?;
363 let text_refs: Vec<&str> = texts.iter().map(String::as_str).collect();
364 let embeddings =
365 embedder::embed_passages_controlled(embedder, &text_refs, &token_counts)?;
366 let count = handled_embed_requests.fetch_add(1, Ordering::Relaxed) + 1;
367 (
368 DaemonResponse::PassageEmbeddings {
369 embeddings,
370 handled_embed_requests: count,
371 },
372 false,
373 )
374 }
375 };
376
377 write_response(reader.get_mut(), &response)?;
378 Ok(should_exit)
379}
380
381fn write_response(
382 stream: &mut LocalSocketStream,
383 response: &DaemonResponse,
384) -> Result<(), AppError> {
385 serde_json::to_writer(&mut *stream, response).map_err(AppError::Json)?;
386 stream.write_all(b"\n").map_err(AppError::Io)?;
387 stream.flush().map_err(AppError::Io)?;
388 Ok(())
389}
390
391fn request_if_available(
392 models_dir: &Path,
393 request: &DaemonRequest,
394) -> Result<Option<DaemonResponse>, AppError> {
395 let socket = daemon_label(models_dir);
396 let name = match to_local_socket_name(&socket) {
397 Ok(name) => name,
398 Err(err) => return Err(AppError::Io(err)),
399 };
400
401 let mut stream = match LocalSocketStream::connect(name) {
402 Ok(stream) => stream,
403 Err(err)
404 if matches!(
405 err.kind(),
406 std::io::ErrorKind::NotFound
407 | std::io::ErrorKind::ConnectionRefused
408 | std::io::ErrorKind::AddrNotAvailable
409 | std::io::ErrorKind::TimedOut
410 ) =>
411 {
412 return Ok(None);
413 }
414 Err(err) => return Err(AppError::Io(err)),
415 };
416
417 serde_json::to_writer(&mut stream, request).map_err(AppError::Json)?;
418 stream.write_all(b"\n").map_err(AppError::Io)?;
419 stream.flush().map_err(AppError::Io)?;
420
421 let mut reader = BufReader::new(stream);
422 let mut line = String::new();
423 reader.read_line(&mut line).map_err(AppError::Io)?;
424 if line.trim().is_empty() {
425 return Err(AppError::Embedding(
426 "daemon returned an empty response".into(),
427 ));
428 }
429
430 let response = serde_json::from_str(line.trim()).map_err(AppError::Json)?;
431 Ok(Some(response))
432}
433
434fn should_autostart(cli_flag: bool) -> bool {
435 if !cli_flag {
436 return false; }
438 !autostart_disabled_by_env()
439}
440
441fn request_or_autostart(
442 models_dir: &Path,
443 request: &DaemonRequest,
444 cli_autostart: bool,
445) -> Result<Option<DaemonResponse>, AppError> {
446 if let Some(response) = request_if_available(models_dir, request)? {
447 clear_spawn_backoff_state(models_dir).ok();
448 return Ok(Some(response));
449 }
450
451 if !should_autostart(cli_autostart) {
452 return Ok(None);
453 }
454
455 if !ensure_daemon_running(models_dir)? {
456 return Ok(None);
457 }
458
459 request_if_available(models_dir, request)
460}
461
/// Make sure a daemon is serving `models_dir`, spawning one if necessary.
///
/// Returns `Ok(true)` once a daemon answers pings. Returns `Ok(false)` when
/// autostart is suppressed by the backoff window, when the spawn lock was
/// held by someone else and their daemon never became ready, or when our own
/// spawn attempt failed or never became healthy.
fn ensure_daemon_running(models_dir: &Path) -> Result<bool, AppError> {
    // Fast path: someone else already started it.
    if (try_ping(models_dir)?).is_some() {
        clear_spawn_backoff_state(models_dir).ok();
        return Ok(true);
    }

    // Respect the persisted failure backoff so repeated callers do not
    // hammer a broken spawn path.
    if spawn_backoff_active(models_dir)? {
        tracing::warn!("daemon autostart suppressed by backoff window");
        return Ok(false);
    }

    // Only one process may spawn; losers wait for the winner's daemon.
    let spawn_lock = match try_acquire_spawn_lock(models_dir)? {
        Some(lock) => lock,
        None => return wait_for_daemon_ready(models_dir),
    };

    // Re-check under the lock: a daemon may have come up while we acquired it.
    if (try_ping(models_dir)?).is_some() {
        clear_spawn_backoff_state(models_dir).ok();
        drop(spawn_lock);
        return Ok(true);
    }

    let exe = match std::env::current_exe() {
        Ok(path) => path,
        Err(err) => {
            record_spawn_failure(models_dir, format!("current_exe failed: {err}"))?;
            drop(spawn_lock);
            return Ok(false);
        }
    };

    // Re-invoke this binary as a detached `daemon` subcommand with all stdio
    // closed; SQLITE_GRAPHRAG_DAEMON_CHILD=1 prevents the child from
    // recursing into autostart (see autostart_disabled_by_env).
    let mut child = std::process::Command::new(exe);
    child
        .arg("daemon")
        .arg("--idle-shutdown-secs")
        .arg(DAEMON_IDLE_SHUTDOWN_SECS.to_string())
        .env("SQLITE_GRAPHRAG_DAEMON_CHILD", "1")
        .stdin(Stdio::null())
        .stdout(Stdio::null())
        .stderr(Stdio::null());

    match child.spawn() {
        Ok(child_handle) => {
            let pid = child_handle.id();
            // Dropping the handle detaches the child; we never wait() on it.
            drop(child_handle);
            tracing::debug!(
                pid,
                "daemon detached; lifecycle managed via spawn lock + readiness file"
            );
            let ready = wait_for_daemon_ready(models_dir)?;
            if ready {
                clear_spawn_backoff_state(models_dir).ok();
            } else {
                record_spawn_failure(
                    models_dir,
                    "daemon did not become healthy after autostart".to_string(),
                )?;
            }
            // Release the spawn lock only after recording the outcome.
            drop(spawn_lock);
            Ok(ready)
        }
        Err(err) => {
            record_spawn_failure(models_dir, format!("daemon spawn failed: {err}"))?;
            drop(spawn_lock);
            Ok(false)
        }
    }
}
539
540fn wait_for_daemon_ready(models_dir: &Path) -> Result<bool, AppError> {
541 let deadline = Instant::now() + Duration::from_millis(DAEMON_AUTO_START_MAX_WAIT_MS);
542 let mut sleep_ms = DAEMON_AUTO_START_INITIAL_BACKOFF_MS.max(DAEMON_PING_TIMEOUT_MS);
543
544 while Instant::now() < deadline {
545 if (try_ping(models_dir)?).is_some() {
546 return Ok(true);
547 }
548 thread::sleep(Duration::from_millis(sleep_ms));
549 sleep_ms = (sleep_ms * 2).min(DAEMON_AUTO_START_MAX_BACKOFF_MS);
550 }
551
552 Ok(false)
553}
554
/// True when the environment vetoes daemon autostart.
///
/// Autostart is disabled when this process *is* the daemon child
/// (`SQLITE_GRAPHRAG_DAEMON_CHILD=1`, prevents spawn recursion), or when the
/// user set `SQLITE_GRAPHRAG_DAEMON_DISABLE_AUTOSTART=1` without overriding
/// it via `SQLITE_GRAPHRAG_DAEMON_FORCE_AUTOSTART=1`.
///
/// The explicit parentheses pin down the `||`/`&&` grouping the original
/// relied on implicitly (clippy `precedence`-style clarity fix; behavior is
/// unchanged since `&&` already bound tighter than `||`).
fn autostart_disabled_by_env() -> bool {
    std::env::var("SQLITE_GRAPHRAG_DAEMON_CHILD").as_deref() == Ok("1")
        || (std::env::var("SQLITE_GRAPHRAG_DAEMON_FORCE_AUTOSTART").as_deref() != Ok("1")
            && std::env::var("SQLITE_GRAPHRAG_DAEMON_DISABLE_AUTOSTART").as_deref() == Ok("1"))
}
560
/// Directory holding the daemon coordination files (spawn lock and backoff
/// state): the parent of the models directory, or the models directory
/// itself when it has no parent (e.g. a filesystem root).
fn daemon_control_dir(models_dir: &Path) -> PathBuf {
    match models_dir.parent() {
        Some(parent) => parent.to_path_buf(),
        None => models_dir.to_path_buf(),
    }
}
567
568fn spawn_lock_path(models_dir: &Path) -> PathBuf {
569 daemon_control_dir(models_dir).join("daemon-spawn.lock")
570}
571
572fn spawn_state_path(models_dir: &Path) -> PathBuf {
573 daemon_control_dir(models_dir).join("daemon-spawn-state.json")
574}
575
576fn try_acquire_spawn_lock(models_dir: &Path) -> Result<Option<File>, AppError> {
577 let path = spawn_lock_path(models_dir);
578 std::fs::create_dir_all(crate::paths::parent_or_err(&path)?).map_err(AppError::Io)?;
579 let file = OpenOptions::new()
580 .read(true)
581 .write(true)
582 .create(true)
583 .truncate(false)
584 .open(path)
585 .map_err(AppError::Io)?;
586
587 let deadline = Instant::now() + Duration::from_millis(DAEMON_SPAWN_LOCK_WAIT_MS);
588 loop {
589 match file.try_lock_exclusive() {
590 Ok(()) => return Ok(Some(file)),
591 Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => {
592 if Instant::now() >= deadline {
593 return Ok(None);
594 }
595 thread::sleep(Duration::from_millis(50));
596 }
597 Err(err) => return Err(AppError::Io(err)),
598 }
599 }
600}
601
602fn spawn_backoff_active(models_dir: &Path) -> Result<bool, AppError> {
603 let state = load_spawn_state(models_dir)?;
604 Ok(now_epoch_ms() < state.not_before_epoch_ms)
605}
606
607fn record_spawn_failure(models_dir: &Path, message: String) -> Result<(), AppError> {
608 let mut state = load_spawn_state(models_dir)?;
609 state.consecutive_failures = state.consecutive_failures.saturating_add(1);
610 let exponent = state.consecutive_failures.saturating_sub(1).min(6);
611 let base_ms =
612 (DAEMON_SPAWN_BACKOFF_BASE_MS * (1_u64 << exponent)).min(DAEMON_AUTO_START_MAX_BACKOFF_MS);
613 let half = base_ms / 2;
618 let jitter_seed = SystemTime::now()
619 .duration_since(UNIX_EPOCH)
620 .map(|d| d.subsec_nanos() as u64)
621 .unwrap_or(0);
622 let jitter = if half == 0 { 0 } else { jitter_seed % half };
623 let backoff_ms = half + jitter;
624 state.not_before_epoch_ms = now_epoch_ms() + backoff_ms;
625 state.last_error = Some(message);
626 save_spawn_state(models_dir, &state)
627}
628
629fn clear_spawn_backoff_state(models_dir: &Path) -> Result<(), AppError> {
630 let path = spawn_state_path(models_dir);
631 if path.exists() {
632 std::fs::remove_file(path).map_err(AppError::Io)?;
633 }
634 Ok(())
635}
636
637fn load_spawn_state(models_dir: &Path) -> Result<DaemonSpawnState, AppError> {
638 let path = spawn_state_path(models_dir);
639 if !path.exists() {
640 return Ok(DaemonSpawnState::default());
641 }
642
643 let bytes = std::fs::read(path).map_err(AppError::Io)?;
644 serde_json::from_slice(&bytes).map_err(AppError::Json)
645}
646
647fn save_spawn_state(models_dir: &Path, state: &DaemonSpawnState) -> Result<(), AppError> {
648 let path = spawn_state_path(models_dir);
649 std::fs::create_dir_all(crate::paths::parent_or_err(&path)?).map_err(AppError::Io)?;
650 let bytes = serde_json::to_vec(state).map_err(AppError::Json)?;
651 std::fs::write(path, bytes).map_err(AppError::Io)
652}
653
/// Current wall-clock time as milliseconds since the Unix epoch; returns 0 if
/// the system clock is set before the epoch.
fn now_epoch_ms() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map_or(0, |elapsed| elapsed.as_millis() as u64)
}
660
/// Resolve the daemon label to a platform `Name` for the local socket.
///
/// Prefers the OS namespaced form when `interprocess` supports it on this
/// platform; otherwise falls back to a filesystem-path name: a `.sock` file
/// under `XDG_RUNTIME_DIR`, then `SQLITE_GRAPHRAG_HOME`, then the temp dir
/// on Unix, and the `\\.\pipe\` namespace elsewhere.
fn to_local_socket_name(name: &str) -> std::io::Result<interprocess::local_socket::Name<'static>> {
    // `to_string()` hands ToNsName an owned buffer so the returned Name can
    // satisfy the 'static lifetime.
    if let Ok(ns_name) = name.to_string().to_ns_name::<GenericNamespaced>() {
        return Ok(ns_name);
    }

    let path = if cfg!(unix) {
        let base = std::env::var_os("XDG_RUNTIME_DIR")
            .or_else(|| std::env::var_os("SQLITE_GRAPHRAG_HOME"))
            .map(std::path::PathBuf::from)
            .unwrap_or_else(std::env::temp_dir);
        base.join(format!("{name}.sock"))
            .to_string_lossy()
            .into_owned()
    } else {
        format!(r"\\.\pipe\{name}")
    };
    path.to_fs_name::<GenericFilePath>()
}
683
#[cfg(test)]
mod tests {
    use super::*;

    /// A recorded spawn failure must open a backoff window, and clearing the
    /// state must close it again.
    #[test]
    fn record_and_clear_spawn_backoff_state() {
        let tmp = tempfile::tempdir().unwrap();
        let models_dir = tmp.path().join("cache").join("models");
        std::fs::create_dir_all(&models_dir).unwrap();

        // Fresh directory: no backoff yet.
        assert!(!spawn_backoff_active(&models_dir).unwrap());

        record_spawn_failure(&models_dir, "spawn failed".to_string()).unwrap();
        assert!(spawn_backoff_active(&models_dir).unwrap());

        let recorded = load_spawn_state(&models_dir).unwrap();
        assert_eq!(recorded.consecutive_failures, 1);
        assert_eq!(recorded.last_error.as_deref(), Some("spawn failed"));

        clear_spawn_backoff_state(&models_dir).unwrap();
        assert!(!spawn_backoff_active(&models_dir).unwrap());
    }

    /// Control files live next to (not inside) the models directory.
    #[test]
    fn daemon_control_dir_uses_models_parent() {
        let cache_root = PathBuf::from("/tmp/sqlite-graphrag-cache-test");
        let models_dir = cache_root.join("models");
        assert_eq!(daemon_control_dir(&models_dir), cache_root);
    }
}