1use crate::constants::{
7 DAEMON_AUTO_START_INITIAL_BACKOFF_MS, DAEMON_AUTO_START_MAX_BACKOFF_MS,
8 DAEMON_AUTO_START_MAX_WAIT_MS, DAEMON_IDLE_SHUTDOWN_SECS, DAEMON_PING_TIMEOUT_MS,
9 DAEMON_SPAWN_BACKOFF_BASE_MS, DAEMON_SPAWN_LOCK_WAIT_MS, SQLITE_GRAPHRAG_VERSION,
10};
11use crate::errors::AppError;
12use crate::{embedder, shutdown_requested};
13use fs4::fs_std::FileExt;
14use interprocess::local_socket::{
15 prelude::LocalSocketStream,
16 traits::{Listener as _, Stream as _},
17 GenericFilePath, GenericNamespaced, ListenerNonblockingMode, ListenerOptions, ToFsName,
18 ToNsName,
19};
20use serde::{Deserialize, Serialize};
21use std::fs::{File, OpenOptions};
22use std::io::{BufRead, BufReader, Write};
23use std::path::{Path, PathBuf};
24use std::process::Stdio;
25use std::sync::atomic::{AtomicU64, Ordering};
26use std::sync::Arc;
27use std::thread;
28use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
29
/// A single request sent from a client to the embedding daemon.
///
/// On the wire each request is one JSON object followed by a newline; the variant is
/// carried in a `"request"` field using snake_case names (e.g. `"embed_query"`).
#[derive(Debug, Serialize, Deserialize)]
#[serde(tag = "request", rename_all = "snake_case")]
pub enum DaemonRequest {
    /// Health check; the daemon answers with `DaemonResponse::Ok`.
    Ping,
    /// Asks the daemon to stop; it answers with `DaemonResponse::ShuttingDown` and
    /// then leaves its accept loop.
    Shutdown,
    /// Embed one passage; answered with `DaemonResponse::PassageEmbedding`.
    EmbedPassage {
        text: String,
    },
    /// Embed one search query; answered with `DaemonResponse::QueryEmbedding`.
    EmbedQuery {
        text: String,
    },
    /// Embed a batch of passages; answered with `DaemonResponse::PassageEmbeddings`.
    /// `token_counts` is forwarded unchanged to `embedder::embed_passages_controlled`
    /// together with `texts` (presumably one count per text — confirm with the embedder).
    EmbedPassages {
        texts: Vec<String>,
        token_counts: Vec<usize>,
    },
}
46
/// A reply produced by the daemon (or its startup announcement).
///
/// Serialized as one JSON object per line with the variant carried in a `"status"`
/// field using snake_case names. Every embedding reply also reports
/// `handled_embed_requests`, the daemon's running count of successfully served
/// embedding requests.
#[derive(Debug, Serialize, Deserialize)]
#[serde(tag = "status", rename_all = "snake_case")]
pub enum DaemonResponse {
    /// Emitted once through `crate::output::emit_json` after the daemon has bound its
    /// socket and initialized the embedder.
    Listening {
        pid: u32,
        socket: String,
        idle_shutdown_secs: u64,
    },
    /// Reply to `DaemonRequest::Ping`.
    Ok {
        pid: u32,
        version: String,
        handled_embed_requests: u64,
    },
    /// Reply to `DaemonRequest::EmbedPassage`.
    PassageEmbedding {
        embedding: Vec<f32>,
        handled_embed_requests: u64,
    },
    /// Reply to `DaemonRequest::EmbedQuery`.
    QueryEmbedding {
        embedding: Vec<f32>,
        handled_embed_requests: u64,
    },
    /// Reply to `DaemonRequest::EmbedPassages`.
    PassageEmbeddings {
        embeddings: Vec<Vec<f32>>,
        handled_embed_requests: u64,
    },
    /// Reply to `DaemonRequest::Shutdown`, sent just before the daemon stops.
    ShuttingDown {
        handled_embed_requests: u64,
    },
    /// The daemon could not serve the request; clients map this to
    /// `AppError::Embedding(message)`.
    Error {
        message: String,
    },
}
79
/// Autostart backoff bookkeeping, persisted as JSON in `daemon-spawn-state.json` inside
/// the daemon control directory.
#[derive(Debug, Default, Serialize, Deserialize)]
struct DaemonSpawnState {
    // Number of autostart failures since the state was last cleared.
    consecutive_failures: u32,
    // Autostart is suppressed while the current Unix time in milliseconds is below this.
    not_before_epoch_ms: u64,
    // Human-readable description of the most recent failure, for diagnostics.
    last_error: Option<String>,
}
86
87pub fn daemon_label(models_dir: &Path) -> String {
88 let hash = blake3::hash(models_dir.to_string_lossy().as_bytes())
89 .to_hex()
90 .to_string();
91 format!("sqlite-graphrag-daemon-{}", &hash[..16])
92}
93
94pub fn try_ping(models_dir: &Path) -> Result<Option<DaemonResponse>, AppError> {
95 request_if_available(models_dir, &DaemonRequest::Ping)
96}
97
98pub fn try_shutdown(models_dir: &Path) -> Result<Option<DaemonResponse>, AppError> {
99 request_if_available(models_dir, &DaemonRequest::Shutdown)
100}
101
102pub fn embed_passage_or_local(models_dir: &Path, text: &str) -> Result<Vec<f32>, AppError> {
103 match request_or_autostart(
104 models_dir,
105 &DaemonRequest::EmbedPassage {
106 text: text.to_string(),
107 },
108 )? {
109 Some(DaemonResponse::PassageEmbedding { embedding, .. }) => Ok(embedding),
110 Some(DaemonResponse::Error { message }) => Err(AppError::Embedding(message)),
111 Some(other) => Err(AppError::Internal(anyhow::anyhow!(
112 "unexpected daemon response for passage embedding: {other:?}"
113 ))),
114 None => {
115 let embedder = embedder::get_embedder(models_dir)?;
116 embedder::embed_passage(embedder, text)
117 }
118 }
119}
120
121pub fn embed_query_or_local(models_dir: &Path, text: &str) -> Result<Vec<f32>, AppError> {
122 match request_or_autostart(
123 models_dir,
124 &DaemonRequest::EmbedQuery {
125 text: text.to_string(),
126 },
127 )? {
128 Some(DaemonResponse::QueryEmbedding { embedding, .. }) => Ok(embedding),
129 Some(DaemonResponse::Error { message }) => Err(AppError::Embedding(message)),
130 Some(other) => Err(AppError::Internal(anyhow::anyhow!(
131 "unexpected daemon response for query embedding: {other:?}"
132 ))),
133 None => {
134 let embedder = embedder::get_embedder(models_dir)?;
135 embedder::embed_query(embedder, text)
136 }
137 }
138}
139
140pub fn embed_passages_controlled_or_local(
141 models_dir: &Path,
142 texts: &[&str],
143 token_counts: &[usize],
144) -> Result<Vec<Vec<f32>>, AppError> {
145 let request = DaemonRequest::EmbedPassages {
146 texts: texts.iter().map(|t| (*t).to_string()).collect(),
147 token_counts: token_counts.to_vec(),
148 };
149
150 match request_or_autostart(models_dir, &request)? {
151 Some(DaemonResponse::PassageEmbeddings { embeddings, .. }) => Ok(embeddings),
152 Some(DaemonResponse::Error { message }) => Err(AppError::Embedding(message)),
153 Some(other) => Err(AppError::Internal(anyhow::anyhow!(
154 "unexpected daemon response for passage embedding batch: {other:?}"
155 ))),
156 None => {
157 let embedder = embedder::get_embedder(models_dir)?;
158 embedder::embed_passages_controlled(embedder, texts, token_counts)
159 }
160 }
161}
162
/// Guard held by a running daemon for its whole lifetime.
///
/// Its `Drop` implementation deletes the spawn lock file that belongs to `models_dir`.
struct DaemonSpawnGuard {
    /// Models directory the daemon serves; used to locate the spawn lock file.
    models_dir: PathBuf,
}

