1use crate::constants::{
2 DAEMON_AUTO_START_INITIAL_BACKOFF_MS, DAEMON_AUTO_START_MAX_BACKOFF_MS,
3 DAEMON_AUTO_START_MAX_WAIT_MS, DAEMON_IDLE_SHUTDOWN_SECS, DAEMON_PING_TIMEOUT_MS,
4 DAEMON_SPAWN_BACKOFF_BASE_MS, DAEMON_SPAWN_LOCK_WAIT_MS, SQLITE_GRAPHRAG_VERSION,
5};
6use crate::errors::AppError;
7use crate::{embedder, shutdown_requested};
8use fs4::fs_std::FileExt;
9use interprocess::local_socket::{
10 prelude::LocalSocketStream,
11 traits::{Listener as _, Stream as _},
12 GenericFilePath, GenericNamespaced, ListenerNonblockingMode, ListenerOptions, ToFsName,
13 ToNsName,
14};
15use serde::{Deserialize, Serialize};
16use std::fs::{File, OpenOptions};
17use std::io::{BufRead, BufReader, Write};
18use std::path::{Path, PathBuf};
19use std::process::Stdio;
20use std::thread;
21use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
22
/// Request sent by a client to the embedding daemon as one JSON object per line.
///
/// Serialized with an internal `"request"` tag whose value is the snake_case variant
/// name, e.g. `{"request":"embed_query","text":"..."}`.
#[derive(Debug, Serialize, Deserialize)]
#[serde(tag = "request", rename_all = "snake_case")]
pub enum DaemonRequest {
    /// Health check; the daemon answers with `DaemonResponse::Ok`.
    Ping,
    /// Asks the daemon to stop; answered with `DaemonResponse::ShuttingDown`.
    Shutdown,
    /// Embeds a single text as a passage; answered with `DaemonResponse::PassageEmbedding`.
    EmbedPassage {
        text: String,
    },
    /// Embeds a single text as a query; answered with `DaemonResponse::QueryEmbedding`.
    EmbedQuery {
        text: String,
    },
    /// Embeds a batch of passages; answered with `DaemonResponse::PassageEmbeddings`.
    EmbedPassages {
        texts: Vec<String>,
        /// Forwarded unchanged to `embedder::embed_passages_controlled` together with
        /// `texts`; presumably one count per text — TODO confirm against the embedder.
        token_counts: Vec<usize>,
    },
}
39
/// Reply produced by the daemon, serialized as one JSON object per line.
///
/// Serialized with an internal `"status"` tag whose value is the snake_case variant
/// name. Every `handled_embed_requests` field echoes the embed-request counter that
/// the daemon passed to the request handler when the reply was built.
#[derive(Debug, Serialize, Deserialize)]
#[serde(tag = "status", rename_all = "snake_case")]
pub enum DaemonResponse {
    /// Announcement emitted once through `crate::output::emit_json` after the listener
    /// is bound and the model has been warmed up; it is not sent over the socket.
    Listening {
        pid: u32,
        socket: String,
        idle_shutdown_secs: u64,
    },
    /// Reply to `DaemonRequest::Ping`, carrying the daemon's pid and crate version.
    Ok {
        pid: u32,
        version: String,
        handled_embed_requests: u64,
    },
    /// Reply to `DaemonRequest::EmbedPassage`.
    PassageEmbedding {
        embedding: Vec<f32>,
        handled_embed_requests: u64,
    },
    /// Reply to `DaemonRequest::EmbedQuery`.
    QueryEmbedding {
        embedding: Vec<f32>,
        handled_embed_requests: u64,
    },
    /// Reply to `DaemonRequest::EmbedPassages`.
    PassageEmbeddings {
        embeddings: Vec<Vec<f32>>,
        handled_embed_requests: u64,
    },
    /// Reply to `DaemonRequest::Shutdown`; the daemon loop stops after sending it.
    ShuttingDown {
        handled_embed_requests: u64,
    },
    /// Request could not be served (e.g. an empty request line). The client helpers
    /// in this module turn `message` into `AppError::Embedding`.
    Error {
        message: String,
    },
}
72
/// Autostart backoff bookkeeping, persisted as JSON in `daemon-spawn-state.json`
/// next to the models directory. When no state file exists, the default
/// (all-zero, no error) value is used.
#[derive(Debug, Default, Serialize, Deserialize)]
struct DaemonSpawnState {
    /// Spawn failures recorded since the state file was last cleared.
    consecutive_failures: u32,
    /// Unix-epoch milliseconds before which autostart attempts are suppressed.
    not_before_epoch_ms: u64,
    /// Message from the most recently recorded spawn failure, if any.
    last_error: Option<String>,
}
79
80pub fn daemon_label(models_dir: &Path) -> String {
81 let hash = blake3::hash(models_dir.to_string_lossy().as_bytes())
82 .to_hex()
83 .to_string();
84 format!("sqlite-graphrag-daemon-{}", &hash[..16])
85}
86
87pub fn try_ping(models_dir: &Path) -> Result<Option<DaemonResponse>, AppError> {
88 request_if_available(models_dir, &DaemonRequest::Ping)
89}
90
91pub fn try_shutdown(models_dir: &Path) -> Result<Option<DaemonResponse>, AppError> {
92 request_if_available(models_dir, &DaemonRequest::Shutdown)
93}
94
95pub fn embed_passage_or_local(models_dir: &Path, text: &str) -> Result<Vec<f32>, AppError> {
96 match request_or_autostart(
97 models_dir,
98 &DaemonRequest::EmbedPassage {
99 text: text.to_string(),
100 },
101 )? {
102 Some(DaemonResponse::PassageEmbedding { embedding, .. }) => Ok(embedding),
103 Some(DaemonResponse::Error { message }) => Err(AppError::Embedding(message)),
104 Some(other) => Err(AppError::Internal(anyhow::anyhow!(
105 "resposta inesperada do daemon para embedding de passage: {other:?}"
106 ))),
107 None => {
108 let embedder = embedder::get_embedder(models_dir)?;
109 embedder::embed_passage(embedder, text)
110 }
111 }
112}
113
114pub fn embed_query_or_local(models_dir: &Path, text: &str) -> Result<Vec<f32>, AppError> {
115 match request_or_autostart(
116 models_dir,
117 &DaemonRequest::EmbedQuery {
118 text: text.to_string(),
119 },
120 )? {
121 Some(DaemonResponse::QueryEmbedding { embedding, .. }) => Ok(embedding),
122 Some(DaemonResponse::Error { message }) => Err(AppError::Embedding(message)),
123 Some(other) => Err(AppError::Internal(anyhow::anyhow!(
124 "resposta inesperada do daemon para embedding de query: {other:?}"
125 ))),
126 None => {
127 let embedder = embedder::get_embedder(models_dir)?;
128 embedder::embed_query(embedder, text)
129 }
130 }
131}
132
133pub fn embed_passages_controlled_or_local(
134 models_dir: &Path,
135 texts: &[&str],
136 token_counts: &[usize],
137) -> Result<Vec<Vec<f32>>, AppError> {
138 let request = DaemonRequest::EmbedPassages {
139 texts: texts.iter().map(|t| (*t).to_string()).collect(),
140 token_counts: token_counts.to_vec(),
141 };
142
143 match request_or_autostart(models_dir, &request)? {
144 Some(DaemonResponse::PassageEmbeddings { embeddings, .. }) => Ok(embeddings),
145 Some(DaemonResponse::Error { message }) => Err(AppError::Embedding(message)),
146 Some(other) => Err(AppError::Internal(anyhow::anyhow!(
147 "resposta inesperada do daemon para batch de embeddings de passage: {other:?}"
148 ))),
149 None => {
150 let embedder = embedder::get_embedder(models_dir)?;
151 embedder::embed_passages_controlled(embedder, texts, token_counts)
152 }
153 }
154}
155
/// Scope guard held by the running daemon; when it is dropped (as `run_async`
/// returns), its `Drop` impl removes the spawn lock file and logs the shutdown.
struct DaemonSpawnGuard {
    /// Models directory from which the spawn lock path is derived on drop.
    models_dir: PathBuf,
}
159
160impl DaemonSpawnGuard {
161 fn new(models_dir: &Path) -> Self {
162 Self {
163 models_dir: models_dir.to_path_buf(),
164 }
165 }
166}
167
168impl Drop for DaemonSpawnGuard {
169 fn drop(&mut self) {
170 let lock_path = spawn_lock_path(&self.models_dir);
171 if lock_path.exists() {
172 match std::fs::remove_file(&lock_path) {
173 Ok(()) => {
174 tracing::debug!(
175 path = %lock_path.display(),
176 "lock file de spawn removido ao encerrar daemon graciosamente"
177 );
178 }
179 Err(err) if err.kind() == std::io::ErrorKind::NotFound => {}
180 Err(err) => {
181 tracing::warn!(
182 error = %err,
183 path = %lock_path.display(),
184 "falha ao remover lock file de spawn ao encerrar daemon"
185 );
186 }
187 }
188 }
189 tracing::info!(
190 "daemon encerrado graciosamente; socket será limpo pelo OS ou pelo próximo daemon via try_overwrite"
191 );
192 }
193}
194
195pub fn run(models_dir: &Path, idle_shutdown_secs: u64) -> Result<(), AppError> {
196 let rt = tokio::runtime::Builder::new_multi_thread()
200 .worker_threads(2)
201 .thread_name("daemon-worker")
202 .enable_all()
203 .build()
204 .map_err(AppError::Io)?;
205
206 rt.block_on(run_async(models_dir, idle_shutdown_secs))
207}
208
209async fn run_async(models_dir: &Path, idle_shutdown_secs: u64) -> Result<(), AppError> {
210 let socket = daemon_label(models_dir);
211 let name = to_local_socket_name(&socket)?;
212 let listener = ListenerOptions::new()
213 .name(name)
214 .nonblocking(ListenerNonblockingMode::Accept)
215 .try_overwrite(true)
216 .create_sync()
217 .map_err(AppError::Io)?;
218
219 let _spawn_guard = DaemonSpawnGuard::new(models_dir);
222
223 let models_dir_warm = models_dir.to_path_buf();
227 tokio::task::spawn_blocking(move || embedder::get_embedder(&models_dir_warm).map(|_| ()))
228 .await
229 .map_err(|e| AppError::Internal(anyhow::anyhow!("model warm-up panicked: {e}")))??;
230
231 crate::output::emit_json(&DaemonResponse::Listening {
232 pid: std::process::id(),
233 socket,
234 idle_shutdown_secs,
235 })?;
236
237 let mut handled_embed_requests = 0_u64;
238 let mut last_activity = Instant::now();
239 let models_dir = models_dir.to_path_buf();
240
241 loop {
242 if shutdown_requested() {
243 break;
244 }
245
246 if !daemon_control_dir(&models_dir).exists() {
247 tracing::info!("daemon control directory disappeared; shutting down");
248 break;
249 }
250
251 match listener.accept() {
252 Ok(stream) => {
253 last_activity = Instant::now();
254 let models_dir_clone = models_dir.clone();
255 let result = tokio::task::spawn_blocking(move || {
256 let mut counter = 0_u64;
257 handle_client(stream, &models_dir_clone, &mut counter)
258 .map(|should_exit| (should_exit, counter))
259 })
260 .await
261 .map_err(|e| AppError::Internal(anyhow::anyhow!("spawn_blocking panicked: {e}")))?;
262
263 let (should_exit, delta) = result?;
264 handled_embed_requests += delta;
265
266 if should_exit {
267 break;
268 }
269 }
270 Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => {
271 if last_activity.elapsed() >= Duration::from_secs(idle_shutdown_secs) {
272 tracing::info!(
273 idle_shutdown_secs,
274 handled_embed_requests,
275 "daemon idle timeout reached"
276 );
277 break;
278 }
279 tokio::time::sleep(Duration::from_millis(50)).await;
280 }
281 Err(err) => return Err(AppError::Io(err)),
282 }
283 }
284
285 Ok(())
286}
287
288fn handle_client(
289 stream: LocalSocketStream,
290 models_dir: &Path,
291 handled_embed_requests: &mut u64,
292) -> Result<bool, AppError> {
293 let mut reader = BufReader::new(stream);
294 let mut line = String::new();
295 reader.read_line(&mut line).map_err(AppError::Io)?;
296
297 if line.trim().is_empty() {
298 write_response(
299 reader.get_mut(),
300 &DaemonResponse::Error {
301 message: "requisição vazia ao daemon".to_string(),
302 },
303 )?;
304 return Ok(false);
305 }
306
307 let request: DaemonRequest = serde_json::from_str(line.trim()).map_err(AppError::Json)?;
308 let (response, should_exit) = match request {
309 DaemonRequest::Ping => (
310 DaemonResponse::Ok {
311 pid: std::process::id(),
312 version: SQLITE_GRAPHRAG_VERSION.to_string(),
313 handled_embed_requests: *handled_embed_requests,
314 },
315 false,
316 ),
317 DaemonRequest::Shutdown => (
318 DaemonResponse::ShuttingDown {
319 handled_embed_requests: *handled_embed_requests,
320 },
321 true,
322 ),
323 DaemonRequest::EmbedPassage { text } => {
324 let embedder = embedder::get_embedder(models_dir)?;
325 let embedding = embedder::embed_passage(embedder, &text)?;
326 *handled_embed_requests += 1;
327 (
328 DaemonResponse::PassageEmbedding {
329 embedding,
330 handled_embed_requests: *handled_embed_requests,
331 },
332 false,
333 )
334 }
335 DaemonRequest::EmbedQuery { text } => {
336 let embedder = embedder::get_embedder(models_dir)?;
337 let embedding = embedder::embed_query(embedder, &text)?;
338 *handled_embed_requests += 1;
339 (
340 DaemonResponse::QueryEmbedding {
341 embedding,
342 handled_embed_requests: *handled_embed_requests,
343 },
344 false,
345 )
346 }
347 DaemonRequest::EmbedPassages {
348 texts,
349 token_counts,
350 } => {
351 let embedder = embedder::get_embedder(models_dir)?;
352 let text_refs: Vec<&str> = texts.iter().map(String::as_str).collect();
353 let embeddings =
354 embedder::embed_passages_controlled(embedder, &text_refs, &token_counts)?;
355 *handled_embed_requests += 1;
356 (
357 DaemonResponse::PassageEmbeddings {
358 embeddings,
359 handled_embed_requests: *handled_embed_requests,
360 },
361 false,
362 )
363 }
364 };
365
366 write_response(reader.get_mut(), &response)?;
367 Ok(should_exit)
368}
369
370fn write_response(
371 stream: &mut LocalSocketStream,
372 response: &DaemonResponse,
373) -> Result<(), AppError> {
374 serde_json::to_writer(&mut *stream, response).map_err(AppError::Json)?;
375 stream.write_all(b"\n").map_err(AppError::Io)?;
376 stream.flush().map_err(AppError::Io)?;
377 Ok(())
378}
379
380fn request_if_available(
381 models_dir: &Path,
382 request: &DaemonRequest,
383) -> Result<Option<DaemonResponse>, AppError> {
384 let socket = daemon_label(models_dir);
385 let name = match to_local_socket_name(&socket) {
386 Ok(name) => name,
387 Err(err) => return Err(AppError::Io(err)),
388 };
389
390 let mut stream = match LocalSocketStream::connect(name) {
391 Ok(stream) => stream,
392 Err(err)
393 if matches!(
394 err.kind(),
395 std::io::ErrorKind::NotFound
396 | std::io::ErrorKind::ConnectionRefused
397 | std::io::ErrorKind::AddrNotAvailable
398 | std::io::ErrorKind::TimedOut
399 ) =>
400 {
401 return Ok(None);
402 }
403 Err(err) => return Err(AppError::Io(err)),
404 };
405
406 serde_json::to_writer(&mut stream, request).map_err(AppError::Json)?;
407 stream.write_all(b"\n").map_err(AppError::Io)?;
408 stream.flush().map_err(AppError::Io)?;
409
410 let mut reader = BufReader::new(stream);
411 let mut line = String::new();
412 reader.read_line(&mut line).map_err(AppError::Io)?;
413 if line.trim().is_empty() {
414 return Err(AppError::Embedding("daemon retornou resposta vazia".into()));
415 }
416
417 let response = serde_json::from_str(line.trim()).map_err(AppError::Json)?;
418 Ok(Some(response))
419}
420
421fn request_or_autostart(
422 models_dir: &Path,
423 request: &DaemonRequest,
424) -> Result<Option<DaemonResponse>, AppError> {
425 if let Some(response) = request_if_available(models_dir, request)? {
426 clear_spawn_backoff_state(models_dir).ok();
427 return Ok(Some(response));
428 }
429
430 if autostart_disabled() {
431 return Ok(None);
432 }
433
434 if !ensure_daemon_running(models_dir)? {
435 return Ok(None);
436 }
437
438 request_if_available(models_dir, request)
439}
440
441fn ensure_daemon_running(models_dir: &Path) -> Result<bool, AppError> {
442 if (try_ping(models_dir)?).is_some() {
443 clear_spawn_backoff_state(models_dir).ok();
444 return Ok(true);
445 }
446
447 if spawn_backoff_active(models_dir)? {
448 tracing::warn!("daemon autostart suppressed by backoff window");
449 return Ok(false);
450 }
451
452 let spawn_lock = match try_acquire_spawn_lock(models_dir)? {
453 Some(lock) => lock,
454 None => return wait_for_daemon_ready(models_dir),
455 };
456
457 if (try_ping(models_dir)?).is_some() {
458 clear_spawn_backoff_state(models_dir).ok();
459 drop(spawn_lock);
460 return Ok(true);
461 }
462
463 let exe = match std::env::current_exe() {
464 Ok(path) => path,
465 Err(err) => {
466 record_spawn_failure(models_dir, format!("current_exe failed: {err}"))?;
467 drop(spawn_lock);
468 return Ok(false);
469 }
470 };
471
472 let mut child = std::process::Command::new(exe);
473 child
474 .arg("daemon")
475 .arg("--idle-shutdown-secs")
476 .arg(DAEMON_IDLE_SHUTDOWN_SECS.to_string())
477 .env("SQLITE_GRAPHRAG_DAEMON_CHILD", "1")
478 .stdin(Stdio::null())
479 .stdout(Stdio::null())
480 .stderr(Stdio::null());
481
482 match child.spawn() {
483 Ok(_) => {
484 let ready = wait_for_daemon_ready(models_dir)?;
485 if ready {
486 clear_spawn_backoff_state(models_dir).ok();
487 } else {
488 record_spawn_failure(
489 models_dir,
490 "daemon did not become healthy after autostart".to_string(),
491 )?;
492 }
493 drop(spawn_lock);
494 Ok(ready)
495 }
496 Err(err) => {
497 record_spawn_failure(models_dir, format!("daemon spawn failed: {err}"))?;
498 drop(spawn_lock);
499 Ok(false)
500 }
501 }
502}
503
504fn wait_for_daemon_ready(models_dir: &Path) -> Result<bool, AppError> {
505 let deadline = Instant::now() + Duration::from_millis(DAEMON_AUTO_START_MAX_WAIT_MS);
506 let mut sleep_ms = DAEMON_AUTO_START_INITIAL_BACKOFF_MS.max(DAEMON_PING_TIMEOUT_MS);
507
508 while Instant::now() < deadline {
509 if (try_ping(models_dir)?).is_some() {
510 return Ok(true);
511 }
512 thread::sleep(Duration::from_millis(sleep_ms));
513 sleep_ms = (sleep_ms * 2).min(DAEMON_AUTO_START_MAX_BACKOFF_MS);
514 }
515
516 Ok(false)
517}
518
/// Reports whether this process must not autostart a daemon.
///
/// Autostart is disabled when any of the following holds:
/// - This process is itself a spawned daemon child (`SQLITE_GRAPHRAG_DAEMON_CHILD=1`).
/// - `SQLITE_GRAPHRAG_DAEMON_DISABLE_AUTOSTART=1` is set and
///   `SQLITE_GRAPHRAG_DAEMON_FORCE_AUTOSTART=1` is not.
fn autostart_disabled() -> bool {
    let env_is_one = |key: &str| std::env::var(key).as_deref() == Ok("1");

    if env_is_one("SQLITE_GRAPHRAG_DAEMON_CHILD") {
        return true;
    }

    !env_is_one("SQLITE_GRAPHRAG_DAEMON_FORCE_AUTOSTART")
        && env_is_one("SQLITE_GRAPHRAG_DAEMON_DISABLE_AUTOSTART")
}
524
/// Directory holding the daemon's control files (spawn lock, spawn state).
///
/// This is the parent of `models_dir`. If `models_dir` has no parent (e.g. `/`), the
/// directory itself is used. A single relative component such as `models` has an
/// empty parent that stands for the current directory. It is returned as `.` so that
/// `exists()` checks on it succeed, whereas `Path::new("").exists()` is always false.
/// Paths joined onto `.` name the same files as paths joined onto the empty path.
fn daemon_control_dir(models_dir: &Path) -> PathBuf {
    match models_dir.parent() {
        Some(parent) if parent.as_os_str().is_empty() => PathBuf::from("."),
        Some(parent) => parent.to_path_buf(),
        None => models_dir.to_path_buf(),
    }
}
531
532fn spawn_lock_path(models_dir: &Path) -> PathBuf {
533 daemon_control_dir(models_dir).join("daemon-spawn.lock")
534}
535
536fn spawn_state_path(models_dir: &Path) -> PathBuf {
537 daemon_control_dir(models_dir).join("daemon-spawn-state.json")
538}
539
540fn try_acquire_spawn_lock(models_dir: &Path) -> Result<Option<File>, AppError> {
541 let path = spawn_lock_path(models_dir);
542 std::fs::create_dir_all(crate::paths::parent_or_err(&path)?).map_err(AppError::Io)?;
543 let file = OpenOptions::new()
544 .read(true)
545 .write(true)
546 .create(true)
547 .truncate(false)
548 .open(path)
549 .map_err(AppError::Io)?;
550
551 let deadline = Instant::now() + Duration::from_millis(DAEMON_SPAWN_LOCK_WAIT_MS);
552 loop {
553 match file.try_lock_exclusive() {
554 Ok(()) => return Ok(Some(file)),
555 Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => {
556 if Instant::now() >= deadline {
557 return Ok(None);
558 }
559 thread::sleep(Duration::from_millis(50));
560 }
561 Err(err) => return Err(AppError::Io(err)),
562 }
563 }
564}
565
566fn spawn_backoff_active(models_dir: &Path) -> Result<bool, AppError> {
567 let state = load_spawn_state(models_dir)?;
568 Ok(now_epoch_ms() < state.not_before_epoch_ms)
569}
570
571fn record_spawn_failure(models_dir: &Path, message: String) -> Result<(), AppError> {
572 let mut state = load_spawn_state(models_dir)?;
573 state.consecutive_failures = state.consecutive_failures.saturating_add(1);
574 let exponent = state.consecutive_failures.saturating_sub(1).min(6);
575 let backoff_ms =
576 (DAEMON_SPAWN_BACKOFF_BASE_MS * (1_u64 << exponent)).min(DAEMON_AUTO_START_MAX_BACKOFF_MS);
577 state.not_before_epoch_ms = now_epoch_ms() + backoff_ms;
578 state.last_error = Some(message);
579 save_spawn_state(models_dir, &state)
580}
581
582fn clear_spawn_backoff_state(models_dir: &Path) -> Result<(), AppError> {
583 let path = spawn_state_path(models_dir);
584 if path.exists() {
585 std::fs::remove_file(path).map_err(AppError::Io)?;
586 }
587 Ok(())
588}
589
590fn load_spawn_state(models_dir: &Path) -> Result<DaemonSpawnState, AppError> {
591 let path = spawn_state_path(models_dir);
592 if !path.exists() {
593 return Ok(DaemonSpawnState::default());
594 }
595
596 let bytes = std::fs::read(path).map_err(AppError::Io)?;
597 serde_json::from_slice(&bytes).map_err(AppError::Json)
598}
599
600fn save_spawn_state(models_dir: &Path, state: &DaemonSpawnState) -> Result<(), AppError> {
601 let path = spawn_state_path(models_dir);
602 std::fs::create_dir_all(crate::paths::parent_or_err(&path)?).map_err(AppError::Io)?;
603 let bytes = serde_json::to_vec(state).map_err(AppError::Json)?;
604 std::fs::write(path, bytes).map_err(AppError::Io)
605}
606
/// Current wall-clock time in milliseconds since the Unix epoch.
///
/// A clock set before 1970 yields 0. The `u128` millisecond count is clamped to
/// `u64::MAX` instead of being silently truncated by an `as` cast.
fn now_epoch_ms() -> u64 {
    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default();
    // After clamping, the value fits in u64, so the cast is lossless.
    since_epoch.as_millis().min(u128::from(u64::MAX)) as u64
}
613
614fn to_local_socket_name(name: &str) -> std::io::Result<interprocess::local_socket::Name<'static>> {
615 if let Ok(ns_name) = name.to_string().to_ns_name::<GenericNamespaced>() {
616 return Ok(ns_name);
617 }
618
619 let path = if cfg!(unix) {
620 format!("/tmp/{name}.sock")
621 } else {
622 format!(r"\\.\pipe\{name}")
623 };
624 path.to_fs_name::<GenericFilePath>()
625}
626
#[cfg(test)]
mod tests {
    use super::*;

