1use crate::constants::{
2 DAEMON_AUTO_START_INITIAL_BACKOFF_MS, DAEMON_AUTO_START_MAX_BACKOFF_MS,
3 DAEMON_AUTO_START_MAX_WAIT_MS, DAEMON_IDLE_SHUTDOWN_SECS, DAEMON_PING_TIMEOUT_MS,
4 DAEMON_SPAWN_BACKOFF_BASE_MS, DAEMON_SPAWN_LOCK_WAIT_MS, SQLITE_GRAPHRAG_VERSION,
5};
6use crate::errors::AppError;
7use crate::{embedder, shutdown_requested};
8use fs4::fs_std::FileExt;
9use interprocess::local_socket::{
10 prelude::LocalSocketStream,
11 traits::{Listener as _, Stream as _},
12 GenericFilePath, GenericNamespaced, ListenerNonblockingMode, ListenerOptions, ToFsName,
13 ToNsName,
14};
15use serde::{Deserialize, Serialize};
16use std::fs::{File, OpenOptions};
17use std::io::{BufRead, BufReader, Write};
18use std::path::{Path, PathBuf};
19use std::process::Stdio;
20use std::thread;
21use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
22
/// Request sent by a client to the embedding daemon over the local socket.
///
/// Requests travel as one JSON object per line. Because of the serde attributes
/// below, the variant name is written under the `"request"` key in snake_case,
/// e.g. `{"request":"ping"}` or `{"request":"embed_query","text":"..."}`.
#[derive(Debug, Serialize, Deserialize)]
#[serde(tag = "request", rename_all = "snake_case")]
pub enum DaemonRequest {
    /// Health check; the daemon answers with `DaemonResponse::Ok`.
    Ping,
    /// Asks the daemon to stop; it replies with `DaemonResponse::ShuttingDown`
    /// and then leaves its accept loop.
    Shutdown,
    /// Embed a single text as a passage (document side).
    EmbedPassage {
        text: String,
    },
    /// Embed a single text as a query (search side).
    EmbedQuery {
        text: String,
    },
    /// Embed a batch of passages in one round trip.
    EmbedPassages {
        texts: Vec<String>,
        /// Forwarded unchanged to `embedder::embed_passages_controlled`;
        /// presumably one entry per element of `texts` — TODO confirm.
        token_counts: Vec<usize>,
    },
}
39
/// Message produced by the daemon: replies to `DaemonRequest`s plus the
/// one-off readiness announcement.
///
/// Encoded as one JSON object per line with the variant name (snake_case)
/// under the `"status"` key, e.g. `{"status":"error","message":"..."}`.
#[derive(Debug, Serialize, Deserialize)]
#[serde(tag = "status", rename_all = "snake_case")]
pub enum DaemonResponse {
    /// Emitted once by the daemon itself through `crate::output::emit_json`
    /// after the socket is bound and the embedder has been loaded.
    Listening {
        pid: u32,
        /// Socket label as produced by `daemon_label`.
        socket: String,
        idle_shutdown_secs: u64,
    },
    /// Reply to `DaemonRequest::Ping`.
    Ok {
        pid: u32,
        version: String,
        /// Embed-request counter as tracked by the request handler.
        handled_embed_requests: u64,
    },
    /// Reply to `DaemonRequest::EmbedPassage`.
    PassageEmbedding {
        embedding: Vec<f32>,
        handled_embed_requests: u64,
    },
    /// Reply to `DaemonRequest::EmbedQuery`.
    QueryEmbedding {
        embedding: Vec<f32>,
        handled_embed_requests: u64,
    },
    /// Reply to `DaemonRequest::EmbedPassages`.
    PassageEmbeddings {
        embeddings: Vec<Vec<f32>>,
        handled_embed_requests: u64,
    },
    /// Reply to `DaemonRequest::Shutdown`, sent just before the daemon exits.
    ShuttingDown {
        handled_embed_requests: u64,
    },
    /// Request could not be served; clients surface `message` as an
    /// embedding error.
    Error {
        message: String,
    },
}
72
/// Autostart backoff bookkeeping, persisted as JSON in
/// `daemon-spawn-state.json` inside the daemon control directory.
#[derive(Debug, Default, Serialize, Deserialize)]
struct DaemonSpawnState {
    /// Number of autostart failures since the state was last cleared.
    consecutive_failures: u32,
    /// Wall-clock time (milliseconds since the UNIX epoch) before which
    /// autostart is suppressed; `0` means no active backoff window.
    not_before_epoch_ms: u64,
    /// Description of the most recent autostart failure, if any.
    last_error: Option<String>,
}
79
80pub fn daemon_label(models_dir: &Path) -> String {
81 let hash = blake3::hash(models_dir.to_string_lossy().as_bytes())
82 .to_hex()
83 .to_string();
84 format!("sqlite-graphrag-daemon-{}", &hash[..16])
85}
86
87pub fn try_ping(models_dir: &Path) -> Result<Option<DaemonResponse>, AppError> {
88 request_if_available(models_dir, &DaemonRequest::Ping)
89}
90
91pub fn try_shutdown(models_dir: &Path) -> Result<Option<DaemonResponse>, AppError> {
92 request_if_available(models_dir, &DaemonRequest::Shutdown)
93}
94
95pub fn embed_passage_or_local(models_dir: &Path, text: &str) -> Result<Vec<f32>, AppError> {
96 match request_or_autostart(
97 models_dir,
98 &DaemonRequest::EmbedPassage {
99 text: text.to_string(),
100 },
101 )? {
102 Some(DaemonResponse::PassageEmbedding { embedding, .. }) => Ok(embedding),
103 Some(DaemonResponse::Error { message }) => Err(AppError::Embedding(message)),
104 Some(other) => Err(AppError::Internal(anyhow::anyhow!(
105 "resposta inesperada do daemon para embedding de passage: {other:?}"
106 ))),
107 None => {
108 let embedder = embedder::get_embedder(models_dir)?;
109 embedder::embed_passage(embedder, text)
110 }
111 }
112}
113
114pub fn embed_query_or_local(models_dir: &Path, text: &str) -> Result<Vec<f32>, AppError> {
115 match request_or_autostart(
116 models_dir,
117 &DaemonRequest::EmbedQuery {
118 text: text.to_string(),
119 },
120 )? {
121 Some(DaemonResponse::QueryEmbedding { embedding, .. }) => Ok(embedding),
122 Some(DaemonResponse::Error { message }) => Err(AppError::Embedding(message)),
123 Some(other) => Err(AppError::Internal(anyhow::anyhow!(
124 "resposta inesperada do daemon para embedding de query: {other:?}"
125 ))),
126 None => {
127 let embedder = embedder::get_embedder(models_dir)?;
128 embedder::embed_query(embedder, text)
129 }
130 }
131}
132
133pub fn embed_passages_controlled_or_local(
134 models_dir: &Path,
135 texts: &[&str],
136 token_counts: &[usize],
137) -> Result<Vec<Vec<f32>>, AppError> {
138 let request = DaemonRequest::EmbedPassages {
139 texts: texts.iter().map(|t| (*t).to_string()).collect(),
140 token_counts: token_counts.to_vec(),
141 };
142
143 match request_or_autostart(models_dir, &request)? {
144 Some(DaemonResponse::PassageEmbeddings { embeddings, .. }) => Ok(embeddings),
145 Some(DaemonResponse::Error { message }) => Err(AppError::Embedding(message)),
146 Some(other) => Err(AppError::Internal(anyhow::anyhow!(
147 "resposta inesperada do daemon para batch de embeddings de passage: {other:?}"
148 ))),
149 None => {
150 let embedder = embedder::get_embedder(models_dir)?;
151 embedder::embed_passages_controlled(embedder, texts, token_counts)
152 }
153 }
154}
155
/// Guard held by the running daemon for the lifetime of its main loop; its
/// `Drop` implementation performs the shutdown cleanup for `models_dir`.
struct DaemonSpawnGuard {
    /// Models directory served by the daemon; used to locate the spawn lock file.
    models_dir: PathBuf,
}