impl DaemonSpawnGuard {
    /// Creates a guard that owns its own copy of `models_dir`.
    fn new(models_dir: &Path) -> Self {
        let models_dir = models_dir.to_owned();
        Self { models_dir }
    }
}
174
175impl Drop for DaemonSpawnGuard {
176 fn drop(&mut self) {
177 let lock_path = spawn_lock_path(&self.models_dir);
178 if lock_path.exists() {
179 match std::fs::remove_file(&lock_path) {
180 Ok(()) => {
181 tracing::debug!(
182 path = %lock_path.display(),
183 "spawn lock file removed during graceful daemon shutdown"
184 );
185 }
186 Err(err) if err.kind() == std::io::ErrorKind::NotFound => {}
187 Err(err) => {
188 tracing::warn!(
189 error = %err,
190 path = %lock_path.display(),
191 "failed to remove spawn lock file while shutting down daemon"
192 );
193 }
194 }
195 }
196 tracing::info!(
197 "daemon shut down gracefully; socket will be cleaned up by OS or by the next daemon via try_overwrite"
198 );
199 }
200}
201
202pub fn run(models_dir: &Path, idle_shutdown_secs: u64) -> Result<(), AppError> {
203 let rt = tokio::runtime::Builder::new_multi_thread()
207 .worker_threads(2)
208 .thread_name("daemon-worker")
209 .enable_all()
210 .build()
211 .map_err(AppError::Io)?;
212
213 rt.block_on(run_async(models_dir, idle_shutdown_secs))
214}
215
216async fn run_async(models_dir: &Path, idle_shutdown_secs: u64) -> Result<(), AppError> {
217 let socket = daemon_label(models_dir);
218 let name = to_local_socket_name(&socket)?;
219 let listener = ListenerOptions::new()
220 .name(name)
221 .nonblocking(ListenerNonblockingMode::Accept)
222 .try_overwrite(true)
223 .create_sync()
224 .map_err(AppError::Io)?;
225
226 let _spawn_guard = DaemonSpawnGuard::new(models_dir);
229
230 let models_dir_warm = models_dir.to_path_buf();
234 tokio::task::spawn_blocking(move || embedder::get_embedder(&models_dir_warm).map(|_| ()))
235 .await
236 .map_err(|e| AppError::Internal(anyhow::anyhow!("model warm-up panicked: {e}")))??;
237
238 crate::output::emit_json(&DaemonResponse::Listening {
239 pid: std::process::id(),
240 socket,
241 idle_shutdown_secs,
242 })?;
243
244 let handled_embed_requests = Arc::new(AtomicU64::new(0));
245 let mut last_activity = Instant::now();
246 let models_dir = models_dir.to_path_buf();
247
248 loop {
249 if shutdown_requested() {
250 break;
251 }
252
253 if !daemon_control_dir(&models_dir).exists() {
254 tracing::info!("daemon control directory disappeared; shutting down");
255 break;
256 }
257
258 match listener.accept() {
259 Ok(stream) => {
260 last_activity = Instant::now();
261 let models_dir_clone = models_dir.clone();
262 let counter = Arc::clone(&handled_embed_requests);
263 let should_exit = tokio::task::spawn_blocking(move || {
264 handle_client(stream, &models_dir_clone, &counter)
265 })
266 .await
267 .map_err(|e| {
268 AppError::Internal(anyhow::anyhow!("spawn_blocking panicked: {e}"))
269 })??;
270
271 if should_exit {
272 break;
273 }
274 }
275 Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => {
276 if last_activity.elapsed() >= Duration::from_secs(idle_shutdown_secs) {
277 tracing::info!(
278 idle_shutdown_secs,
279 handled_embed_requests = handled_embed_requests.load(Ordering::Relaxed),
280 "daemon idle timeout reached"
281 );
282 break;
283 }
284 tokio::time::sleep(Duration::from_millis(50)).await;
285 }
286 Err(err) => return Err(AppError::Io(err)),
287 }
288 }
289
290 Ok(())
291}
292
293fn handle_client(
294 stream: LocalSocketStream,
295 models_dir: &Path,
296 handled_embed_requests: &AtomicU64,
297) -> Result<bool, AppError> {
298 let mut reader = BufReader::new(stream);
299 let mut line = String::new();
300 reader.read_line(&mut line).map_err(AppError::Io)?;
301
302 if line.trim().is_empty() {
303 write_response(
304 reader.get_mut(),
305 &DaemonResponse::Error {
306 message: "empty request to daemon".to_string(),
307 },
308 )?;
309 return Ok(false);
310 }
311
312 let request: DaemonRequest = serde_json::from_str(line.trim()).map_err(AppError::Json)?;
313 let (response, should_exit) = match request {
314 DaemonRequest::Ping => (
315 DaemonResponse::Ok {
316 pid: std::process::id(),
317 version: SQLITE_GRAPHRAG_VERSION.to_string(),
318 handled_embed_requests: handled_embed_requests.load(Ordering::Relaxed),
319 },
320 false,
321 ),
322 DaemonRequest::Shutdown => (
323 DaemonResponse::ShuttingDown {
324 handled_embed_requests: handled_embed_requests.load(Ordering::Relaxed),
325 },
326 true,
327 ),
328 DaemonRequest::EmbedPassage { text } => {
329 let embedder = embedder::get_embedder(models_dir)?;
330 let embedding = embedder::embed_passage(embedder, &text)?;
331 let count = handled_embed_requests.fetch_add(1, Ordering::Relaxed) + 1;
332 (
333 DaemonResponse::PassageEmbedding {
334 embedding,
335 handled_embed_requests: count,
336 },
337 false,
338 )
339 }
340 DaemonRequest::EmbedQuery { text } => {
341 let embedder = embedder::get_embedder(models_dir)?;
342 let embedding = embedder::embed_query(embedder, &text)?;
343 let count = handled_embed_requests.fetch_add(1, Ordering::Relaxed) + 1;
344 (
345 DaemonResponse::QueryEmbedding {
346 embedding,
347 handled_embed_requests: count,
348 },
349 false,
350 )
351 }
352 DaemonRequest::EmbedPassages {
353 texts,
354 token_counts,
355 } => {
356 let embedder = embedder::get_embedder(models_dir)?;
357 let text_refs: Vec<&str> = texts.iter().map(String::as_str).collect();
358 let embeddings =
359 embedder::embed_passages_controlled(embedder, &text_refs, &token_counts)?;
360 let count = handled_embed_requests.fetch_add(1, Ordering::Relaxed) + 1;
361 (
362 DaemonResponse::PassageEmbeddings {
363 embeddings,
364 handled_embed_requests: count,
365 },
366 false,
367 )
368 }
369 };
370
371 write_response(reader.get_mut(), &response)?;
372 Ok(should_exit)
373}
374
375fn write_response(
376 stream: &mut LocalSocketStream,
377 response: &DaemonResponse,
378) -> Result<(), AppError> {
379 serde_json::to_writer(&mut *stream, response).map_err(AppError::Json)?;
380 stream.write_all(b"\n").map_err(AppError::Io)?;
381 stream.flush().map_err(AppError::Io)?;
382 Ok(())
383}
384
385fn request_if_available(
386 models_dir: &Path,
387 request: &DaemonRequest,
388) -> Result<Option<DaemonResponse>, AppError> {
389 let socket = daemon_label(models_dir);
390 let name = match to_local_socket_name(&socket) {
391 Ok(name) => name,
392 Err(err) => return Err(AppError::Io(err)),
393 };
394
395 let mut stream = match LocalSocketStream::connect(name) {
396 Ok(stream) => stream,
397 Err(err)
398 if matches!(
399 err.kind(),
400 std::io::ErrorKind::NotFound
401 | std::io::ErrorKind::ConnectionRefused
402 | std::io::ErrorKind::AddrNotAvailable
403 | std::io::ErrorKind::TimedOut
404 ) =>
405 {
406 return Ok(None);
407 }
408 Err(err) => return Err(AppError::Io(err)),
409 };
410
411 serde_json::to_writer(&mut stream, request).map_err(AppError::Json)?;
412 stream.write_all(b"\n").map_err(AppError::Io)?;
413 stream.flush().map_err(AppError::Io)?;
414
415 let mut reader = BufReader::new(stream);
416 let mut line = String::new();
417 reader.read_line(&mut line).map_err(AppError::Io)?;
418 if line.trim().is_empty() {
419 return Err(AppError::Embedding(
420 "daemon returned an empty response".into(),
421 ));
422 }
423
424 let response = serde_json::from_str(line.trim()).map_err(AppError::Json)?;
425 Ok(Some(response))
426}
427
428fn request_or_autostart(
429 models_dir: &Path,
430 request: &DaemonRequest,
431) -> Result<Option<DaemonResponse>, AppError> {
432 if let Some(response) = request_if_available(models_dir, request)? {
433 clear_spawn_backoff_state(models_dir).ok();
434 return Ok(Some(response));
435 }
436
437 if autostart_disabled() {
438 return Ok(None);
439 }
440
441 if !ensure_daemon_running(models_dir)? {
442 return Ok(None);
443 }
444
445 request_if_available(models_dir, request)
446}
447
448fn ensure_daemon_running(models_dir: &Path) -> Result<bool, AppError> {
449 if (try_ping(models_dir)?).is_some() {
450 clear_spawn_backoff_state(models_dir).ok();
451 return Ok(true);
452 }
453
454 if spawn_backoff_active(models_dir)? {
455 tracing::warn!("daemon autostart suppressed by backoff window");
456 return Ok(false);
457 }
458
459 let spawn_lock = match try_acquire_spawn_lock(models_dir)? {
460 Some(lock) => lock,
461 None => return wait_for_daemon_ready(models_dir),
462 };
463
464 if (try_ping(models_dir)?).is_some() {
465 clear_spawn_backoff_state(models_dir).ok();
466 drop(spawn_lock);
467 return Ok(true);
468 }
469
470 let exe = match std::env::current_exe() {
471 Ok(path) => path,
472 Err(err) => {
473 record_spawn_failure(models_dir, format!("current_exe failed: {err}"))?;
474 drop(spawn_lock);
475 return Ok(false);
476 }
477 };
478
479 let mut child = std::process::Command::new(exe);
480 child
481 .arg("daemon")
482 .arg("--idle-shutdown-secs")
483 .arg(DAEMON_IDLE_SHUTDOWN_SECS.to_string())
484 .env("SQLITE_GRAPHRAG_DAEMON_CHILD", "1")
485 .stdin(Stdio::null())
486 .stdout(Stdio::null())
487 .stderr(Stdio::null());
488
489 match child.spawn() {
490 Ok(child_handle) => {
491 let pid = child_handle.id();
501 drop(child_handle);
502 tracing::debug!(
503 pid,
504 "daemon detached; lifecycle managed via spawn lock + readiness file"
505 );
506 let ready = wait_for_daemon_ready(models_dir)?;
507 if ready {
508 clear_spawn_backoff_state(models_dir).ok();
509 } else {
510 record_spawn_failure(
511 models_dir,
512 "daemon did not become healthy after autostart".to_string(),
513 )?;
514 }
515 drop(spawn_lock);
516 Ok(ready)
517 }
518 Err(err) => {
519 record_spawn_failure(models_dir, format!("daemon spawn failed: {err}"))?;
520 drop(spawn_lock);
521 Ok(false)
522 }
523 }
524}
525
526fn wait_for_daemon_ready(models_dir: &Path) -> Result<bool, AppError> {
527 let deadline = Instant::now() + Duration::from_millis(DAEMON_AUTO_START_MAX_WAIT_MS);
528 let mut sleep_ms = DAEMON_AUTO_START_INITIAL_BACKOFF_MS.max(DAEMON_PING_TIMEOUT_MS);
529
530 while Instant::now() < deadline {
531 if (try_ping(models_dir)?).is_some() {
532 return Ok(true);
533 }
534 thread::sleep(Duration::from_millis(sleep_ms));
535 sleep_ms = (sleep_ms * 2).min(DAEMON_AUTO_START_MAX_BACKOFF_MS);
536 }
537
538 Ok(false)
539}
540
/// Reports whether daemon autostart must be skipped.
///
/// - Always disabled inside a spawned daemon child (`SQLITE_GRAPHRAG_DAEMON_CHILD=1`).
/// - Otherwise disabled by `SQLITE_GRAPHRAG_DAEMON_DISABLE_AUTOSTART=1`, unless
///   `SQLITE_GRAPHRAG_DAEMON_FORCE_AUTOSTART=1` overrides it.
fn autostart_disabled() -> bool {
    let flag_is_one = |key: &str| std::env::var(key).as_deref() == Ok("1");

    if flag_is_one("SQLITE_GRAPHRAG_DAEMON_CHILD") {
        return true;
    }

    let forced = flag_is_one("SQLITE_GRAPHRAG_DAEMON_FORCE_AUTOSTART");
    !forced && flag_is_one("SQLITE_GRAPHRAG_DAEMON_DISABLE_AUTOSTART")
}
546
/// Returns the directory that holds the daemon's control files (the spawn lock and the
/// spawn-state JSON). This is the parent of `models_dir`.
///
/// Edge cases:
/// - When `models_dir` has no parent (e.g. `/`), `models_dir` itself is returned.
/// - For a single relative component such as `models`, `Path::parent` yields an
///   *empty* path. It is mapped to `.` (the current directory).
///   An empty path makes `Path::exists` return `false`. The daemon's accept loop would
///   read that as "control directory disappeared" and exit immediately.
fn daemon_control_dir(models_dir: &Path) -> PathBuf {
    match models_dir.parent() {
        Some(parent) if parent.as_os_str().is_empty() => PathBuf::from("."),
        Some(parent) => parent.to_path_buf(),
        None => models_dir.to_path_buf(),
    }
}
553
554fn spawn_lock_path(models_dir: &Path) -> PathBuf {
555 daemon_control_dir(models_dir).join("daemon-spawn.lock")
556}
557
558fn spawn_state_path(models_dir: &Path) -> PathBuf {
559 daemon_control_dir(models_dir).join("daemon-spawn-state.json")
560}
561
562fn try_acquire_spawn_lock(models_dir: &Path) -> Result<Option<File>, AppError> {
563 let path = spawn_lock_path(models_dir);
564 std::fs::create_dir_all(crate::paths::parent_or_err(&path)?).map_err(AppError::Io)?;
565 let file = OpenOptions::new()
566 .read(true)
567 .write(true)
568 .create(true)
569 .truncate(false)
570 .open(path)
571 .map_err(AppError::Io)?;
572
573 let deadline = Instant::now() + Duration::from_millis(DAEMON_SPAWN_LOCK_WAIT_MS);
574 loop {
575 match file.try_lock_exclusive() {
576 Ok(()) => return Ok(Some(file)),
577 Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => {
578 if Instant::now() >= deadline {
579 return Ok(None);
580 }
581 thread::sleep(Duration::from_millis(50));
582 }
583 Err(err) => return Err(AppError::Io(err)),
584 }
585 }
586}
587
588fn spawn_backoff_active(models_dir: &Path) -> Result<bool, AppError> {
589 let state = load_spawn_state(models_dir)?;
590 Ok(now_epoch_ms() < state.not_before_epoch_ms)
591}
592
593fn record_spawn_failure(models_dir: &Path, message: String) -> Result<(), AppError> {
594 let mut state = load_spawn_state(models_dir)?;
595 state.consecutive_failures = state.consecutive_failures.saturating_add(1);
596 let exponent = state.consecutive_failures.saturating_sub(1).min(6);
597 let base_ms =
598 (DAEMON_SPAWN_BACKOFF_BASE_MS * (1_u64 << exponent)).min(DAEMON_AUTO_START_MAX_BACKOFF_MS);
599 let half = base_ms / 2;
604 let jitter_seed = SystemTime::now()
605 .duration_since(UNIX_EPOCH)
606 .map(|d| d.subsec_nanos() as u64)
607 .unwrap_or(0);
608 let jitter = if half == 0 { 0 } else { jitter_seed % half };
609 let backoff_ms = half + jitter;
610 state.not_before_epoch_ms = now_epoch_ms() + backoff_ms;
611 state.last_error = Some(message);
612 save_spawn_state(models_dir, &state)
613}
614
615fn clear_spawn_backoff_state(models_dir: &Path) -> Result<(), AppError> {
616 let path = spawn_state_path(models_dir);
617 if path.exists() {
618 std::fs::remove_file(path).map_err(AppError::Io)?;
619 }
620 Ok(())
621}
622
623fn load_spawn_state(models_dir: &Path) -> Result<DaemonSpawnState, AppError> {
624 let path = spawn_state_path(models_dir);
625 if !path.exists() {
626 return Ok(DaemonSpawnState::default());
627 }
628
629 let bytes = std::fs::read(path).map_err(AppError::Io)?;
630 serde_json::from_slice(&bytes).map_err(AppError::Json)
631}
632
633fn save_spawn_state(models_dir: &Path, state: &DaemonSpawnState) -> Result<(), AppError> {
634 let path = spawn_state_path(models_dir);
635 std::fs::create_dir_all(crate::paths::parent_or_err(&path)?).map_err(AppError::Io)?;
636 let bytes = serde_json::to_vec(state).map_err(AppError::Json)?;
637 std::fs::write(path, bytes).map_err(AppError::Io)
638}
639
/// Current wall-clock time as milliseconds since the Unix epoch.
///
/// A clock set before 1970 yields 0. The `u128 -> u64` cast cannot truncate for any
/// realistic date.
fn now_epoch_ms() -> u64 {
    let since_epoch = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed,
        Err(_) => Duration::ZERO,
    };
    since_epoch.as_millis() as u64
}
646
647fn to_local_socket_name(name: &str) -> std::io::Result<interprocess::local_socket::Name<'static>> {
648 if let Ok(ns_name) = name.to_string().to_ns_name::<GenericNamespaced>() {
649 return Ok(ns_name);
650 }
651
652 let path = if cfg!(unix) {
657 let base = std::env::var_os("XDG_RUNTIME_DIR")
658 .or_else(|| std::env::var_os("SQLITE_GRAPHRAG_HOME"))
659 .map(std::path::PathBuf::from)
660 .unwrap_or_else(|| std::path::PathBuf::from("/tmp"));
661 base.join(format!("{name}.sock"))
662 .to_string_lossy()
663 .into_owned()
664 } else {
665 format!(r"\\.\pipe\{name}")
666 };
667 path.to_fs_name::<GenericFilePath>()
668}
669
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn record_and_clear_spawn_backoff_state() {
        let scratch = tempfile::tempdir().unwrap();
        let models = scratch.path().join("cache").join("models");
        std::fs::create_dir_all(&models).unwrap();

        // With no state file yet, there is no backoff.
        assert!(!spawn_backoff_active(&models).unwrap());

        // A single failure opens a backoff window and is persisted.
        record_spawn_failure(&models, "spawn failed".to_string()).unwrap();
        assert!(spawn_backoff_active(&models).unwrap());

        let persisted = load_spawn_state(&models).unwrap();
        assert_eq!(persisted.consecutive_failures, 1);
        assert_eq!(persisted.last_error.as_deref(), Some("spawn failed"));

        // Clearing the state closes the window again.
        clear_spawn_backoff_state(&models).unwrap();
        assert!(!spawn_backoff_active(&models).unwrap());
    }

    #[test]
    fn daemon_control_dir_uses_models_parent() {
        let cache_root = PathBuf::from("/tmp/sqlite-graphrag-cache-test");
        let models = cache_root.join("models");
        assert_eq!(daemon_control_dir(&models), cache_root);
    }
}