1use crate::constants::{
2 DAEMON_AUTO_START_INITIAL_BACKOFF_MS, DAEMON_AUTO_START_MAX_BACKOFF_MS,
3 DAEMON_AUTO_START_MAX_WAIT_MS, DAEMON_IDLE_SHUTDOWN_SECS, DAEMON_PING_TIMEOUT_MS,
4 DAEMON_SPAWN_BACKOFF_BASE_MS, DAEMON_SPAWN_LOCK_WAIT_MS, SQLITE_GRAPHRAG_VERSION,
5};
6use crate::errors::AppError;
7use crate::{embedder, shutdown_requested};
8use fs4::fs_std::FileExt;
9use interprocess::local_socket::{
10 prelude::LocalSocketStream,
11 traits::{Listener as _, Stream as _},
12 GenericFilePath, GenericNamespaced, ListenerNonblockingMode, ListenerOptions, ToFsName,
13 ToNsName,
14};
15use serde::{Deserialize, Serialize};
16use std::fs::{File, OpenOptions};
17use std::io::{BufRead, BufReader, Write};
18use std::path::{Path, PathBuf};
19use std::process::Stdio;
20use std::thread;
21use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
22
23#[derive(Debug, Serialize, Deserialize)]
24#[serde(tag = "request", rename_all = "snake_case")]
25pub enum DaemonRequest {
26 Ping,
27 Shutdown,
28 EmbedPassage {
29 text: String,
30 },
31 EmbedQuery {
32 text: String,
33 },
34 EmbedPassages {
35 texts: Vec<String>,
36 token_counts: Vec<usize>,
37 },
38}
39
40#[derive(Debug, Serialize, Deserialize)]
41#[serde(tag = "status", rename_all = "snake_case")]
42pub enum DaemonResponse {
43 Listening {
44 pid: u32,
45 socket: String,
46 idle_shutdown_secs: u64,
47 },
48 Ok {
49 pid: u32,
50 version: String,
51 handled_embed_requests: u64,
52 },
53 PassageEmbedding {
54 embedding: Vec<f32>,
55 handled_embed_requests: u64,
56 },
57 QueryEmbedding {
58 embedding: Vec<f32>,
59 handled_embed_requests: u64,
60 },
61 PassageEmbeddings {
62 embeddings: Vec<Vec<f32>>,
63 handled_embed_requests: u64,
64 },
65 ShuttingDown {
66 handled_embed_requests: u64,
67 },
68 Error {
69 message: String,
70 },
71}
72
73#[derive(Debug, Default, Serialize, Deserialize)]
74struct DaemonSpawnState {
75 consecutive_failures: u32,
76 not_before_epoch_ms: u64,
77 last_error: Option<String>,
78}
79
80pub fn daemon_label(models_dir: &Path) -> String {
81 let hash = blake3::hash(models_dir.to_string_lossy().as_bytes())
82 .to_hex()
83 .to_string();
84 format!("sqlite-graphrag-daemon-{}", &hash[..16])
85}
86
87pub fn try_ping(models_dir: &Path) -> Result<Option<DaemonResponse>, AppError> {
88 request_if_available(models_dir, &DaemonRequest::Ping)
89}
90
91pub fn try_shutdown(models_dir: &Path) -> Result<Option<DaemonResponse>, AppError> {
92 request_if_available(models_dir, &DaemonRequest::Shutdown)
93}
94
95pub fn embed_passage_or_local(models_dir: &Path, text: &str) -> Result<Vec<f32>, AppError> {
96 match request_or_autostart(
97 models_dir,
98 &DaemonRequest::EmbedPassage {
99 text: text.to_string(),
100 },
101 )? {
102 Some(DaemonResponse::PassageEmbedding { embedding, .. }) => Ok(embedding),
103 Some(DaemonResponse::Error { message }) => Err(AppError::Embedding(message)),
104 Some(other) => Err(AppError::Internal(anyhow::anyhow!(
105 "unexpected daemon response for passage embedding: {other:?}"
106 ))),
107 None => {
108 let embedder = embedder::get_embedder(models_dir)?;
109 embedder::embed_passage(embedder, text)
110 }
111 }
112}
113
114pub fn embed_query_or_local(models_dir: &Path, text: &str) -> Result<Vec<f32>, AppError> {
115 match request_or_autostart(
116 models_dir,
117 &DaemonRequest::EmbedQuery {
118 text: text.to_string(),
119 },
120 )? {
121 Some(DaemonResponse::QueryEmbedding { embedding, .. }) => Ok(embedding),
122 Some(DaemonResponse::Error { message }) => Err(AppError::Embedding(message)),
123 Some(other) => Err(AppError::Internal(anyhow::anyhow!(
124 "unexpected daemon response for query embedding: {other:?}"
125 ))),
126 None => {
127 let embedder = embedder::get_embedder(models_dir)?;
128 embedder::embed_query(embedder, text)
129 }
130 }
131}
132
133pub fn embed_passages_controlled_or_local(
134 models_dir: &Path,
135 texts: &[&str],
136 token_counts: &[usize],
137) -> Result<Vec<Vec<f32>>, AppError> {
138 let request = DaemonRequest::EmbedPassages {
139 texts: texts.iter().map(|t| (*t).to_string()).collect(),
140 token_counts: token_counts.to_vec(),
141 };
142
143 match request_or_autostart(models_dir, &request)? {
144 Some(DaemonResponse::PassageEmbeddings { embeddings, .. }) => Ok(embeddings),
145 Some(DaemonResponse::Error { message }) => Err(AppError::Embedding(message)),
146 Some(other) => Err(AppError::Internal(anyhow::anyhow!(
147 "unexpected daemon response for batch passage embeddings: {other:?}"
148 ))),
149 None => {
150 let embedder = embedder::get_embedder(models_dir)?;
151 embedder::embed_passages_controlled(embedder, texts, token_counts)
152 }
153 }
154}
155
156pub fn run(models_dir: &Path, idle_shutdown_secs: u64) -> Result<(), AppError> {
157 let socket = daemon_label(models_dir);
158 let name = to_local_socket_name(&socket)?;
159 let listener = ListenerOptions::new()
160 .name(name)
161 .nonblocking(ListenerNonblockingMode::Accept)
162 .try_overwrite(true)
163 .create_sync()
164 .map_err(AppError::Io)?;
165
166 let _ = embedder::get_embedder(models_dir)?;
168
169 crate::output::emit_json(&DaemonResponse::Listening {
170 pid: std::process::id(),
171 socket,
172 idle_shutdown_secs,
173 })?;
174
175 let mut handled_embed_requests = 0_u64;
176 let mut last_activity = Instant::now();
177
178 loop {
179 if shutdown_requested() {
180 break;
181 }
182
183 if !daemon_control_dir(models_dir).exists() {
184 tracing::info!("daemon control directory disappeared; shutting down");
185 break;
186 }
187
188 match listener.accept() {
189 Ok(stream) => {
190 last_activity = Instant::now();
191 let should_exit = handle_client(stream, models_dir, &mut handled_embed_requests)?;
192 if should_exit {
193 break;
194 }
195 }
196 Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => {
197 if last_activity.elapsed() >= Duration::from_secs(idle_shutdown_secs) {
198 tracing::info!(
199 idle_shutdown_secs,
200 handled_embed_requests,
201 "daemon idle timeout reached"
202 );
203 break;
204 }
205 thread::sleep(Duration::from_millis(50));
206 }
207 Err(err) => return Err(AppError::Io(err)),
208 }
209 }
210
211 Ok(())
212}
213
214fn handle_client(
215 stream: LocalSocketStream,
216 models_dir: &Path,
217 handled_embed_requests: &mut u64,
218) -> Result<bool, AppError> {
219 let mut reader = BufReader::new(stream);
220 let mut line = String::new();
221 reader.read_line(&mut line).map_err(AppError::Io)?;
222
223 if line.trim().is_empty() {
224 write_response(
225 reader.get_mut(),
226 &DaemonResponse::Error {
227 message: "empty daemon request".to_string(),
228 },
229 )?;
230 return Ok(false);
231 }
232
233 let request: DaemonRequest = serde_json::from_str(line.trim()).map_err(AppError::Json)?;
234 let (response, should_exit) = match request {
235 DaemonRequest::Ping => (
236 DaemonResponse::Ok {
237 pid: std::process::id(),
238 version: SQLITE_GRAPHRAG_VERSION.to_string(),
239 handled_embed_requests: *handled_embed_requests,
240 },
241 false,
242 ),
243 DaemonRequest::Shutdown => (
244 DaemonResponse::ShuttingDown {
245 handled_embed_requests: *handled_embed_requests,
246 },
247 true,
248 ),
249 DaemonRequest::EmbedPassage { text } => {
250 let embedder = embedder::get_embedder(models_dir)?;
251 let embedding = embedder::embed_passage(embedder, &text)?;
252 *handled_embed_requests += 1;
253 (
254 DaemonResponse::PassageEmbedding {
255 embedding,
256 handled_embed_requests: *handled_embed_requests,
257 },
258 false,
259 )
260 }
261 DaemonRequest::EmbedQuery { text } => {
262 let embedder = embedder::get_embedder(models_dir)?;
263 let embedding = embedder::embed_query(embedder, &text)?;
264 *handled_embed_requests += 1;
265 (
266 DaemonResponse::QueryEmbedding {
267 embedding,
268 handled_embed_requests: *handled_embed_requests,
269 },
270 false,
271 )
272 }
273 DaemonRequest::EmbedPassages {
274 texts,
275 token_counts,
276 } => {
277 let embedder = embedder::get_embedder(models_dir)?;
278 let text_refs: Vec<&str> = texts.iter().map(String::as_str).collect();
279 let embeddings =
280 embedder::embed_passages_controlled(embedder, &text_refs, &token_counts)?;
281 *handled_embed_requests += 1;
282 (
283 DaemonResponse::PassageEmbeddings {
284 embeddings,
285 handled_embed_requests: *handled_embed_requests,
286 },
287 false,
288 )
289 }
290 };
291
292 write_response(reader.get_mut(), &response)?;
293 Ok(should_exit)
294}
295
296fn write_response(
297 stream: &mut LocalSocketStream,
298 response: &DaemonResponse,
299) -> Result<(), AppError> {
300 serde_json::to_writer(&mut *stream, response).map_err(AppError::Json)?;
301 stream.write_all(b"\n").map_err(AppError::Io)?;
302 stream.flush().map_err(AppError::Io)?;
303 Ok(())
304}
305
306fn request_if_available(
307 models_dir: &Path,
308 request: &DaemonRequest,
309) -> Result<Option<DaemonResponse>, AppError> {
310 let socket = daemon_label(models_dir);
311 let name = match to_local_socket_name(&socket) {
312 Ok(name) => name,
313 Err(err) => return Err(AppError::Io(err)),
314 };
315
316 let mut stream = match LocalSocketStream::connect(name) {
317 Ok(stream) => stream,
318 Err(err)
319 if matches!(
320 err.kind(),
321 std::io::ErrorKind::NotFound
322 | std::io::ErrorKind::ConnectionRefused
323 | std::io::ErrorKind::AddrNotAvailable
324 | std::io::ErrorKind::TimedOut
325 ) =>
326 {
327 return Ok(None);
328 }
329 Err(err) => return Err(AppError::Io(err)),
330 };
331
332 serde_json::to_writer(&mut stream, request).map_err(AppError::Json)?;
333 stream.write_all(b"\n").map_err(AppError::Io)?;
334 stream.flush().map_err(AppError::Io)?;
335
336 let mut reader = BufReader::new(stream);
337 let mut line = String::new();
338 reader.read_line(&mut line).map_err(AppError::Io)?;
339 if line.trim().is_empty() {
340 return Err(AppError::Embedding("daemon returned empty response".into()));
341 }
342
343 let response = serde_json::from_str(line.trim()).map_err(AppError::Json)?;
344 Ok(Some(response))
345}
346
347fn request_or_autostart(
348 models_dir: &Path,
349 request: &DaemonRequest,
350) -> Result<Option<DaemonResponse>, AppError> {
351 if let Some(response) = request_if_available(models_dir, request)? {
352 clear_spawn_backoff_state(models_dir).ok();
353 return Ok(Some(response));
354 }
355
356 if autostart_disabled() {
357 return Ok(None);
358 }
359
360 if !ensure_daemon_running(models_dir)? {
361 return Ok(None);
362 }
363
364 request_if_available(models_dir, request)
365}
366
367fn ensure_daemon_running(models_dir: &Path) -> Result<bool, AppError> {
368 if (try_ping(models_dir)?).is_some() {
369 clear_spawn_backoff_state(models_dir).ok();
370 return Ok(true);
371 }
372
373 if spawn_backoff_active(models_dir)? {
374 tracing::warn!("daemon autostart suppressed by backoff window");
375 return Ok(false);
376 }
377
378 let spawn_lock = match try_acquire_spawn_lock(models_dir)? {
379 Some(lock) => lock,
380 None => return wait_for_daemon_ready(models_dir),
381 };
382
383 if (try_ping(models_dir)?).is_some() {
384 clear_spawn_backoff_state(models_dir).ok();
385 drop(spawn_lock);
386 return Ok(true);
387 }
388
389 let exe = match std::env::current_exe() {
390 Ok(path) => path,
391 Err(err) => {
392 record_spawn_failure(models_dir, format!("current_exe failed: {err}"))?;
393 drop(spawn_lock);
394 return Ok(false);
395 }
396 };
397
398 let mut child = std::process::Command::new(exe);
399 child
400 .arg("daemon")
401 .arg("--idle-shutdown-secs")
402 .arg(DAEMON_IDLE_SHUTDOWN_SECS.to_string())
403 .env("SQLITE_GRAPHRAG_DAEMON_CHILD", "1")
404 .stdin(Stdio::null())
405 .stdout(Stdio::null())
406 .stderr(Stdio::null());
407
408 match child.spawn() {
409 Ok(_) => {
410 let ready = wait_for_daemon_ready(models_dir)?;
411 if ready {
412 clear_spawn_backoff_state(models_dir).ok();
413 } else {
414 record_spawn_failure(
415 models_dir,
416 "daemon did not become healthy after autostart".to_string(),
417 )?;
418 }
419 drop(spawn_lock);
420 Ok(ready)
421 }
422 Err(err) => {
423 record_spawn_failure(models_dir, format!("daemon spawn failed: {err}"))?;
424 drop(spawn_lock);
425 Ok(false)
426 }
427 }
428}
429
430fn wait_for_daemon_ready(models_dir: &Path) -> Result<bool, AppError> {
431 let deadline = Instant::now() + Duration::from_millis(DAEMON_AUTO_START_MAX_WAIT_MS);
432 let mut sleep_ms = DAEMON_AUTO_START_INITIAL_BACKOFF_MS.max(DAEMON_PING_TIMEOUT_MS);
433
434 while Instant::now() < deadline {
435 if (try_ping(models_dir)?).is_some() {
436 return Ok(true);
437 }
438 thread::sleep(Duration::from_millis(sleep_ms));
439 sleep_ms = (sleep_ms * 2).min(DAEMON_AUTO_START_MAX_BACKOFF_MS);
440 }
441
442 Ok(false)
443}
444
/// Reports whether daemon autostart must be skipped.
///
/// Autostart is off in two cases:
/// - inside a daemon child (`SQLITE_GRAPHRAG_DAEMON_CHILD=1`), which prevents recursive spawning;
/// - when `SQLITE_GRAPHRAG_DAEMON_DISABLE_AUTOSTART=1` is set, unless
///   `SQLITE_GRAPHRAG_DAEMON_FORCE_AUTOSTART=1` overrides it.
fn autostart_disabled() -> bool {
    let env_is_one = |key: &str| std::env::var(key).as_deref() == Ok("1");

    if env_is_one("SQLITE_GRAPHRAG_DAEMON_CHILD") {
        return true;
    }

    let forced = env_is_one("SQLITE_GRAPHRAG_DAEMON_FORCE_AUTOSTART");
    let disabled = env_is_one("SQLITE_GRAPHRAG_DAEMON_DISABLE_AUTOSTART");
    !forced && disabled
}
450
/// Directory holding the daemon's lock and state files: the parent of `models_dir`,
/// or `models_dir` itself when it has no parent (e.g. a filesystem root).
fn daemon_control_dir(models_dir: &Path) -> PathBuf {
    match models_dir.parent() {
        Some(parent) => parent.to_path_buf(),
        None => models_dir.to_path_buf(),
    }
}
457
458fn spawn_lock_path(models_dir: &Path) -> PathBuf {
459 daemon_control_dir(models_dir).join("daemon-spawn.lock")
460}
461
462fn spawn_state_path(models_dir: &Path) -> PathBuf {
463 daemon_control_dir(models_dir).join("daemon-spawn-state.json")
464}
465
466fn try_acquire_spawn_lock(models_dir: &Path) -> Result<Option<File>, AppError> {
467 let path = spawn_lock_path(models_dir);
468 std::fs::create_dir_all(path.parent().unwrap()).map_err(AppError::Io)?;
469 let file = OpenOptions::new()
470 .read(true)
471 .write(true)
472 .create(true)
473 .truncate(false)
474 .open(path)
475 .map_err(AppError::Io)?;
476
477 let deadline = Instant::now() + Duration::from_millis(DAEMON_SPAWN_LOCK_WAIT_MS);
478 loop {
479 match file.try_lock_exclusive() {
480 Ok(()) => return Ok(Some(file)),
481 Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => {
482 if Instant::now() >= deadline {
483 return Ok(None);
484 }
485 thread::sleep(Duration::from_millis(50));
486 }
487 Err(err) => return Err(AppError::Io(err)),
488 }
489 }
490}
491
492fn spawn_backoff_active(models_dir: &Path) -> Result<bool, AppError> {
493 let state = load_spawn_state(models_dir)?;
494 Ok(now_epoch_ms() < state.not_before_epoch_ms)
495}
496
497fn record_spawn_failure(models_dir: &Path, message: String) -> Result<(), AppError> {
498 let mut state = load_spawn_state(models_dir)?;
499 state.consecutive_failures = state.consecutive_failures.saturating_add(1);
500 let exponent = state.consecutive_failures.saturating_sub(1).min(6);
501 let backoff_ms =
502 (DAEMON_SPAWN_BACKOFF_BASE_MS * (1_u64 << exponent)).min(DAEMON_AUTO_START_MAX_BACKOFF_MS);
503 state.not_before_epoch_ms = now_epoch_ms() + backoff_ms;
504 state.last_error = Some(message);
505 save_spawn_state(models_dir, &state)
506}
507
508fn clear_spawn_backoff_state(models_dir: &Path) -> Result<(), AppError> {
509 let path = spawn_state_path(models_dir);
510 if path.exists() {
511 std::fs::remove_file(path).map_err(AppError::Io)?;
512 }
513 Ok(())
514}
515
516fn load_spawn_state(models_dir: &Path) -> Result<DaemonSpawnState, AppError> {
517 let path = spawn_state_path(models_dir);
518 if !path.exists() {
519 return Ok(DaemonSpawnState::default());
520 }
521
522 let bytes = std::fs::read(path).map_err(AppError::Io)?;
523 serde_json::from_slice(&bytes).map_err(AppError::Json)
524}
525
526fn save_spawn_state(models_dir: &Path, state: &DaemonSpawnState) -> Result<(), AppError> {
527 let path = spawn_state_path(models_dir);
528 std::fs::create_dir_all(path.parent().unwrap()).map_err(AppError::Io)?;
529 let bytes = serde_json::to_vec(state).map_err(AppError::Json)?;
530 std::fs::write(path, bytes).map_err(AppError::Io)
531}
532
/// Current wall-clock time in milliseconds since the Unix epoch.
///
/// Returns 0 if the system clock is set before the epoch. The `u128` millisecond count is
/// clamped to `u64::MAX` before narrowing, so the cast never truncates.
fn now_epoch_ms() -> u64 {
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_else(|_| Duration::from_secs(0));
    since_epoch.as_millis().min(u128::from(u64::MAX)) as u64
}
539
540fn to_local_socket_name(name: &str) -> std::io::Result<interprocess::local_socket::Name<'static>> {
541 if let Ok(ns_name) = name.to_string().to_ns_name::<GenericNamespaced>() {
542 return Ok(ns_name);
543 }
544
545 let path = if cfg!(unix) {
546 format!("/tmp/{name}.sock")
547 } else {
548 format!(r"\\.\pipe\{name}")
549 };
550 path.to_fs_name::<GenericFilePath>()
551}
552
#[cfg(test)]
mod tests {
    use super::*;

