1use crate::constants::{
2 DAEMON_AUTO_START_INITIAL_BACKOFF_MS, DAEMON_AUTO_START_MAX_BACKOFF_MS,
3 DAEMON_AUTO_START_MAX_WAIT_MS, DAEMON_IDLE_SHUTDOWN_SECS, DAEMON_PING_TIMEOUT_MS,
4 DAEMON_SPAWN_BACKOFF_BASE_MS, DAEMON_SPAWN_LOCK_WAIT_MS, SQLITE_GRAPHRAG_VERSION,
5};
6use crate::errors::AppError;
7use crate::{embedder, shutdown_requested};
8use fs4::fs_std::FileExt;
9use interprocess::local_socket::{
10 prelude::LocalSocketStream,
11 traits::{Listener as _, Stream as _},
12 GenericFilePath, GenericNamespaced, ListenerNonblockingMode, ListenerOptions, ToFsName,
13 ToNsName,
14};
15use serde::{Deserialize, Serialize};
16use std::fs::{File, OpenOptions};
17use std::io::{BufRead, BufReader, Write};
18use std::path::{Path, PathBuf};
19use std::process::Stdio;
20use std::thread;
21use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
22
/// A request sent from a client to the embedding daemon as one JSON object per line.
///
/// Serialized as an internally tagged object whose `"request"` field holds the
/// snake_case variant name, e.g. `{"request":"embed_query","text":"..."}`.
#[derive(Debug, Serialize, Deserialize)]
#[serde(tag = "request", rename_all = "snake_case")]
pub enum DaemonRequest {
    /// Health check; the daemon answers with `DaemonResponse::Ok`.
    Ping,
    /// Asks the daemon to answer with `DaemonResponse::ShuttingDown` and stop serving.
    Shutdown,
    /// Embed one passage; answered with `DaemonResponse::PassageEmbedding`.
    EmbedPassage {
        text: String,
    },
    /// Embed one query; answered with `DaemonResponse::QueryEmbedding`.
    EmbedQuery {
        text: String,
    },
    /// Embed a batch of passages; answered with `DaemonResponse::PassageEmbeddings`.
    EmbedPassages {
        texts: Vec<String>,
        // Forwarded unchanged to `embedder::embed_passages_controlled`; presumably one
        // count per entry in `texts` — TODO confirm against the embedder module.
        token_counts: Vec<usize>,
    },
}
39
/// A reply from the embedding daemon, sent as one JSON object per line.
///
/// Serialized as an internally tagged object whose `"status"` field holds the
/// snake_case variant name. `handled_embed_requests` is the number of embedding
/// requests this daemon process has completed successfully so far (for embedding
/// replies this already includes the request being answered).
#[derive(Debug, Serialize, Deserialize)]
#[serde(tag = "status", rename_all = "snake_case")]
pub enum DaemonResponse {
    /// Emitted by `run` through `crate::output::emit_json` once the socket is bound
    /// and the embedder has been loaded.
    Listening {
        pid: u32,
        socket: String,
        idle_shutdown_secs: u64,
    },
    /// Reply to `DaemonRequest::Ping`.
    Ok {
        pid: u32,
        version: String,
        handled_embed_requests: u64,
    },
    /// Reply to `DaemonRequest::EmbedPassage`.
    PassageEmbedding {
        embedding: Vec<f32>,
        handled_embed_requests: u64,
    },
    /// Reply to `DaemonRequest::EmbedQuery`.
    QueryEmbedding {
        embedding: Vec<f32>,
        handled_embed_requests: u64,
    },
    /// Reply to `DaemonRequest::EmbedPassages`.
    PassageEmbeddings {
        embeddings: Vec<Vec<f32>>,
        handled_embed_requests: u64,
    },
    /// Reply to `DaemonRequest::Shutdown`; the daemon stops accepting clients afterwards.
    ShuttingDown {
        handled_embed_requests: u64,
    },
    /// The daemon could not serve the request; client helpers surface `message`
    /// as `AppError::Embedding`.
    Error {
        message: String,
    },
}
72
/// Autostart backoff bookkeeping, persisted as JSON at `spawn_state_path`.
#[derive(Debug, Default, Serialize, Deserialize)]
struct DaemonSpawnState {
    /// Autostart failures recorded since the state file was last cleared.
    consecutive_failures: u32,
    /// Autostart is suppressed while the current time (milliseconds since the
    /// Unix epoch) is below this value.
    not_before_epoch_ms: u64,
    /// Description of the most recently recorded failure.
    last_error: Option<String>,
}
79
80pub fn daemon_label(models_dir: &Path) -> String {
81 let hash = blake3::hash(models_dir.to_string_lossy().as_bytes())
82 .to_hex()
83 .to_string();
84 format!("sqlite-graphrag-daemon-{}", &hash[..16])
85}
86
87pub fn try_ping(models_dir: &Path) -> Result<Option<DaemonResponse>, AppError> {
88 request_if_available(models_dir, &DaemonRequest::Ping)
89}
90
91pub fn try_shutdown(models_dir: &Path) -> Result<Option<DaemonResponse>, AppError> {
92 request_if_available(models_dir, &DaemonRequest::Shutdown)
93}
94
95pub fn embed_passage_or_local(models_dir: &Path, text: &str) -> Result<Vec<f32>, AppError> {
96 match request_or_autostart(
97 models_dir,
98 &DaemonRequest::EmbedPassage {
99 text: text.to_string(),
100 },
101 )? {
102 Some(DaemonResponse::PassageEmbedding { embedding, .. }) => Ok(embedding),
103 Some(DaemonResponse::Error { message }) => Err(AppError::Embedding(message)),
104 Some(other) => Err(AppError::Internal(anyhow::anyhow!(
105 "unexpected daemon response for passage embedding: {other:?}"
106 ))),
107 None => {
108 let embedder = embedder::get_embedder(models_dir)?;
109 embedder::embed_passage(embedder, text)
110 }
111 }
112}
113
114pub fn embed_query_or_local(models_dir: &Path, text: &str) -> Result<Vec<f32>, AppError> {
115 match request_or_autostart(
116 models_dir,
117 &DaemonRequest::EmbedQuery {
118 text: text.to_string(),
119 },
120 )? {
121 Some(DaemonResponse::QueryEmbedding { embedding, .. }) => Ok(embedding),
122 Some(DaemonResponse::Error { message }) => Err(AppError::Embedding(message)),
123 Some(other) => Err(AppError::Internal(anyhow::anyhow!(
124 "unexpected daemon response for query embedding: {other:?}"
125 ))),
126 None => {
127 let embedder = embedder::get_embedder(models_dir)?;
128 embedder::embed_query(embedder, text)
129 }
130 }
131}
132
133pub fn embed_passages_controlled_or_local(
134 models_dir: &Path,
135 texts: &[&str],
136 token_counts: &[usize],
137) -> Result<Vec<Vec<f32>>, AppError> {
138 let request = DaemonRequest::EmbedPassages {
139 texts: texts.iter().map(|t| (*t).to_string()).collect(),
140 token_counts: token_counts.to_vec(),
141 };
142
143 match request_or_autostart(models_dir, &request)? {
144 Some(DaemonResponse::PassageEmbeddings { embeddings, .. }) => Ok(embeddings),
145 Some(DaemonResponse::Error { message }) => Err(AppError::Embedding(message)),
146 Some(other) => Err(AppError::Internal(anyhow::anyhow!(
147 "unexpected daemon response for batch passage embeddings: {other:?}"
148 ))),
149 None => {
150 let embedder = embedder::get_embedder(models_dir)?;
151 embedder::embed_passages_controlled(embedder, texts, token_counts)
152 }
153 }
154}
155
/// Guard held by `run`; when dropped it removes the spawn lock file that lives in
/// the daemon control directory for `models_dir`.
struct DaemonSpawnGuard {
    // Models directory served by the daemon; used to locate the spawn lock file.
    models_dir: PathBuf,
}
159
160impl DaemonSpawnGuard {
161 fn new(models_dir: &Path) -> Self {
162 Self {
163 models_dir: models_dir.to_path_buf(),
164 }
165 }
166}
167
168impl Drop for DaemonSpawnGuard {
169 fn drop(&mut self) {
170 let lock_path = spawn_lock_path(&self.models_dir);
171 if lock_path.exists() {
172 match std::fs::remove_file(&lock_path) {
173 Ok(()) => {
174 tracing::debug!(
175 path = %lock_path.display(),
176 "lock file de spawn removido ao encerrar daemon graciosamente"
177 );
178 }
179 Err(err) if err.kind() == std::io::ErrorKind::NotFound => {}
180 Err(err) => {
181 tracing::warn!(
182 error = %err,
183 path = %lock_path.display(),
184 "falha ao remover lock file de spawn ao encerrar daemon"
185 );
186 }
187 }
188 }
189 tracing::info!(
190 "daemon encerrado graciosamente; socket será limpo pelo OS ou pelo próximo daemon via try_overwrite"
191 );
192 }
193}
194
195pub fn run(models_dir: &Path, idle_shutdown_secs: u64) -> Result<(), AppError> {
196 let socket = daemon_label(models_dir);
197 let name = to_local_socket_name(&socket)?;
198 let listener = ListenerOptions::new()
199 .name(name)
200 .nonblocking(ListenerNonblockingMode::Accept)
201 .try_overwrite(true)
202 .create_sync()
203 .map_err(AppError::Io)?;
204
205 let _spawn_guard = DaemonSpawnGuard::new(models_dir);
208
209 let _ = embedder::get_embedder(models_dir)?;
211
212 crate::output::emit_json(&DaemonResponse::Listening {
213 pid: std::process::id(),
214 socket,
215 idle_shutdown_secs,
216 })?;
217
218 let mut handled_embed_requests = 0_u64;
219 let mut last_activity = Instant::now();
220
221 loop {
222 if shutdown_requested() {
223 break;
224 }
225
226 if !daemon_control_dir(models_dir).exists() {
227 tracing::info!("daemon control directory disappeared; shutting down");
228 break;
229 }
230
231 match listener.accept() {
232 Ok(stream) => {
233 last_activity = Instant::now();
234 let should_exit = handle_client(stream, models_dir, &mut handled_embed_requests)?;
235 if should_exit {
236 break;
237 }
238 }
239 Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => {
240 if last_activity.elapsed() >= Duration::from_secs(idle_shutdown_secs) {
241 tracing::info!(
242 idle_shutdown_secs,
243 handled_embed_requests,
244 "daemon idle timeout reached"
245 );
246 break;
247 }
248 thread::sleep(Duration::from_millis(50));
249 }
250 Err(err) => return Err(AppError::Io(err)),
251 }
252 }
253
254 Ok(())
255}
256
257fn handle_client(
258 stream: LocalSocketStream,
259 models_dir: &Path,
260 handled_embed_requests: &mut u64,
261) -> Result<bool, AppError> {
262 let mut reader = BufReader::new(stream);
263 let mut line = String::new();
264 reader.read_line(&mut line).map_err(AppError::Io)?;
265
266 if line.trim().is_empty() {
267 write_response(
268 reader.get_mut(),
269 &DaemonResponse::Error {
270 message: "empty daemon request".to_string(),
271 },
272 )?;
273 return Ok(false);
274 }
275
276 let request: DaemonRequest = serde_json::from_str(line.trim()).map_err(AppError::Json)?;
277 let (response, should_exit) = match request {
278 DaemonRequest::Ping => (
279 DaemonResponse::Ok {
280 pid: std::process::id(),
281 version: SQLITE_GRAPHRAG_VERSION.to_string(),
282 handled_embed_requests: *handled_embed_requests,
283 },
284 false,
285 ),
286 DaemonRequest::Shutdown => (
287 DaemonResponse::ShuttingDown {
288 handled_embed_requests: *handled_embed_requests,
289 },
290 true,
291 ),
292 DaemonRequest::EmbedPassage { text } => {
293 let embedder = embedder::get_embedder(models_dir)?;
294 let embedding = embedder::embed_passage(embedder, &text)?;
295 *handled_embed_requests += 1;
296 (
297 DaemonResponse::PassageEmbedding {
298 embedding,
299 handled_embed_requests: *handled_embed_requests,
300 },
301 false,
302 )
303 }
304 DaemonRequest::EmbedQuery { text } => {
305 let embedder = embedder::get_embedder(models_dir)?;
306 let embedding = embedder::embed_query(embedder, &text)?;
307 *handled_embed_requests += 1;
308 (
309 DaemonResponse::QueryEmbedding {
310 embedding,
311 handled_embed_requests: *handled_embed_requests,
312 },
313 false,
314 )
315 }
316 DaemonRequest::EmbedPassages {
317 texts,
318 token_counts,
319 } => {
320 let embedder = embedder::get_embedder(models_dir)?;
321 let text_refs: Vec<&str> = texts.iter().map(String::as_str).collect();
322 let embeddings =
323 embedder::embed_passages_controlled(embedder, &text_refs, &token_counts)?;
324 *handled_embed_requests += 1;
325 (
326 DaemonResponse::PassageEmbeddings {
327 embeddings,
328 handled_embed_requests: *handled_embed_requests,
329 },
330 false,
331 )
332 }
333 };
334
335 write_response(reader.get_mut(), &response)?;
336 Ok(should_exit)
337}
338
339fn write_response(
340 stream: &mut LocalSocketStream,
341 response: &DaemonResponse,
342) -> Result<(), AppError> {
343 serde_json::to_writer(&mut *stream, response).map_err(AppError::Json)?;
344 stream.write_all(b"\n").map_err(AppError::Io)?;
345 stream.flush().map_err(AppError::Io)?;
346 Ok(())
347}
348
349fn request_if_available(
350 models_dir: &Path,
351 request: &DaemonRequest,
352) -> Result<Option<DaemonResponse>, AppError> {
353 let socket = daemon_label(models_dir);
354 let name = match to_local_socket_name(&socket) {
355 Ok(name) => name,
356 Err(err) => return Err(AppError::Io(err)),
357 };
358
359 let mut stream = match LocalSocketStream::connect(name) {
360 Ok(stream) => stream,
361 Err(err)
362 if matches!(
363 err.kind(),
364 std::io::ErrorKind::NotFound
365 | std::io::ErrorKind::ConnectionRefused
366 | std::io::ErrorKind::AddrNotAvailable
367 | std::io::ErrorKind::TimedOut
368 ) =>
369 {
370 return Ok(None);
371 }
372 Err(err) => return Err(AppError::Io(err)),
373 };
374
375 serde_json::to_writer(&mut stream, request).map_err(AppError::Json)?;
376 stream.write_all(b"\n").map_err(AppError::Io)?;
377 stream.flush().map_err(AppError::Io)?;
378
379 let mut reader = BufReader::new(stream);
380 let mut line = String::new();
381 reader.read_line(&mut line).map_err(AppError::Io)?;
382 if line.trim().is_empty() {
383 return Err(AppError::Embedding("daemon returned empty response".into()));
384 }
385
386 let response = serde_json::from_str(line.trim()).map_err(AppError::Json)?;
387 Ok(Some(response))
388}
389
390fn request_or_autostart(
391 models_dir: &Path,
392 request: &DaemonRequest,
393) -> Result<Option<DaemonResponse>, AppError> {
394 if let Some(response) = request_if_available(models_dir, request)? {
395 clear_spawn_backoff_state(models_dir).ok();
396 return Ok(Some(response));
397 }
398
399 if autostart_disabled() {
400 return Ok(None);
401 }
402
403 if !ensure_daemon_running(models_dir)? {
404 return Ok(None);
405 }
406
407 request_if_available(models_dir, request)
408}
409
410fn ensure_daemon_running(models_dir: &Path) -> Result<bool, AppError> {
411 if (try_ping(models_dir)?).is_some() {
412 clear_spawn_backoff_state(models_dir).ok();
413 return Ok(true);
414 }
415
416 if spawn_backoff_active(models_dir)? {
417 tracing::warn!("daemon autostart suppressed by backoff window");
418 return Ok(false);
419 }
420
421 let spawn_lock = match try_acquire_spawn_lock(models_dir)? {
422 Some(lock) => lock,
423 None => return wait_for_daemon_ready(models_dir),
424 };
425
426 if (try_ping(models_dir)?).is_some() {
427 clear_spawn_backoff_state(models_dir).ok();
428 drop(spawn_lock);
429 return Ok(true);
430 }
431
432 let exe = match std::env::current_exe() {
433 Ok(path) => path,
434 Err(err) => {
435 record_spawn_failure(models_dir, format!("current_exe failed: {err}"))?;
436 drop(spawn_lock);
437 return Ok(false);
438 }
439 };
440
441 let mut child = std::process::Command::new(exe);
442 child
443 .arg("daemon")
444 .arg("--idle-shutdown-secs")
445 .arg(DAEMON_IDLE_SHUTDOWN_SECS.to_string())
446 .env("SQLITE_GRAPHRAG_DAEMON_CHILD", "1")
447 .stdin(Stdio::null())
448 .stdout(Stdio::null())
449 .stderr(Stdio::null());
450
451 match child.spawn() {
452 Ok(_) => {
453 let ready = wait_for_daemon_ready(models_dir)?;
454 if ready {
455 clear_spawn_backoff_state(models_dir).ok();
456 } else {
457 record_spawn_failure(
458 models_dir,
459 "daemon did not become healthy after autostart".to_string(),
460 )?;
461 }
462 drop(spawn_lock);
463 Ok(ready)
464 }
465 Err(err) => {
466 record_spawn_failure(models_dir, format!("daemon spawn failed: {err}"))?;
467 drop(spawn_lock);
468 Ok(false)
469 }
470 }
471}
472
473fn wait_for_daemon_ready(models_dir: &Path) -> Result<bool, AppError> {
474 let deadline = Instant::now() + Duration::from_millis(DAEMON_AUTO_START_MAX_WAIT_MS);
475 let mut sleep_ms = DAEMON_AUTO_START_INITIAL_BACKOFF_MS.max(DAEMON_PING_TIMEOUT_MS);
476
477 while Instant::now() < deadline {
478 if (try_ping(models_dir)?).is_some() {
479 return Ok(true);
480 }
481 thread::sleep(Duration::from_millis(sleep_ms));
482 sleep_ms = (sleep_ms * 2).min(DAEMON_AUTO_START_MAX_BACKOFF_MS);
483 }
484
485 Ok(false)
486}
487
/// Reports whether daemon autostart must be skipped.
///
/// Autostart is always off inside a daemon child (`SQLITE_GRAPHRAG_DAEMON_CHILD=1`),
/// so a daemon never spawns another. Otherwise it is off when
/// `SQLITE_GRAPHRAG_DAEMON_DISABLE_AUTOSTART=1`, unless
/// `SQLITE_GRAPHRAG_DAEMON_FORCE_AUTOSTART=1` overrides that.
fn autostart_disabled() -> bool {
    let flag_set = |key: &str| std::env::var(key).as_deref() == Ok("1");

    if flag_set("SQLITE_GRAPHRAG_DAEMON_CHILD") {
        return true;
    }
    let forced = flag_set("SQLITE_GRAPHRAG_DAEMON_FORCE_AUTOSTART");
    !forced && flag_set("SQLITE_GRAPHRAG_DAEMON_DISABLE_AUTOSTART")
}
493
/// Returns the directory that holds the daemon's spawn lock and backoff state:
/// the parent of `models_dir`.
///
/// Edge cases:
/// - A single-component relative path such as `models` has the empty path as its
///   `Path::parent`. The empty path never `exists()`, so `.` is returned instead.
/// - A path with no parent (e.g. `/`) maps to itself.
fn daemon_control_dir(models_dir: &Path) -> PathBuf {
    match models_dir.parent() {
        Some(parent) if parent.as_os_str().is_empty() => PathBuf::from("."),
        Some(parent) => parent.to_path_buf(),
        None => models_dir.to_path_buf(),
    }
}
500
501fn spawn_lock_path(models_dir: &Path) -> PathBuf {
502 daemon_control_dir(models_dir).join("daemon-spawn.lock")
503}
504
505fn spawn_state_path(models_dir: &Path) -> PathBuf {
506 daemon_control_dir(models_dir).join("daemon-spawn-state.json")
507}
508
509fn try_acquire_spawn_lock(models_dir: &Path) -> Result<Option<File>, AppError> {
510 let path = spawn_lock_path(models_dir);
511 std::fs::create_dir_all(crate::paths::parent_or_err(&path)?).map_err(AppError::Io)?;
512 let file = OpenOptions::new()
513 .read(true)
514 .write(true)
515 .create(true)
516 .truncate(false)
517 .open(path)
518 .map_err(AppError::Io)?;
519
520 let deadline = Instant::now() + Duration::from_millis(DAEMON_SPAWN_LOCK_WAIT_MS);
521 loop {
522 match file.try_lock_exclusive() {
523 Ok(()) => return Ok(Some(file)),
524 Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => {
525 if Instant::now() >= deadline {
526 return Ok(None);
527 }
528 thread::sleep(Duration::from_millis(50));
529 }
530 Err(err) => return Err(AppError::Io(err)),
531 }
532 }
533}
534
535fn spawn_backoff_active(models_dir: &Path) -> Result<bool, AppError> {
536 let state = load_spawn_state(models_dir)?;
537 Ok(now_epoch_ms() < state.not_before_epoch_ms)
538}
539
540fn record_spawn_failure(models_dir: &Path, message: String) -> Result<(), AppError> {
541 let mut state = load_spawn_state(models_dir)?;
542 state.consecutive_failures = state.consecutive_failures.saturating_add(1);
543 let exponent = state.consecutive_failures.saturating_sub(1).min(6);
544 let backoff_ms =
545 (DAEMON_SPAWN_BACKOFF_BASE_MS * (1_u64 << exponent)).min(DAEMON_AUTO_START_MAX_BACKOFF_MS);
546 state.not_before_epoch_ms = now_epoch_ms() + backoff_ms;
547 state.last_error = Some(message);
548 save_spawn_state(models_dir, &state)
549}
550
551fn clear_spawn_backoff_state(models_dir: &Path) -> Result<(), AppError> {
552 let path = spawn_state_path(models_dir);
553 if path.exists() {
554 std::fs::remove_file(path).map_err(AppError::Io)?;
555 }
556 Ok(())
557}
558
559fn load_spawn_state(models_dir: &Path) -> Result<DaemonSpawnState, AppError> {
560 let path = spawn_state_path(models_dir);
561 if !path.exists() {
562 return Ok(DaemonSpawnState::default());
563 }
564
565 let bytes = std::fs::read(path).map_err(AppError::Io)?;
566 serde_json::from_slice(&bytes).map_err(AppError::Json)
567}
568
569fn save_spawn_state(models_dir: &Path, state: &DaemonSpawnState) -> Result<(), AppError> {
570 let path = spawn_state_path(models_dir);
571 std::fs::create_dir_all(crate::paths::parent_or_err(&path)?).map_err(AppError::Io)?;
572 let bytes = serde_json::to_vec(state).map_err(AppError::Json)?;
573 std::fs::write(path, bytes).map_err(AppError::Io)
574}
575
/// Current wall-clock time in milliseconds since the Unix epoch.
///
/// A clock set before the epoch yields 0.
fn now_epoch_ms() -> u64 {
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default();
    since_epoch.as_millis() as u64
}
582
583fn to_local_socket_name(name: &str) -> std::io::Result<interprocess::local_socket::Name<'static>> {
584 if let Ok(ns_name) = name.to_string().to_ns_name::<GenericNamespaced>() {
585 return Ok(ns_name);
586 }
587
588 let path = if cfg!(unix) {
589 format!("/tmp/{name}.sock")
590 } else {
591 format!(r"\\.\pipe\{name}")
592 };
593 path.to_fs_name::<GenericFilePath>()
594}
595
#[cfg(test)]
mod tests {
    use super::*;

    /// Recording a failure opens a backoff window; clearing the state closes it again.
    #[test]
    fn record_and_clear_spawn_backoff_state() {
        let scratch = tempfile::tempdir().unwrap();
        let models_dir = scratch.path().join("cache").join("models");
        std::fs::create_dir_all(&models_dir).unwrap();

        assert!(!spawn_backoff_active(&models_dir).unwrap());

        record_spawn_failure(&models_dir, "spawn failed".to_string()).unwrap();
        assert!(spawn_backoff_active(&models_dir).unwrap());

        let recorded = load_spawn_state(&models_dir).unwrap();
        assert_eq!(recorded.consecutive_failures, 1);
        assert_eq!(recorded.last_error.as_deref(), Some("spawn failed"));

        clear_spawn_backoff_state(&models_dir).unwrap();
        assert!(!spawn_backoff_active(&models_dir).unwrap());
    }

    /// The control directory is the parent of the models directory.
    #[test]
    fn daemon_control_dir_uses_parent_of_models() {
        let cache_root = PathBuf::from("/tmp/sqlite-graphrag-cache-test");
        assert_eq!(daemon_control_dir(&cache_root.join("models")), cache_root);
    }
}