impl DaemonSpawnGuard {
    /// Creates a guard for `models_dir`, keeping an owned copy of the path.
    fn new(models_dir: &Path) -> Self {
        let models_dir = models_dir.to_owned();
        DaemonSpawnGuard { models_dir }
    }
}
167
168impl Drop for DaemonSpawnGuard {
169 fn drop(&mut self) {
170 let lock_path = spawn_lock_path(&self.models_dir);
171 if lock_path.exists() {
172 match std::fs::remove_file(&lock_path) {
173 Ok(()) => {
174 tracing::debug!(
175 path = %lock_path.display(),
176 "lock file de spawn removido ao encerrar daemon graciosamente"
177 );
178 }
179 Err(err) if err.kind() == std::io::ErrorKind::NotFound => {}
180 Err(err) => {
181 tracing::warn!(
182 error = %err,
183 path = %lock_path.display(),
184 "falha ao remover lock file de spawn ao encerrar daemon"
185 );
186 }
187 }
188 }
189 tracing::info!(
190 "daemon encerrado graciosamente; socket será limpo pelo OS ou pelo próximo daemon via try_overwrite"
191 );
192 }
193}
194
195pub fn run(models_dir: &Path, idle_shutdown_secs: u64) -> Result<(), AppError> {
196 let rt = tokio::runtime::Builder::new_multi_thread()
200 .worker_threads(2)
201 .thread_name("daemon-worker")
202 .enable_all()
203 .build()
204 .map_err(AppError::Io)?;
205
206 rt.block_on(run_async(models_dir, idle_shutdown_secs))
207}
208
209async fn run_async(models_dir: &Path, idle_shutdown_secs: u64) -> Result<(), AppError> {
210 let socket = daemon_label(models_dir);
211 let name = to_local_socket_name(&socket)?;
212 let listener = ListenerOptions::new()
213 .name(name)
214 .nonblocking(ListenerNonblockingMode::Accept)
215 .try_overwrite(true)
216 .create_sync()
217 .map_err(AppError::Io)?;
218
219 let _spawn_guard = DaemonSpawnGuard::new(models_dir);
222
223 let _ = embedder::get_embedder(models_dir)?;
225
226 crate::output::emit_json(&DaemonResponse::Listening {
227 pid: std::process::id(),
228 socket,
229 idle_shutdown_secs,
230 })?;
231
232 let mut handled_embed_requests = 0_u64;
233 let mut last_activity = Instant::now();
234 let models_dir = models_dir.to_path_buf();
235
236 loop {
237 if shutdown_requested() {
238 break;
239 }
240
241 if !daemon_control_dir(&models_dir).exists() {
242 tracing::info!("daemon control directory disappeared; shutting down");
243 break;
244 }
245
246 match listener.accept() {
247 Ok(stream) => {
248 last_activity = Instant::now();
249 let models_dir_clone = models_dir.clone();
250 let result = tokio::task::spawn_blocking(move || {
251 let mut counter = 0_u64;
252 handle_client(stream, &models_dir_clone, &mut counter)
253 .map(|should_exit| (should_exit, counter))
254 })
255 .await
256 .map_err(|e| AppError::Internal(anyhow::anyhow!("spawn_blocking panicked: {e}")))?;
257
258 let (should_exit, delta) = result?;
259 handled_embed_requests += delta;
260
261 if should_exit {
262 break;
263 }
264 }
265 Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => {
266 if last_activity.elapsed() >= Duration::from_secs(idle_shutdown_secs) {
267 tracing::info!(
268 idle_shutdown_secs,
269 handled_embed_requests,
270 "daemon idle timeout reached"
271 );
272 break;
273 }
274 tokio::time::sleep(Duration::from_millis(50)).await;
275 }
276 Err(err) => return Err(AppError::Io(err)),
277 }
278 }
279
280 Ok(())
281}
282
283fn handle_client(
284 stream: LocalSocketStream,
285 models_dir: &Path,
286 handled_embed_requests: &mut u64,
287) -> Result<bool, AppError> {
288 let mut reader = BufReader::new(stream);
289 let mut line = String::new();
290 reader.read_line(&mut line).map_err(AppError::Io)?;
291
292 if line.trim().is_empty() {
293 write_response(
294 reader.get_mut(),
295 &DaemonResponse::Error {
296 message: "requisição vazia ao daemon".to_string(),
297 },
298 )?;
299 return Ok(false);
300 }
301
302 let request: DaemonRequest = serde_json::from_str(line.trim()).map_err(AppError::Json)?;
303 let (response, should_exit) = match request {
304 DaemonRequest::Ping => (
305 DaemonResponse::Ok {
306 pid: std::process::id(),
307 version: SQLITE_GRAPHRAG_VERSION.to_string(),
308 handled_embed_requests: *handled_embed_requests,
309 },
310 false,
311 ),
312 DaemonRequest::Shutdown => (
313 DaemonResponse::ShuttingDown {
314 handled_embed_requests: *handled_embed_requests,
315 },
316 true,
317 ),
318 DaemonRequest::EmbedPassage { text } => {
319 let embedder = embedder::get_embedder(models_dir)?;
320 let embedding = embedder::embed_passage(embedder, &text)?;
321 *handled_embed_requests += 1;
322 (
323 DaemonResponse::PassageEmbedding {
324 embedding,
325 handled_embed_requests: *handled_embed_requests,
326 },
327 false,
328 )
329 }
330 DaemonRequest::EmbedQuery { text } => {
331 let embedder = embedder::get_embedder(models_dir)?;
332 let embedding = embedder::embed_query(embedder, &text)?;
333 *handled_embed_requests += 1;
334 (
335 DaemonResponse::QueryEmbedding {
336 embedding,
337 handled_embed_requests: *handled_embed_requests,
338 },
339 false,
340 )
341 }
342 DaemonRequest::EmbedPassages {
343 texts,
344 token_counts,
345 } => {
346 let embedder = embedder::get_embedder(models_dir)?;
347 let text_refs: Vec<&str> = texts.iter().map(String::as_str).collect();
348 let embeddings =
349 embedder::embed_passages_controlled(embedder, &text_refs, &token_counts)?;
350 *handled_embed_requests += 1;
351 (
352 DaemonResponse::PassageEmbeddings {
353 embeddings,
354 handled_embed_requests: *handled_embed_requests,
355 },
356 false,
357 )
358 }
359 };
360
361 write_response(reader.get_mut(), &response)?;
362 Ok(should_exit)
363}
364
365fn write_response(
366 stream: &mut LocalSocketStream,
367 response: &DaemonResponse,
368) -> Result<(), AppError> {
369 serde_json::to_writer(&mut *stream, response).map_err(AppError::Json)?;
370 stream.write_all(b"\n").map_err(AppError::Io)?;
371 stream.flush().map_err(AppError::Io)?;
372 Ok(())
373}
374
375fn request_if_available(
376 models_dir: &Path,
377 request: &DaemonRequest,
378) -> Result<Option<DaemonResponse>, AppError> {
379 let socket = daemon_label(models_dir);
380 let name = match to_local_socket_name(&socket) {
381 Ok(name) => name,
382 Err(err) => return Err(AppError::Io(err)),
383 };
384
385 let mut stream = match LocalSocketStream::connect(name) {
386 Ok(stream) => stream,
387 Err(err)
388 if matches!(
389 err.kind(),
390 std::io::ErrorKind::NotFound
391 | std::io::ErrorKind::ConnectionRefused
392 | std::io::ErrorKind::AddrNotAvailable
393 | std::io::ErrorKind::TimedOut
394 ) =>
395 {
396 return Ok(None);
397 }
398 Err(err) => return Err(AppError::Io(err)),
399 };
400
401 serde_json::to_writer(&mut stream, request).map_err(AppError::Json)?;
402 stream.write_all(b"\n").map_err(AppError::Io)?;
403 stream.flush().map_err(AppError::Io)?;
404
405 let mut reader = BufReader::new(stream);
406 let mut line = String::new();
407 reader.read_line(&mut line).map_err(AppError::Io)?;
408 if line.trim().is_empty() {
409 return Err(AppError::Embedding("daemon retornou resposta vazia".into()));
410 }
411
412 let response = serde_json::from_str(line.trim()).map_err(AppError::Json)?;
413 Ok(Some(response))
414}
415
416fn request_or_autostart(
417 models_dir: &Path,
418 request: &DaemonRequest,
419) -> Result<Option<DaemonResponse>, AppError> {
420 if let Some(response) = request_if_available(models_dir, request)? {
421 clear_spawn_backoff_state(models_dir).ok();
422 return Ok(Some(response));
423 }
424
425 if autostart_disabled() {
426 return Ok(None);
427 }
428
429 if !ensure_daemon_running(models_dir)? {
430 return Ok(None);
431 }
432
433 request_if_available(models_dir, request)
434}
435
436fn ensure_daemon_running(models_dir: &Path) -> Result<bool, AppError> {
437 if (try_ping(models_dir)?).is_some() {
438 clear_spawn_backoff_state(models_dir).ok();
439 return Ok(true);
440 }
441
442 if spawn_backoff_active(models_dir)? {
443 tracing::warn!("daemon autostart suppressed by backoff window");
444 return Ok(false);
445 }
446
447 let spawn_lock = match try_acquire_spawn_lock(models_dir)? {
448 Some(lock) => lock,
449 None => return wait_for_daemon_ready(models_dir),
450 };
451
452 if (try_ping(models_dir)?).is_some() {
453 clear_spawn_backoff_state(models_dir).ok();
454 drop(spawn_lock);
455 return Ok(true);
456 }
457
458 let exe = match std::env::current_exe() {
459 Ok(path) => path,
460 Err(err) => {
461 record_spawn_failure(models_dir, format!("current_exe failed: {err}"))?;
462 drop(spawn_lock);
463 return Ok(false);
464 }
465 };
466
467 let mut child = std::process::Command::new(exe);
468 child
469 .arg("daemon")
470 .arg("--idle-shutdown-secs")
471 .arg(DAEMON_IDLE_SHUTDOWN_SECS.to_string())
472 .env("SQLITE_GRAPHRAG_DAEMON_CHILD", "1")
473 .stdin(Stdio::null())
474 .stdout(Stdio::null())
475 .stderr(Stdio::null());
476
477 match child.spawn() {
478 Ok(_) => {
479 let ready = wait_for_daemon_ready(models_dir)?;
480 if ready {
481 clear_spawn_backoff_state(models_dir).ok();
482 } else {
483 record_spawn_failure(
484 models_dir,
485 "daemon did not become healthy after autostart".to_string(),
486 )?;
487 }
488 drop(spawn_lock);
489 Ok(ready)
490 }
491 Err(err) => {
492 record_spawn_failure(models_dir, format!("daemon spawn failed: {err}"))?;
493 drop(spawn_lock);
494 Ok(false)
495 }
496 }
497}
498
499fn wait_for_daemon_ready(models_dir: &Path) -> Result<bool, AppError> {
500 let deadline = Instant::now() + Duration::from_millis(DAEMON_AUTO_START_MAX_WAIT_MS);
501 let mut sleep_ms = DAEMON_AUTO_START_INITIAL_BACKOFF_MS.max(DAEMON_PING_TIMEOUT_MS);
502
503 while Instant::now() < deadline {
504 if (try_ping(models_dir)?).is_some() {
505 return Ok(true);
506 }
507 thread::sleep(Duration::from_millis(sleep_ms));
508 sleep_ms = (sleep_ms * 2).min(DAEMON_AUTO_START_MAX_BACKOFF_MS);
509 }
510
511 Ok(false)
512}
513
/// Reports whether this process must not autostart a daemon.
///
/// True when running as a daemon child (`SQLITE_GRAPHRAG_DAEMON_CHILD=1`), or
/// when `SQLITE_GRAPHRAG_DAEMON_DISABLE_AUTOSTART=1` is set and not overridden
/// by `SQLITE_GRAPHRAG_DAEMON_FORCE_AUTOSTART=1`. Only the exact value `"1"`
/// counts as set.
fn autostart_disabled() -> bool {
    let env_is_one = |key: &str| std::env::var(key).as_deref() == Ok("1");

    if env_is_one("SQLITE_GRAPHRAG_DAEMON_CHILD") {
        return true;
    }

    let forced = env_is_one("SQLITE_GRAPHRAG_DAEMON_FORCE_AUTOSTART");
    !forced && env_is_one("SQLITE_GRAPHRAG_DAEMON_DISABLE_AUTOSTART")
}
519
/// Directory holding the daemon's control files (spawn lock, backoff state):
/// the parent of `models_dir`, or `models_dir` itself when it has no parent.
///
/// A relative single-component `models_dir` such as `models` has the empty
/// path as its parent. The empty path is mapped to `.`, because
/// `Path::new("").exists()` is false; without the mapping the running daemon
/// would conclude its control directory had disappeared and shut down at once.
fn daemon_control_dir(models_dir: &Path) -> PathBuf {
    let dir = models_dir.parent().unwrap_or(models_dir);
    if dir.as_os_str().is_empty() {
        PathBuf::from(".")
    } else {
        dir.to_path_buf()
    }
}
526
527fn spawn_lock_path(models_dir: &Path) -> PathBuf {
528 daemon_control_dir(models_dir).join("daemon-spawn.lock")
529}
530
531fn spawn_state_path(models_dir: &Path) -> PathBuf {
532 daemon_control_dir(models_dir).join("daemon-spawn-state.json")
533}
534
535fn try_acquire_spawn_lock(models_dir: &Path) -> Result<Option<File>, AppError> {
536 let path = spawn_lock_path(models_dir);
537 std::fs::create_dir_all(crate::paths::parent_or_err(&path)?).map_err(AppError::Io)?;
538 let file = OpenOptions::new()
539 .read(true)
540 .write(true)
541 .create(true)
542 .truncate(false)
543 .open(path)
544 .map_err(AppError::Io)?;
545
546 let deadline = Instant::now() + Duration::from_millis(DAEMON_SPAWN_LOCK_WAIT_MS);
547 loop {
548 match file.try_lock_exclusive() {
549 Ok(()) => return Ok(Some(file)),
550 Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => {
551 if Instant::now() >= deadline {
552 return Ok(None);
553 }
554 thread::sleep(Duration::from_millis(50));
555 }
556 Err(err) => return Err(AppError::Io(err)),
557 }
558 }
559}
560
561fn spawn_backoff_active(models_dir: &Path) -> Result<bool, AppError> {
562 let state = load_spawn_state(models_dir)?;
563 Ok(now_epoch_ms() < state.not_before_epoch_ms)
564}
565
566fn record_spawn_failure(models_dir: &Path, message: String) -> Result<(), AppError> {
567 let mut state = load_spawn_state(models_dir)?;
568 state.consecutive_failures = state.consecutive_failures.saturating_add(1);
569 let exponent = state.consecutive_failures.saturating_sub(1).min(6);
570 let backoff_ms =
571 (DAEMON_SPAWN_BACKOFF_BASE_MS * (1_u64 << exponent)).min(DAEMON_AUTO_START_MAX_BACKOFF_MS);
572 state.not_before_epoch_ms = now_epoch_ms() + backoff_ms;
573 state.last_error = Some(message);
574 save_spawn_state(models_dir, &state)
575}
576
577fn clear_spawn_backoff_state(models_dir: &Path) -> Result<(), AppError> {
578 let path = spawn_state_path(models_dir);
579 if path.exists() {
580 std::fs::remove_file(path).map_err(AppError::Io)?;
581 }
582 Ok(())
583}
584
585fn load_spawn_state(models_dir: &Path) -> Result<DaemonSpawnState, AppError> {
586 let path = spawn_state_path(models_dir);
587 if !path.exists() {
588 return Ok(DaemonSpawnState::default());
589 }
590
591 let bytes = std::fs::read(path).map_err(AppError::Io)?;
592 serde_json::from_slice(&bytes).map_err(AppError::Json)
593}
594
595fn save_spawn_state(models_dir: &Path, state: &DaemonSpawnState) -> Result<(), AppError> {
596 let path = spawn_state_path(models_dir);
597 std::fs::create_dir_all(crate::paths::parent_or_err(&path)?).map_err(AppError::Io)?;
598 let bytes = serde_json::to_vec(state).map_err(AppError::Json)?;
599 std::fs::write(path, bytes).map_err(AppError::Io)
600}
601
/// Current wall-clock time in milliseconds since the UNIX epoch.
///
/// A clock set before 1970 yields `0` instead of an error.
fn now_epoch_ms() -> u64 {
    let since_epoch = match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed,
        Err(_) => Duration::from_secs(0),
    };
    since_epoch.as_millis() as u64
}
608
609fn to_local_socket_name(name: &str) -> std::io::Result<interprocess::local_socket::Name<'static>> {
610 if let Ok(ns_name) = name.to_string().to_ns_name::<GenericNamespaced>() {
611 return Ok(ns_name);
612 }
613
614 let path = if cfg!(unix) {
615 format!("/tmp/{name}.sock")
616 } else {
617 format!(r"\\.\pipe\{name}")
618 };
619 path.to_fs_name::<GenericFilePath>()
620}
621
#[cfg(test)]
mod tests {
    use super::*;