    /// Creates `<tmp>/cache/models` and returns it alongside the guard that owns `<tmp>`.
    fn temp_models_dir() -> (tempfile::TempDir, PathBuf) {
        let tmp = tempfile::tempdir().unwrap();
        let models_dir = tmp.path().join("cache").join("models");
        std::fs::create_dir_all(&models_dir).unwrap();
        (tmp, models_dir)
    }

    #[test]
    fn record_and_clear_spawn_backoff_state() {
        let (_tmp, models_dir) = temp_models_dir();

        assert!(!spawn_backoff_active(&models_dir).unwrap());

        record_spawn_failure(&models_dir, "spawn failed".to_string()).unwrap();
        assert!(spawn_backoff_active(&models_dir).unwrap());

        let state = load_spawn_state(&models_dir).unwrap();
        assert_eq!(state.consecutive_failures, 1);
        assert_eq!(state.last_error.as_deref(), Some("spawn failed"));

        clear_spawn_backoff_state(&models_dir).unwrap();
        assert!(!spawn_backoff_active(&models_dir).unwrap());
    }

    #[test]
    fn repeated_spawn_failures_accumulate() {
        let (_tmp, models_dir) = temp_models_dir();

        record_spawn_failure(&models_dir, "first".to_string()).unwrap();
        record_spawn_failure(&models_dir, "second".to_string()).unwrap();

        let state = load_spawn_state(&models_dir).unwrap();
        assert_eq!(state.consecutive_failures, 2);
        assert_eq!(state.last_error.as_deref(), Some("second"));
    }

    #[test]
    fn clearing_missing_state_is_ok() {
        let (_tmp, models_dir) = temp_models_dir();
        clear_spawn_backoff_state(&models_dir).unwrap();
        clear_spawn_backoff_state(&models_dir).unwrap();
    }

    #[test]
    fn daemon_control_dir_uses_parent_of_models() {
        let base = PathBuf::from("/tmp/sqlite-graphrag-cache-test");
        let models_dir = base.join("models");
        assert_eq!(daemon_control_dir(&models_dir), base);
    }

    #[test]
    fn daemon_control_dir_without_parent_is_identity() {
        let root = PathBuf::from("/");
        assert_eq!(daemon_control_dir(&root), root);
    }

    #[test]
    fn daemon_label_is_stable_and_prefixed() {
        let models_dir = Path::new("/tmp/sqlite-graphrag-cache-test/models");
        let label = daemon_label(models_dir);
        assert_eq!(label, daemon_label(models_dir));
        assert!(label.starts_with("sqlite-graphrag-daemon-"));
        assert_eq!(label.len(), "sqlite-graphrag-daemon-".len() + 16);
    }
}