1use crate::constants::{
2 DAEMON_AUTO_START_INITIAL_BACKOFF_MS, DAEMON_AUTO_START_MAX_BACKOFF_MS,
3 DAEMON_AUTO_START_MAX_WAIT_MS, DAEMON_IDLE_SHUTDOWN_SECS, DAEMON_PING_TIMEOUT_MS,
4 DAEMON_SPAWN_BACKOFF_BASE_MS, DAEMON_SPAWN_LOCK_WAIT_MS, SQLITE_GRAPHRAG_VERSION,
5};
6use crate::errors::AppError;
7use crate::{embedder, shutdown_requested};
8use fs4::fs_std::FileExt;
9use interprocess::local_socket::{
10 prelude::LocalSocketStream,
11 traits::{Listener as _, Stream as _},
12 GenericFilePath, GenericNamespaced, ListenerNonblockingMode, ListenerOptions, ToFsName,
13 ToNsName,
14};
15use serde::{Deserialize, Serialize};
16use std::fs::{File, OpenOptions};
17use std::io::{BufRead, BufReader, Write};
18use std::path::{Path, PathBuf};
19use std::process::Stdio;
20use std::sync::atomic::{AtomicU64, Ordering};
21use std::sync::Arc;
22use std::thread;
23use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
24
25#[derive(Debug, Serialize, Deserialize)]
26#[serde(tag = "request", rename_all = "snake_case")]
27pub enum DaemonRequest {
28 Ping,
29 Shutdown,
30 EmbedPassage {
31 text: String,
32 },
33 EmbedQuery {
34 text: String,
35 },
36 EmbedPassages {
37 texts: Vec<String>,
38 token_counts: Vec<usize>,
39 },
40}
41
42#[derive(Debug, Serialize, Deserialize)]
43#[serde(tag = "status", rename_all = "snake_case")]
44pub enum DaemonResponse {
45 Listening {
46 pid: u32,
47 socket: String,
48 idle_shutdown_secs: u64,
49 },
50 Ok {
51 pid: u32,
52 version: String,
53 handled_embed_requests: u64,
54 },
55 PassageEmbedding {
56 embedding: Vec<f32>,
57 handled_embed_requests: u64,
58 },
59 QueryEmbedding {
60 embedding: Vec<f32>,
61 handled_embed_requests: u64,
62 },
63 PassageEmbeddings {
64 embeddings: Vec<Vec<f32>>,
65 handled_embed_requests: u64,
66 },
67 ShuttingDown {
68 handled_embed_requests: u64,
69 },
70 Error {
71 message: String,
72 },
73}
74
75#[derive(Debug, Default, Serialize, Deserialize)]
76struct DaemonSpawnState {
77 consecutive_failures: u32,
78 not_before_epoch_ms: u64,
79 last_error: Option<String>,
80}
81
82pub fn daemon_label(models_dir: &Path) -> String {
83 let hash = blake3::hash(models_dir.to_string_lossy().as_bytes())
84 .to_hex()
85 .to_string();
86 format!("sqlite-graphrag-daemon-{}", &hash[..16])
87}
88
89pub fn try_ping(models_dir: &Path) -> Result<Option<DaemonResponse>, AppError> {
90 request_if_available(models_dir, &DaemonRequest::Ping)
91}
92
93pub fn try_shutdown(models_dir: &Path) -> Result<Option<DaemonResponse>, AppError> {
94 request_if_available(models_dir, &DaemonRequest::Shutdown)
95}
96
97pub fn embed_passage_or_local(models_dir: &Path, text: &str) -> Result<Vec<f32>, AppError> {
98 match request_or_autostart(
99 models_dir,
100 &DaemonRequest::EmbedPassage {
101 text: text.to_string(),
102 },
103 )? {
104 Some(DaemonResponse::PassageEmbedding { embedding, .. }) => Ok(embedding),
105 Some(DaemonResponse::Error { message }) => Err(AppError::Embedding(message)),
106 Some(other) => Err(AppError::Internal(anyhow::anyhow!(
107 "resposta inesperada do daemon para embedding de passage: {other:?}"
108 ))),
109 None => {
110 let embedder = embedder::get_embedder(models_dir)?;
111 embedder::embed_passage(embedder, text)
112 }
113 }
114}
115
116pub fn embed_query_or_local(models_dir: &Path, text: &str) -> Result<Vec<f32>, AppError> {
117 match request_or_autostart(
118 models_dir,
119 &DaemonRequest::EmbedQuery {
120 text: text.to_string(),
121 },
122 )? {
123 Some(DaemonResponse::QueryEmbedding { embedding, .. }) => Ok(embedding),
124 Some(DaemonResponse::Error { message }) => Err(AppError::Embedding(message)),
125 Some(other) => Err(AppError::Internal(anyhow::anyhow!(
126 "resposta inesperada do daemon para embedding de query: {other:?}"
127 ))),
128 None => {
129 let embedder = embedder::get_embedder(models_dir)?;
130 embedder::embed_query(embedder, text)
131 }
132 }
133}
134
135pub fn embed_passages_controlled_or_local(
136 models_dir: &Path,
137 texts: &[&str],
138 token_counts: &[usize],
139) -> Result<Vec<Vec<f32>>, AppError> {
140 let request = DaemonRequest::EmbedPassages {
141 texts: texts.iter().map(|t| (*t).to_string()).collect(),
142 token_counts: token_counts.to_vec(),
143 };
144
145 match request_or_autostart(models_dir, &request)? {
146 Some(DaemonResponse::PassageEmbeddings { embeddings, .. }) => Ok(embeddings),
147 Some(DaemonResponse::Error { message }) => Err(AppError::Embedding(message)),
148 Some(other) => Err(AppError::Internal(anyhow::anyhow!(
149 "resposta inesperada do daemon para batch de embeddings de passage: {other:?}"
150 ))),
151 None => {
152 let embedder = embedder::get_embedder(models_dir)?;
153 embedder::embed_passages_controlled(embedder, texts, token_counts)
154 }
155 }
156}
157
158struct DaemonSpawnGuard {
159 models_dir: PathBuf,
160}
161
162impl DaemonSpawnGuard {
163 fn new(models_dir: &Path) -> Self {
164 Self {
165 models_dir: models_dir.to_path_buf(),
166 }
167 }
168}
169
170impl Drop for DaemonSpawnGuard {
171 fn drop(&mut self) {
172 let lock_path = spawn_lock_path(&self.models_dir);
173 if lock_path.exists() {
174 match std::fs::remove_file(&lock_path) {
175 Ok(()) => {
176 tracing::debug!(
177 path = %lock_path.display(),
178 "lock file de spawn removido ao encerrar daemon graciosamente"
179 );
180 }
181 Err(err) if err.kind() == std::io::ErrorKind::NotFound => {}
182 Err(err) => {
183 tracing::warn!(
184 error = %err,
185 path = %lock_path.display(),
186 "falha ao remover lock file de spawn ao encerrar daemon"
187 );
188 }
189 }
190 }
191 tracing::info!(
192 "daemon encerrado graciosamente; socket será limpo pelo OS ou pelo próximo daemon via try_overwrite"
193 );
194 }
195}
196
197pub fn run(models_dir: &Path, idle_shutdown_secs: u64) -> Result<(), AppError> {
198 let rt = tokio::runtime::Builder::new_multi_thread()
202 .worker_threads(2)
203 .thread_name("daemon-worker")
204 .enable_all()
205 .build()
206 .map_err(AppError::Io)?;
207
208 rt.block_on(run_async(models_dir, idle_shutdown_secs))
209}
210
211async fn run_async(models_dir: &Path, idle_shutdown_secs: u64) -> Result<(), AppError> {
212 let socket = daemon_label(models_dir);
213 let name = to_local_socket_name(&socket)?;
214 let listener = ListenerOptions::new()
215 .name(name)
216 .nonblocking(ListenerNonblockingMode::Accept)
217 .try_overwrite(true)
218 .create_sync()
219 .map_err(AppError::Io)?;
220
221 let _spawn_guard = DaemonSpawnGuard::new(models_dir);
224
225 let models_dir_warm = models_dir.to_path_buf();
229 tokio::task::spawn_blocking(move || embedder::get_embedder(&models_dir_warm).map(|_| ()))
230 .await
231 .map_err(|e| AppError::Internal(anyhow::anyhow!("model warm-up panicked: {e}")))??;
232
233 crate::output::emit_json(&DaemonResponse::Listening {
234 pid: std::process::id(),
235 socket,
236 idle_shutdown_secs,
237 })?;
238
239 let handled_embed_requests = Arc::new(AtomicU64::new(0));
240 let mut last_activity = Instant::now();
241 let models_dir = models_dir.to_path_buf();
242
243 loop {
244 if shutdown_requested() {
245 break;
246 }
247
248 if !daemon_control_dir(&models_dir).exists() {
249 tracing::info!("daemon control directory disappeared; shutting down");
250 break;
251 }
252
253 match listener.accept() {
254 Ok(stream) => {
255 last_activity = Instant::now();
256 let models_dir_clone = models_dir.clone();
257 let counter = Arc::clone(&handled_embed_requests);
258 let should_exit = tokio::task::spawn_blocking(move || {
259 handle_client(stream, &models_dir_clone, &counter)
260 })
261 .await
262 .map_err(|e| {
263 AppError::Internal(anyhow::anyhow!("spawn_blocking panicked: {e}"))
264 })??;
265
266 if should_exit {
267 break;
268 }
269 }
270 Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => {
271 if last_activity.elapsed() >= Duration::from_secs(idle_shutdown_secs) {
272 tracing::info!(
273 idle_shutdown_secs,
274 handled_embed_requests = handled_embed_requests.load(Ordering::Relaxed),
275 "daemon idle timeout reached"
276 );
277 break;
278 }
279 tokio::time::sleep(Duration::from_millis(50)).await;
280 }
281 Err(err) => return Err(AppError::Io(err)),
282 }
283 }
284
285 Ok(())
286}
287
288fn handle_client(
289 stream: LocalSocketStream,
290 models_dir: &Path,
291 handled_embed_requests: &AtomicU64,
292) -> Result<bool, AppError> {
293 let mut reader = BufReader::new(stream);
294 let mut line = String::new();
295 reader.read_line(&mut line).map_err(AppError::Io)?;
296
297 if line.trim().is_empty() {
298 write_response(
299 reader.get_mut(),
300 &DaemonResponse::Error {
301 message: "requisição vazia ao daemon".to_string(),
302 },
303 )?;
304 return Ok(false);
305 }
306
307 let request: DaemonRequest = serde_json::from_str(line.trim()).map_err(AppError::Json)?;
308 let (response, should_exit) = match request {
309 DaemonRequest::Ping => (
310 DaemonResponse::Ok {
311 pid: std::process::id(),
312 version: SQLITE_GRAPHRAG_VERSION.to_string(),
313 handled_embed_requests: handled_embed_requests.load(Ordering::Relaxed),
314 },
315 false,
316 ),
317 DaemonRequest::Shutdown => (
318 DaemonResponse::ShuttingDown {
319 handled_embed_requests: handled_embed_requests.load(Ordering::Relaxed),
320 },
321 true,
322 ),
323 DaemonRequest::EmbedPassage { text } => {
324 let embedder = embedder::get_embedder(models_dir)?;
325 let embedding = embedder::embed_passage(embedder, &text)?;
326 let count = handled_embed_requests.fetch_add(1, Ordering::Relaxed) + 1;
327 (
328 DaemonResponse::PassageEmbedding {
329 embedding,
330 handled_embed_requests: count,
331 },
332 false,
333 )
334 }
335 DaemonRequest::EmbedQuery { text } => {
336 let embedder = embedder::get_embedder(models_dir)?;
337 let embedding = embedder::embed_query(embedder, &text)?;
338 let count = handled_embed_requests.fetch_add(1, Ordering::Relaxed) + 1;
339 (
340 DaemonResponse::QueryEmbedding {
341 embedding,
342 handled_embed_requests: count,
343 },
344 false,
345 )
346 }
347 DaemonRequest::EmbedPassages {
348 texts,
349 token_counts,
350 } => {
351 let embedder = embedder::get_embedder(models_dir)?;
352 let text_refs: Vec<&str> = texts.iter().map(String::as_str).collect();
353 let embeddings =
354 embedder::embed_passages_controlled(embedder, &text_refs, &token_counts)?;
355 let count = handled_embed_requests.fetch_add(1, Ordering::Relaxed) + 1;
356 (
357 DaemonResponse::PassageEmbeddings {
358 embeddings,
359 handled_embed_requests: count,
360 },
361 false,
362 )
363 }
364 };
365
366 write_response(reader.get_mut(), &response)?;
367 Ok(should_exit)
368}
369
370fn write_response(
371 stream: &mut LocalSocketStream,
372 response: &DaemonResponse,
373) -> Result<(), AppError> {
374 serde_json::to_writer(&mut *stream, response).map_err(AppError::Json)?;
375 stream.write_all(b"\n").map_err(AppError::Io)?;
376 stream.flush().map_err(AppError::Io)?;
377 Ok(())
378}
379
380fn request_if_available(
381 models_dir: &Path,
382 request: &DaemonRequest,
383) -> Result<Option<DaemonResponse>, AppError> {
384 let socket = daemon_label(models_dir);
385 let name = match to_local_socket_name(&socket) {
386 Ok(name) => name,
387 Err(err) => return Err(AppError::Io(err)),
388 };
389
390 let mut stream = match LocalSocketStream::connect(name) {
391 Ok(stream) => stream,
392 Err(err)
393 if matches!(
394 err.kind(),
395 std::io::ErrorKind::NotFound
396 | std::io::ErrorKind::ConnectionRefused
397 | std::io::ErrorKind::AddrNotAvailable
398 | std::io::ErrorKind::TimedOut
399 ) =>
400 {
401 return Ok(None);
402 }
403 Err(err) => return Err(AppError::Io(err)),
404 };
405
406 serde_json::to_writer(&mut stream, request).map_err(AppError::Json)?;
407 stream.write_all(b"\n").map_err(AppError::Io)?;
408 stream.flush().map_err(AppError::Io)?;
409
410 let mut reader = BufReader::new(stream);
411 let mut line = String::new();
412 reader.read_line(&mut line).map_err(AppError::Io)?;
413 if line.trim().is_empty() {
414 return Err(AppError::Embedding("daemon retornou resposta vazia".into()));
415 }
416
417 let response = serde_json::from_str(line.trim()).map_err(AppError::Json)?;
418 Ok(Some(response))
419}
420
421fn request_or_autostart(
422 models_dir: &Path,
423 request: &DaemonRequest,
424) -> Result<Option<DaemonResponse>, AppError> {
425 if let Some(response) = request_if_available(models_dir, request)? {
426 clear_spawn_backoff_state(models_dir).ok();
427 return Ok(Some(response));
428 }
429
430 if autostart_disabled() {
431 return Ok(None);
432 }
433
434 if !ensure_daemon_running(models_dir)? {
435 return Ok(None);
436 }
437
438 request_if_available(models_dir, request)
439}
440
441fn ensure_daemon_running(models_dir: &Path) -> Result<bool, AppError> {
442 if (try_ping(models_dir)?).is_some() {
443 clear_spawn_backoff_state(models_dir).ok();
444 return Ok(true);
445 }
446
447 if spawn_backoff_active(models_dir)? {
448 tracing::warn!("daemon autostart suppressed by backoff window");
449 return Ok(false);
450 }
451
452 let spawn_lock = match try_acquire_spawn_lock(models_dir)? {
453 Some(lock) => lock,
454 None => return wait_for_daemon_ready(models_dir),
455 };
456
457 if (try_ping(models_dir)?).is_some() {
458 clear_spawn_backoff_state(models_dir).ok();
459 drop(spawn_lock);
460 return Ok(true);
461 }
462
463 let exe = match std::env::current_exe() {
464 Ok(path) => path,
465 Err(err) => {
466 record_spawn_failure(models_dir, format!("current_exe failed: {err}"))?;
467 drop(spawn_lock);
468 return Ok(false);
469 }
470 };
471
472 let mut child = std::process::Command::new(exe);
473 child
474 .arg("daemon")
475 .arg("--idle-shutdown-secs")
476 .arg(DAEMON_IDLE_SHUTDOWN_SECS.to_string())
477 .env("SQLITE_GRAPHRAG_DAEMON_CHILD", "1")
478 .stdin(Stdio::null())
479 .stdout(Stdio::null())
480 .stderr(Stdio::null());
481
482 match child.spawn() {
483 Ok(_) => {
484 let ready = wait_for_daemon_ready(models_dir)?;
485 if ready {
486 clear_spawn_backoff_state(models_dir).ok();
487 } else {
488 record_spawn_failure(
489 models_dir,
490 "daemon did not become healthy after autostart".to_string(),
491 )?;
492 }
493 drop(spawn_lock);
494 Ok(ready)
495 }
496 Err(err) => {
497 record_spawn_failure(models_dir, format!("daemon spawn failed: {err}"))?;
498 drop(spawn_lock);
499 Ok(false)
500 }
501 }
502}
503
504fn wait_for_daemon_ready(models_dir: &Path) -> Result<bool, AppError> {
505 let deadline = Instant::now() + Duration::from_millis(DAEMON_AUTO_START_MAX_WAIT_MS);
506 let mut sleep_ms = DAEMON_AUTO_START_INITIAL_BACKOFF_MS.max(DAEMON_PING_TIMEOUT_MS);
507
508 while Instant::now() < deadline {
509 if (try_ping(models_dir)?).is_some() {
510 return Ok(true);
511 }
512 thread::sleep(Duration::from_millis(sleep_ms));
513 sleep_ms = (sleep_ms * 2).min(DAEMON_AUTO_START_MAX_BACKOFF_MS);
514 }
515
516 Ok(false)
517}
518
/// Reports whether daemon autostart must be skipped, based on environment variables.
///
/// Autostart is skipped when either:
/// - `SQLITE_GRAPHRAG_DAEMON_CHILD=1` (we are the daemon itself), or
/// - `SQLITE_GRAPHRAG_DAEMON_DISABLE_AUTOSTART=1`, unless
///   `SQLITE_GRAPHRAG_DAEMON_FORCE_AUTOSTART=1` overrides it.
///
/// Only the exact value `"1"` counts as set.
fn autostart_disabled() -> bool {
    let env_is_one = |key: &str| std::env::var(key).as_deref() == Ok("1");

    if env_is_one("SQLITE_GRAPHRAG_DAEMON_CHILD") {
        return true;
    }

    !env_is_one("SQLITE_GRAPHRAG_DAEMON_FORCE_AUTOSTART")
        && env_is_one("SQLITE_GRAPHRAG_DAEMON_DISABLE_AUTOSTART")
}
524
/// Directory holding the daemon's control files: the parent of `models_dir`, or
/// `models_dir` itself when it has no parent (e.g. a filesystem root).
fn daemon_control_dir(models_dir: &Path) -> PathBuf {
    match models_dir.parent() {
        Some(parent) => parent.to_path_buf(),
        None => models_dir.to_path_buf(),
    }
}
531
532fn spawn_lock_path(models_dir: &Path) -> PathBuf {
533 daemon_control_dir(models_dir).join("daemon-spawn.lock")
534}
535
536fn spawn_state_path(models_dir: &Path) -> PathBuf {
537 daemon_control_dir(models_dir).join("daemon-spawn-state.json")
538}
539
540fn try_acquire_spawn_lock(models_dir: &Path) -> Result<Option<File>, AppError> {
541 let path = spawn_lock_path(models_dir);
542 std::fs::create_dir_all(crate::paths::parent_or_err(&path)?).map_err(AppError::Io)?;
543 let file = OpenOptions::new()
544 .read(true)
545 .write(true)
546 .create(true)
547 .truncate(false)
548 .open(path)
549 .map_err(AppError::Io)?;
550
551 let deadline = Instant::now() + Duration::from_millis(DAEMON_SPAWN_LOCK_WAIT_MS);
552 loop {
553 match file.try_lock_exclusive() {
554 Ok(()) => return Ok(Some(file)),
555 Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => {
556 if Instant::now() >= deadline {
557 return Ok(None);
558 }
559 thread::sleep(Duration::from_millis(50));
560 }
561 Err(err) => return Err(AppError::Io(err)),
562 }
563 }
564}
565
566fn spawn_backoff_active(models_dir: &Path) -> Result<bool, AppError> {
567 let state = load_spawn_state(models_dir)?;
568 Ok(now_epoch_ms() < state.not_before_epoch_ms)
569}
570
571fn record_spawn_failure(models_dir: &Path, message: String) -> Result<(), AppError> {
572 let mut state = load_spawn_state(models_dir)?;
573 state.consecutive_failures = state.consecutive_failures.saturating_add(1);
574 let exponent = state.consecutive_failures.saturating_sub(1).min(6);
575 let backoff_ms =
576 (DAEMON_SPAWN_BACKOFF_BASE_MS * (1_u64 << exponent)).min(DAEMON_AUTO_START_MAX_BACKOFF_MS);
577 state.not_before_epoch_ms = now_epoch_ms() + backoff_ms;
578 state.last_error = Some(message);
579 save_spawn_state(models_dir, &state)
580}
581
582fn clear_spawn_backoff_state(models_dir: &Path) -> Result<(), AppError> {
583 let path = spawn_state_path(models_dir);
584 if path.exists() {
585 std::fs::remove_file(path).map_err(AppError::Io)?;
586 }
587 Ok(())
588}
589
590fn load_spawn_state(models_dir: &Path) -> Result<DaemonSpawnState, AppError> {
591 let path = spawn_state_path(models_dir);
592 if !path.exists() {
593 return Ok(DaemonSpawnState::default());
594 }
595
596 let bytes = std::fs::read(path).map_err(AppError::Io)?;
597 serde_json::from_slice(&bytes).map_err(AppError::Json)
598}
599
600fn save_spawn_state(models_dir: &Path, state: &DaemonSpawnState) -> Result<(), AppError> {
601 let path = spawn_state_path(models_dir);
602 std::fs::create_dir_all(crate::paths::parent_or_err(&path)?).map_err(AppError::Io)?;
603 let bytes = serde_json::to_vec(state).map_err(AppError::Json)?;
604 std::fs::write(path, bytes).map_err(AppError::Io)
605}
606
/// Current wall-clock time in milliseconds since the Unix epoch.
///
/// Returns 0 if the system clock is set before the epoch.
fn now_epoch_ms() -> u64 {
    let since_epoch = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed,
        Err(_) => Duration::ZERO,
    };
    since_epoch.as_millis() as u64
}
613
614fn to_local_socket_name(name: &str) -> std::io::Result<interprocess::local_socket::Name<'static>> {
615 if let Ok(ns_name) = name.to_string().to_ns_name::<GenericNamespaced>() {
616 return Ok(ns_name);
617 }
618
619 let path = if cfg!(unix) {
620 format!("/tmp/{name}.sock")
621 } else {
622 format!(r"\\.\pipe\{name}")
623 };
624 path.to_fs_name::<GenericFilePath>()
625}
626
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn record_and_clear_spawn_backoff_state() {
        let tmp = tempfile::tempdir().unwrap();
        let models_dir = tmp.path().join("cache").join("models");
        std::fs::create_dir_all(&models_dir).unwrap();

        // No state file yet: no backoff window.
        assert!(!spawn_backoff_active(&models_dir).unwrap());

        // One failure opens a window and is persisted together with its message.
        record_spawn_failure(&models_dir, "spawn failed".to_string()).unwrap();
        assert!(spawn_backoff_active(&models_dir).unwrap());

        let persisted = load_spawn_state(&models_dir).unwrap();
        assert_eq!(persisted.consecutive_failures, 1);
        assert_eq!(persisted.last_error.as_deref(), Some("spawn failed"));

        // Clearing closes the window again.
        clear_spawn_backoff_state(&models_dir).unwrap();
        assert!(!spawn_backoff_active(&models_dir).unwrap());
    }

    #[test]
    fn daemon_control_dir_uses_parent_of_models() {
        let cache_root = PathBuf::from("/tmp/sqlite-graphrag-cache-test");
        let models_dir = cache_root.join("models");
        assert_eq!(daemon_control_dir(&models_dir), cache_root);
    }
}