    /// Recording a failure opens a backoff window; clearing the state closes it.
    #[test]
    fn record_and_clear_spawn_backoff_state() {
        let tmp = tempfile::tempdir().unwrap();
        let models_dir = tmp.path().join("cache").join("models");
        std::fs::create_dir_all(&models_dir).unwrap();

        assert!(
            !spawn_backoff_active(&models_dir).unwrap(),
            "no state file must mean no backoff"
        );

        record_spawn_failure(&models_dir, "spawn failed".to_string()).unwrap();
        assert!(spawn_backoff_active(&models_dir).unwrap());

        let DaemonSpawnState {
            consecutive_failures,
            last_error,
            ..
        } = load_spawn_state(&models_dir).unwrap();
        assert_eq!(consecutive_failures, 1);
        assert_eq!(last_error.as_deref(), Some("spawn failed"));

        clear_spawn_backoff_state(&models_dir).unwrap();
        assert!(!spawn_backoff_active(&models_dir).unwrap());
    }

    /// The control directory is the parent of the models directory.
    #[test]
    fn daemon_control_dir_usa_pai_de_models() {
        let base = Path::new("/tmp/sqlite-graphrag-cache-test");
        assert_eq!(daemon_control_dir(&base.join("models")), base.to_path_buf());
    }
}