    /// Creates `<tmp>/cache/models` and returns the tempdir guard with that path.
    fn temp_models_dir() -> (tempfile::TempDir, PathBuf) {
        let tmp = tempfile::tempdir().unwrap();
        let models_dir = tmp.path().join("cache").join("models");
        std::fs::create_dir_all(&models_dir).unwrap();
        (tmp, models_dir)
    }

    #[test]
    fn record_and_clear_spawn_backoff_state() {
        let (_tmp, models_dir) = temp_models_dir();

        assert!(!spawn_backoff_active(&models_dir).unwrap());

        record_spawn_failure(&models_dir, "spawn failed".to_string()).unwrap();
        assert!(spawn_backoff_active(&models_dir).unwrap());

        let state = load_spawn_state(&models_dir).unwrap();
        assert_eq!(state.consecutive_failures, 1);
        assert_eq!(state.last_error.as_deref(), Some("spawn failed"));

        clear_spawn_backoff_state(&models_dir).unwrap();
        assert!(!spawn_backoff_active(&models_dir).unwrap());
    }

    #[test]
    fn repeated_spawn_failures_accumulate() {
        let (_tmp, models_dir) = temp_models_dir();

        record_spawn_failure(&models_dir, "first".to_string()).unwrap();
        record_spawn_failure(&models_dir, "second".to_string()).unwrap();

        let state = load_spawn_state(&models_dir).unwrap();
        assert_eq!(state.consecutive_failures, 2);
        assert_eq!(state.last_error.as_deref(), Some("second"));
    }

    #[test]
    fn clearing_without_state_file_is_ok() {
        let (_tmp, models_dir) = temp_models_dir();

        clear_spawn_backoff_state(&models_dir).unwrap();
        assert!(!spawn_backoff_active(&models_dir).unwrap());
    }

    #[test]
    fn daemon_label_is_stable_and_prefixed() {
        let dir = PathBuf::from("/tmp/sqlite-graphrag-label-test/models");
        let label = daemon_label(&dir);

        assert_eq!(label, daemon_label(&dir));
        let suffix = label
            .strip_prefix("sqlite-graphrag-daemon-")
            .expect("label must carry the daemon prefix");
        assert_eq!(suffix.len(), 16);
        assert!(suffix.chars().all(|c| c.is_ascii_hexdigit()));
        assert_ne!(label, daemon_label(&dir.join("other")));
    }

    #[test]
    fn daemon_control_dir_usa_pai_de_models() {
        let base = PathBuf::from("/tmp/sqlite-graphrag-cache-test");
        let models_dir = base.join("models");
        assert_eq!(daemon_control_dir(&models_dir), base);
    }
}