1use std::ffi::OsString;
7use std::fs::{self, DirBuilder};
8use std::io::{Read, Write};
9use std::os::unix::fs::{DirBuilderExt, FileTypeExt, PermissionsExt};
10use std::os::unix::net::{UnixListener, UnixStream};
11use std::path::{Path, PathBuf};
12use std::sync::Arc;
13use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
14use std::time::{Duration, Instant};
15
16use fs2::FileExt;
17use parking_lot::RwLock;
18use tracing::{debug, error, info, warn};
19
20use super::daemon_run_lock_path;
21use super::models::ModelManager;
22use super::protocol::{
23 EmbedResponse, EmbeddingJobDetail, EmbeddingJobInfo, ErrorCode, ErrorResponse, FramedMessage,
24 HealthStatus, ModelInfo, PROTOCOL_VERSION, Request, RerankResponse, Response, StatusResponse,
25 decode_message, default_socket_path, encode_message,
26};
27use super::resource::ResourceMonitor;
28use super::worker::{EmbeddingJobConfig, EmbeddingWorker, EmbeddingWorkerHandle};
29
/// A listening daemon socket together with the filesystem paths needed to clean it up.
struct BoundDaemonSocket {
    /// Listener bound at `bind_path`.
    listener: UnixListener,
    /// Path clients connect to (the configured socket path).
    public_path: PathBuf,
    /// Path the listener is actually bound at. Either equal to `public_path`, or a
    /// socket inside a private runtime directory that `public_path` symlinks to.
    bind_path: PathBuf,
}
35
/// Create `path` (and any missing ancestors) as a directory usable only by its owner.
///
/// Missing directories are requested with mode `0o700`. Afterwards the final path
/// component is inspected *without* following symlinks. Anything other than a real
/// directory is rejected, and a directory whose permission bits are not exactly
/// `0o700` is chmod-ed to `0o700`.
///
/// # Errors
/// Returns `AlreadyExists` when `path` is not a real directory (e.g. a symlink to
/// one). Also propagates any I/O error from creating, inspecting, or chmod-ing it.
fn create_owner_only_dir_all(path: &Path) -> std::io::Result<()> {
    DirBuilder::new().recursive(true).mode(0o700).create(path)?;

    // symlink_metadata: a symlink pointing at a directory must not pass as ours.
    let (is_real_dir, perm_bits) = fs::symlink_metadata(path)
        .map(|meta| (meta.file_type().is_dir(), meta.permissions().mode() & 0o777))?;

    match (is_real_dir, perm_bits) {
        (false, _) => Err(std::io::Error::new(
            std::io::ErrorKind::AlreadyExists,
            format!(
                "path exists but is not a regular directory: {}",
                path.display()
            ),
        )),
        (true, 0o700) => Ok(()),
        // The directory pre-existed or the umask altered the requested mode.
        (true, _) => fs::set_permissions(path, fs::Permissions::from_mode(0o700)),
    }
}
63
/// Report whether the directory containing `path` grants no permission bits to
/// group or others (`mode & 0o077 == 0`).
///
/// A bare file name such as `daemon.sock` has an *empty* parent: `Path::parent`
/// returns `Some("")`. That is treated as the current directory `.` rather than
/// failing to stat the empty path. Returns `Ok(false)` when `path` has no parent
/// at all (e.g. `/`).
///
/// The parent is inspected without following symlinks.
///
/// # Errors
/// Returns `InvalidInput` if the parent is not a real directory (including a
/// symlink to one). Also propagates any I/O error from reading its metadata.
fn parent_dir_is_owner_only(path: &Path) -> std::io::Result<bool> {
    let parent = match path.parent() {
        None => return Ok(false),
        // `Path::new("daemon.sock").parent()` is `Some("")`, which cannot be stat'ed.
        Some(p) if p.as_os_str().is_empty() => Path::new("."),
        Some(p) => p,
    };

    let metadata = fs::symlink_metadata(parent)?;
    if !metadata.file_type().is_dir() {
        return Err(std::io::Error::new(
            std::io::ErrorKind::InvalidInput,
            format!("socket parent is not a directory: {}", parent.display()),
        ));
    }

    Ok(metadata.permissions().mode() & 0o077 == 0)
}
79
/// Path of the hidden per-socket runtime directory: `.<socket file name>.runtime`,
/// placed next to the socket.
///
/// A socket path with no parent component falls back to `.` as the parent.
///
/// # Errors
/// Returns `InvalidInput` when `socket_path` has no file name (e.g. `/`).
fn private_runtime_dir_for_socket(socket_path: &Path) -> std::io::Result<PathBuf> {
    let Some(socket_name) = socket_path.file_name() else {
        return Err(std::io::Error::new(
            std::io::ErrorKind::InvalidInput,
            format!("socket path has no file name: {}", socket_path.display()),
        ));
    };

    let mut hidden_name = OsString::with_capacity(socket_name.len() + ".runtime".len() + 1);
    hidden_name.push(".");
    hidden_name.push(socket_name);
    hidden_name.push(".runtime");

    let containing_dir = socket_path.parent().unwrap_or(Path::new("."));
    Ok(containing_dir.join(hidden_name))
}
94
/// Delete a leftover Unix socket or symlink at `path` so a new listener can bind.
///
/// The entry is inspected without following symlinks, so removing a symlink never
/// touches its target. A missing path is not an error.
///
/// # Errors
/// Returns `AlreadyExists` if `path` is any other kind of entry (regular file,
/// directory, ...). Also propagates any other I/O error from inspection or removal.
fn remove_stale_socket_path(path: &Path) -> std::io::Result<()> {
    let file_type = match fs::symlink_metadata(path) {
        Ok(meta) => meta.file_type(),
        Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(()),
        Err(err) => return Err(err),
    };

    let removable = file_type.is_socket() || file_type.is_symlink();
    if !removable {
        return Err(std::io::Error::new(
            std::io::ErrorKind::AlreadyExists,
            format!(
                "refusing to remove non-socket daemon path: {}",
                path.display()
            ),
        ));
    }
    fs::remove_file(path)
}
115
116fn bind_owner_only_unix_listener(socket_path: &Path) -> std::io::Result<BoundDaemonSocket> {
117 if let Some(parent) = socket_path.parent()
118 && !parent.exists()
119 {
120 create_owner_only_dir_all(parent)?;
121 }
122
123 let bind_path = if parent_dir_is_owner_only(socket_path)? {
124 socket_path.to_path_buf()
125 } else {
126 let runtime_dir = private_runtime_dir_for_socket(socket_path)?;
127 create_owner_only_dir_all(&runtime_dir)?;
128 runtime_dir.join("daemon.sock")
129 };
130
131 remove_stale_socket_path(&bind_path)?;
132 if bind_path != socket_path {
133 remove_stale_socket_path(socket_path)?;
134 }
135
136 let listener = UnixListener::bind(&bind_path)?;
137 fs::set_permissions(&bind_path, fs::Permissions::from_mode(0o600))?;
138
139 if bind_path != socket_path {
140 std::os::unix::fs::symlink(&bind_path, socket_path)?;
141 }
142
143 Ok(BoundDaemonSocket {
144 listener,
145 public_path: socket_path.to_path_buf(),
146 bind_path,
147 })
148}
149
150fn cleanup_bound_socket(public_path: &Path, bind_path: &Path) {
151 let _ = remove_stale_socket_path(public_path);
152 if bind_path != public_path {
153 let _ = remove_stale_socket_path(bind_path);
154 }
155}
156
/// Runtime configuration for [`ModelDaemon`].
///
/// [`Default`] supplies baseline values; [`DaemonConfig::from_env`] overrides them
/// from `CASS_DAEMON_*` environment variables.
#[derive(Debug, Clone)]
pub struct DaemonConfig {
    /// Path of the Unix socket clients connect to.
    pub socket_path: PathBuf,
    /// Maximum number of connections handled at once. Connections beyond this are
    /// accepted and then dropped with a warning.
    pub max_connections: usize,
    /// Socket write timeout, and the deadline for receiving each request frame.
    pub request_timeout: Duration,
    /// Stop the daemon after this long without recorded activity; zero disables it.
    pub idle_timeout: Duration,
    /// Stop the daemon once the resource monitor's memory reading exceeds this
    /// value; zero disables the check. NOTE(review): presumably bytes, matching the
    /// `memory_bytes` log field — confirm against `ResourceMonitor::memory_usage`.
    pub memory_limit: u64,
    /// Nice value passed to the resource monitor at startup; failure only warns.
    pub nice_value: i32,
    /// ionice class passed to the resource monitor at startup; failure only warns.
    pub ionice_class: u32,
}
175
176impl Default for DaemonConfig {
177 fn default() -> Self {
178 Self {
179 socket_path: default_socket_path(),
180 max_connections: 16,
181 request_timeout: Duration::from_secs(60),
182 idle_timeout: Duration::from_secs(0), memory_limit: 0, nice_value: 10, ionice_class: 2, }
187 }
188}
189
190impl DaemonConfig {
191 pub fn from_env() -> Self {
193 let mut cfg = Self::default();
194
195 if let Ok(path) = dotenvy::var("CASS_DAEMON_SOCKET") {
196 cfg.socket_path = PathBuf::from(path);
197 }
198
199 if let Ok(val) = dotenvy::var("CASS_DAEMON_MAX_CONNECTIONS")
200 && let Ok(n) = val.parse()
201 {
202 cfg.max_connections = n;
203 }
204
205 if let Ok(val) = dotenvy::var("CASS_DAEMON_REQUEST_TIMEOUT_SECS")
206 && let Ok(secs) = val.parse()
207 {
208 cfg.request_timeout = Duration::from_secs(secs);
209 }
210
211 if let Ok(val) = dotenvy::var("CASS_DAEMON_IDLE_TIMEOUT_SECS")
212 && let Ok(secs) = val.parse()
213 {
214 cfg.idle_timeout = Duration::from_secs(secs);
215 }
216
217 if let Ok(val) = dotenvy::var("CASS_DAEMON_MEMORY_LIMIT")
218 && let Ok(bytes) = val.parse()
219 {
220 cfg.memory_limit = bytes;
221 }
222
223 if let Ok(val) = dotenvy::var("CASS_DAEMON_NICE")
224 && let Ok(n) = val.parse()
225 {
226 cfg.nice_value = n;
227 }
228
229 if let Ok(val) = dotenvy::var("CASS_DAEMON_IONICE_CLASS")
230 && let Ok(n) = val.parse()
231 {
232 cfg.ionice_class = n;
233 }
234
235 cfg
236 }
237}
238
/// Daemon that serves embedding, rerank, status, and embedding-job requests over a
/// Unix socket.
///
/// Connection handlers run on scoped threads that borrow `&self`, so mutable state
/// lives behind atomics or locks.
pub struct ModelDaemon {
    /// Configuration the daemon was created with.
    config: DaemonConfig,
    /// Model manager used for embedding and reranking.
    models: Arc<ModelManager>,
    /// Source of memory-usage readings and nice/ionice application.
    resources: ResourceMonitor,
    /// Construction time; used to report uptime.
    start_time: Instant,
    /// Count of successfully decoded requests.
    total_requests: AtomicU64,
    /// Number of connections currently being handled.
    active_connections: AtomicU64,
    /// Set once shutdown is requested; polled by the accept loop and connection reads.
    shutdown: AtomicBool,
    /// Timestamp refreshed by `touch_activity`; read by the idle-shutdown check.
    last_activity: RwLock<Instant>,
    /// Handle to the background embedding worker, if it was started successfully.
    worker_handle: parking_lot::Mutex<Option<EmbeddingWorkerHandle>>,
}
251
252impl ModelDaemon {
253 pub fn new(config: DaemonConfig, models: ModelManager) -> Self {
255 Self {
256 config,
257 models: Arc::new(models),
258 resources: ResourceMonitor::new(),
259 start_time: Instant::now(),
260 total_requests: AtomicU64::new(0),
261 active_connections: AtomicU64::new(0),
262 shutdown: AtomicBool::new(false),
263 last_activity: RwLock::new(Instant::now()),
264 worker_handle: parking_lot::Mutex::new(None),
265 }
266 }
267
268 pub fn with_defaults(data_dir: &Path) -> Self {
270 let config = DaemonConfig::from_env();
271 let models = ModelManager::new(data_dir);
272 Self::new(config, models)
273 }
274
275 pub fn uptime_secs(&self) -> u64 {
277 self.start_time.elapsed().as_secs()
278 }
279
280 fn should_shutdown_idle(&self) -> bool {
282 if self.config.idle_timeout.is_zero() {
283 return false;
284 }
285 let last = *self.last_activity.read();
286 last.elapsed() > self.config.idle_timeout
287 }
288
289 fn touch_activity(&self) {
291 *self.last_activity.write() = Instant::now();
292 }
293
294 fn memory_limit_exceeded(&self) -> bool {
296 if self.config.memory_limit == 0 {
297 return false;
298 }
299 let memory_bytes = self.resources.memory_usage();
300 memory_bytes > self.config.memory_limit
301 }
302
303 fn init_worker(&self) {
305 let (worker, handle) = EmbeddingWorker::new();
306 match std::thread::Builder::new()
307 .name("embedding-worker".into())
308 .spawn(move || worker.run())
309 {
310 Ok(_) => {
311 *self.worker_handle.lock() = Some(handle);
312 info!("Embedding worker initialized");
313 }
314 Err(e) => {
315 error!(
316 error = %e,
317 "Failed to spawn embedding worker - background jobs will be unavailable"
318 );
319 }
321 }
322 }
323
324 pub fn run(&self) -> std::io::Result<()> {
326 let lock_path = daemon_run_lock_path(&self.config.socket_path);
328
329 let lock_file = match std::fs::OpenOptions::new()
330 .read(true)
331 .write(true)
332 .create_new(true)
333 .open(&lock_path)
334 {
335 Ok(file) => file,
336 Err(e) if e.kind() == std::io::ErrorKind::AlreadyExists => {
337 if std::fs::symlink_metadata(&lock_path)?
340 .file_type()
341 .is_symlink()
342 {
343 return Err(std::io::Error::new(
344 std::io::ErrorKind::AlreadyExists,
345 "refusing to open a symlink lock file",
346 ));
347 }
348 std::fs::OpenOptions::new()
349 .read(true)
350 .write(true)
351 .open(&lock_path)?
352 }
353 Err(e) => return Err(e),
354 };
355
356 if lock_file.try_lock_exclusive().is_err() {
358 warn!(
359 socket = %self.config.socket_path.display(),
360 "Another daemon is already running for this socket path"
361 );
362 return Err(std::io::Error::new(
363 std::io::ErrorKind::AddrInUse,
364 "Another daemon is already running",
365 ));
366 }
367
368 if !self.resources.apply_nice(self.config.nice_value) {
370 warn!(
371 nice = self.config.nice_value,
372 "Failed to apply configured daemon nice value"
373 );
374 }
375 if !self.resources.apply_ionice(self.config.ionice_class) {
376 warn!(
377 ionice_class = self.config.ionice_class,
378 "Failed to apply configured daemon ionice class"
379 );
380 }
381
382 let BoundDaemonSocket {
383 listener,
384 public_path,
385 bind_path,
386 } = bind_owner_only_unix_listener(&self.config.socket_path)?;
387 listener.set_nonblocking(true)?;
388
389 info!(
390 socket = %self.config.socket_path.display(),
391 bound_socket = %bind_path.display(),
392 max_connections = self.config.max_connections,
393 "Daemon listening"
394 );
395
396 info!("Pre-warming models...");
398 if let Err(e) = self.models.warm_embedder() {
399 warn!(error = %e, "Failed to pre-warm embedder");
400 }
401 if let Err(e) = self.models.warm_reranker() {
402 warn!(error = %e, "Failed to pre-warm reranker");
403 }
404 info!("Model pre-warming complete");
405
406 self.init_worker();
408
409 std::thread::scope(|s| {
410 loop {
411 if self.shutdown.load(Ordering::SeqCst) {
413 info!("Shutdown requested, stopping daemon");
414 break;
415 }
416
417 if self.should_shutdown_idle() {
419 info!(
420 idle_secs = self.config.idle_timeout.as_secs(),
421 "Idle timeout reached, shutting down"
422 );
423 break;
424 }
425
426 if self.memory_limit_exceeded() {
428 let memory_bytes = self.resources.memory_usage();
429 error!(
430 memory_bytes = memory_bytes,
431 memory_limit = self.config.memory_limit,
432 "Daemon memory limit exceeded, shutting down"
433 );
434 break;
435 }
436
437 match listener.accept() {
439 Ok((stream, _addr)) => {
440 let active = self.active_connections.fetch_add(1, Ordering::SeqCst);
441 if active >= self.config.max_connections as u64 {
442 self.active_connections.fetch_sub(1, Ordering::SeqCst);
443 warn!(
444 active = active,
445 max = self.config.max_connections,
446 "Max connections reached, rejecting"
447 );
448 continue;
449 }
450
451 self.touch_activity();
452 s.spawn(move || {
453 if let Err(e) = self.handle_connection(stream) {
454 debug!(error = %e, "Connection error");
455 }
456 self.active_connections.fetch_sub(1, Ordering::SeqCst);
457 });
458 }
459 Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
460 std::thread::sleep(Duration::from_millis(10));
462 }
463 Err(e) => {
464 error!(error = %e, "Accept error");
465 std::thread::sleep(Duration::from_millis(100));
466 }
467 }
468 }
469 });
470
471 let worker_handle = self.worker_handle.lock().take();
473 if let Some(handle) = worker_handle
474 && let Err(e) = handle.shutdown()
475 {
476 warn!(error = %e, "Failed to send shutdown to embedding worker");
477 }
478
479 cleanup_bound_socket(&public_path, &bind_path);
481
482 info!("Daemon stopped");
483 Ok(())
484 }
485
486 fn read_frame_bytes_with_shutdown(
487 &self,
488 stream: &mut UnixStream,
489 buf: &mut [u8],
490 poll_timeout: Duration,
491 request_timeout: Duration,
492 reset_timeout_on_progress: bool,
493 ) -> std::io::Result<bool> {
494 if buf.is_empty() {
495 return Ok(true);
496 }
497
498 stream.set_read_timeout(Some(poll_timeout))?;
499 let started_at = Instant::now();
500 let mut last_progress_at = started_at;
501 let mut filled = 0usize;
502
503 loop {
504 if self.shutdown.load(Ordering::SeqCst) {
505 debug!("Shutdown requested, closing connection read");
506 return Ok(false);
507 }
508
509 match stream.read(&mut buf[filled..]) {
510 Ok(0) => {
511 debug!("Client disconnected");
512 return Ok(false);
513 }
514 Ok(n) => {
515 filled += n;
516 last_progress_at = Instant::now();
517 if filled == buf.len() {
518 return Ok(true);
519 }
520 }
521 Err(e)
522 if matches!(
523 e.kind(),
524 std::io::ErrorKind::WouldBlock | std::io::ErrorKind::TimedOut
525 ) =>
526 {
527 let timeout_started_at = if reset_timeout_on_progress {
528 last_progress_at
529 } else {
530 started_at
531 };
532 if timeout_started_at.elapsed() >= request_timeout {
533 debug!("Connection timed out");
534 return Ok(false);
535 }
536 }
537 Err(e) if e.kind() == std::io::ErrorKind::Interrupted => continue,
538 Err(e) => return Err(e),
539 }
540 }
541 }
542
543 fn handle_connection(&self, mut stream: UnixStream) -> std::io::Result<()> {
545 const IDLE_SHUTDOWN_POLL: Duration = Duration::from_millis(250);
551 let request_timeout = self.config.request_timeout;
552 let idle_poll = IDLE_SHUTDOWN_POLL.min(request_timeout);
553 stream.set_write_timeout(Some(request_timeout))?;
554
555 loop {
556 let mut len_buf = [0u8; 4];
560 if !self.read_frame_bytes_with_shutdown(
561 &mut stream,
562 &mut len_buf,
563 idle_poll,
564 request_timeout,
565 false,
566 )? {
567 return Ok(());
568 }
569
570 let len = u32::from_be_bytes(len_buf) as usize;
571 if len > 10 * 1024 * 1024 {
572 warn!(
573 len = len,
574 "Request too large (max 10MB), closing connection"
575 );
576 return Ok(());
577 }
578
579 let mut payload = vec![0u8; len];
582 if !self.read_frame_bytes_with_shutdown(
583 &mut stream,
584 &mut payload,
585 idle_poll,
586 request_timeout,
587 true,
588 )? {
589 return Ok(());
590 }
591
592 let response = match decode_message::<Request>(&payload) {
594 Ok(msg) => {
595 self.total_requests.fetch_add(1, Ordering::Relaxed);
596 self.touch_activity();
597 let response = self.handle_request(msg.request_id.clone(), msg.payload);
598 FramedMessage::new(msg.request_id, response)
599 }
600 Err(e) => {
601 warn!(error = %e, "Failed to decode request");
602 FramedMessage::new(
603 "error",
604 Response::Error(ErrorResponse {
605 code: ErrorCode::InvalidInput,
606 message: format!("decode error: {}", e),
607 retryable: false,
608 retry_after_ms: None,
609 }),
610 )
611 }
612 };
613
614 let encoded =
616 encode_message(&response).map_err(|e| std::io::Error::other(e.to_string()))?;
617 stream.write_all(&encoded)?;
618
619 if matches!(response.payload, Response::Shutdown { .. }) {
621 return Ok(());
622 }
623 }
624 }
625
626 fn handle_request(&self, request_id: String, request: Request) -> Response {
628 let start = Instant::now();
629
630 match request {
631 Request::Health => Response::Health(HealthStatus {
632 uptime_secs: self.uptime_secs(),
633 version: PROTOCOL_VERSION,
634 ready: self.models.is_ready(),
635 memory_bytes: self.resources.memory_usage(),
636 }),
637
638 Request::Embed {
639 texts,
640 model,
641 dims: _,
642 } => {
643 debug!(
644 request_id = %request_id,
645 batch_size = texts.len(),
646 model = %model,
647 "Processing embed request"
648 );
649
650 match self.models.embed_batch(&texts) {
651 Ok(embeddings) => Response::Embed(EmbedResponse {
652 embeddings,
653 model: self.models.embedder_id().to_string(),
654 elapsed_ms: start.elapsed().as_millis() as u64,
655 }),
656 Err(e) => Response::Error(ErrorResponse {
657 code: ErrorCode::ModelLoadFailed,
658 message: e.to_string(),
659 retryable: true,
660 retry_after_ms: Some(1000),
661 }),
662 }
663 }
664
665 Request::Rerank {
666 query,
667 documents,
668 model,
669 } => {
670 debug!(
671 request_id = %request_id,
672 doc_count = documents.len(),
673 model = %model,
674 "Processing rerank request"
675 );
676
677 match self.models.rerank(&query, &documents) {
678 Ok(scores) => Response::Rerank(RerankResponse {
679 scores,
680 model: self.models.reranker_id().to_string(),
681 elapsed_ms: start.elapsed().as_millis() as u64,
682 }),
683 Err(e) => Response::Error(ErrorResponse {
684 code: ErrorCode::ModelLoadFailed,
685 message: e.to_string(),
686 retryable: true,
687 retry_after_ms: Some(1000),
688 }),
689 }
690 }
691
692 Request::Status => {
693 let embedder_info = ModelInfo {
694 id: self.models.embedder_id().to_string(),
695 name: self.models.embedder_name().to_string(),
696 dimension: Some(self.models.embedder_dimension()),
697 loaded: self.models.embedder_loaded(),
698 memory_bytes: 0, };
700
701 let reranker_info = ModelInfo {
702 id: self.models.reranker_id().to_string(),
703 name: self.models.reranker_name().to_string(),
704 dimension: None,
705 loaded: self.models.reranker_loaded(),
706 memory_bytes: 0,
707 };
708
709 Response::Status(StatusResponse {
710 uptime_secs: self.uptime_secs(),
711 version: PROTOCOL_VERSION,
712 embedders: vec![embedder_info],
713 rerankers: vec![reranker_info],
714 memory_bytes: self.resources.memory_usage(),
715 total_requests: self.total_requests.load(Ordering::Relaxed),
716 })
717 }
718
719 Request::SubmitEmbeddingJob {
720 db_path,
721 index_path,
722 two_tier,
723 fast_model,
724 quality_model,
725 } => {
726 let config = EmbeddingJobConfig {
727 db_path,
728 index_path,
729 two_tier,
730 fast_model,
731 quality_model,
732 };
733 let worker_handle = self.worker_handle.lock().clone();
734 match worker_handle {
735 Some(handle) => match handle.submit(config) {
736 Ok(()) => Response::JobSubmitted {
737 job_id: request_id.clone(),
738 message: "embedding job submitted".to_string(),
739 },
740 Err(e) => Response::Error(ErrorResponse {
741 code: ErrorCode::Internal,
742 message: format!("failed to submit job: {e}"),
743 retryable: true,
744 retry_after_ms: Some(1000),
745 }),
746 },
747 None => Response::Error(ErrorResponse {
748 code: ErrorCode::Internal,
749 message: "embedding worker not initialized".to_string(),
750 retryable: true,
751 retry_after_ms: Some(1000),
752 }),
753 }
754 }
755
756 Request::EmbeddingJobStatus { db_path } => {
757 match crate::storage::sqlite::FrankenStorage::open(std::path::Path::new(&db_path)) {
758 Ok(storage) => match storage.get_embedding_jobs(&db_path) {
759 Ok(rows) => {
760 let jobs = rows
761 .into_iter()
762 .map(|r| EmbeddingJobDetail {
763 job_id: r.id,
764 model_id: r.model_id,
765 status: r.status,
766 total_docs: r.total_docs,
767 completed_docs: r.completed_docs,
768 error_message: r.error_message,
769 })
770 .collect();
771 Response::JobStatus(EmbeddingJobInfo { jobs })
772 }
773 Err(e) => Response::Error(ErrorResponse {
774 code: ErrorCode::Internal,
775 message: format!("failed to query jobs: {e}"),
776 retryable: false,
777 retry_after_ms: None,
778 }),
779 },
780 Err(e) => Response::Error(ErrorResponse {
781 code: ErrorCode::Internal,
782 message: format!("failed to open database: {e}"),
783 retryable: false,
784 retry_after_ms: None,
785 }),
786 }
787 }
788
789 Request::CancelEmbeddingJob { db_path, model_id } => {
790 let worker_handle = self.worker_handle.lock().clone();
792 if let Some(handle) = worker_handle
793 && let Err(e) = handle.cancel(db_path.clone(), model_id.clone())
794 {
795 warn!(error = %e, "Failed to send cancel to embedding worker");
796 }
797
798 match crate::storage::sqlite::FrankenStorage::open(std::path::Path::new(&db_path)) {
800 Ok(storage) => {
801 match storage.cancel_embedding_jobs(&db_path, model_id.as_deref()) {
802 Ok(count) => Response::JobCancelled {
803 cancelled: count,
804 message: format!("cancelled {count} job(s)"),
805 },
806 Err(e) => Response::Error(ErrorResponse {
807 code: ErrorCode::Internal,
808 message: format!("failed to cancel jobs: {e}"),
809 retryable: false,
810 retry_after_ms: None,
811 }),
812 }
813 }
814 Err(e) => Response::Error(ErrorResponse {
815 code: ErrorCode::Internal,
816 message: format!("failed to open database: {e}"),
817 retryable: false,
818 retry_after_ms: None,
819 }),
820 }
821 }
822
823 Request::Shutdown => {
824 info!(request_id = %request_id, "Shutdown requested");
825 self.shutdown.store(true, Ordering::SeqCst);
826 Response::Shutdown {
827 message: "daemon shutting down".to_string(),
828 }
829 }
830 }
831 }
832
833 pub fn request_shutdown(&self) {
835 self.shutdown.store(true, Ordering::SeqCst);
836 }
837}
838
#[cfg(test)]
mod tests {
    use super::*;
    use std::path::PathBuf;
    use tempfile::TempDir;

    /// Fixture directory handed to `ModelManager` in these tests.
    fn test_data_dir() -> PathBuf {
        PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("tests/fixtures")
    }

    /// Build a daemon over the test fixtures with `config`.
    fn daemon_with(config: DaemonConfig) -> ModelDaemon {
        ModelDaemon::new(config, ModelManager::new(&test_data_dir()))
    }

    /// Poll `handle` every 25 ms until its thread finishes or `budget` elapses.
    /// Returns true if the thread finished in time.
    fn finished_within<T>(handle: &std::thread::JoinHandle<T>, budget: Duration) -> bool {
        let deadline = Instant::now() + budget;
        while Instant::now() < deadline {
            if handle.is_finished() {
                return true;
            }
            std::thread::sleep(Duration::from_millis(25));
        }
        false
    }

    /// Permission bits of `path`, read without following symlinks.
    fn perm_bits(path: &Path) -> u32 {
        fs::symlink_metadata(path).unwrap().permissions().mode() & 0o777
    }

    #[test]
    fn test_config_defaults() {
        let DaemonConfig {
            max_connections,
            nice_value,
            ionice_class,
            ..
        } = DaemonConfig::default();
        assert_eq!(max_connections, 16);
        assert_eq!(nice_value, 10);
        assert_eq!(ionice_class, 2);
    }

    #[test]
    fn test_daemon_uptime() {
        let daemon = daemon_with(DaemonConfig::default());
        let initial = daemon.uptime_secs();
        std::thread::sleep(Duration::from_millis(50));
        assert!(daemon.uptime_secs() >= initial);
    }

    #[test]
    fn test_activity_tracking() {
        let daemon = daemon_with(DaemonConfig::default());
        let before = *daemon.last_activity.read();
        std::thread::sleep(Duration::from_millis(10));
        daemon.touch_activity();
        let after = *daemon.last_activity.read();
        assert!(after > before);
    }

    #[test]
    fn test_shutdown_flag() {
        let daemon = daemon_with(DaemonConfig::default());
        assert!(!daemon.shutdown.load(Ordering::SeqCst));
        daemon.request_shutdown();
        assert!(daemon.shutdown.load(Ordering::SeqCst));
    }

    #[test]
    fn test_idle_timeout_disabled_by_default() {
        let daemon = daemon_with(DaemonConfig::default());
        assert!(!daemon.should_shutdown_idle());
    }

    #[test]
    fn test_daemon_run_lock_path_is_stable() {
        let socket = PathBuf::from("/tmp/cass-semantic.sock");
        assert_eq!(
            daemon_run_lock_path(&socket),
            PathBuf::from("/tmp/cass-semantic.spawnlock")
        );
    }

    #[test]
    fn test_owner_only_bind_uses_private_runtime_dir_for_public_parent() {
        let temp_dir = TempDir::new().unwrap();
        let public_dir = temp_dir.path().join("public");
        fs::create_dir(&public_dir).unwrap();
        // A world-writable parent forces the private runtime-directory path.
        fs::set_permissions(&public_dir, fs::Permissions::from_mode(0o777)).unwrap();
        let public_socket = public_dir.join("daemon.sock");

        let bound = bind_owner_only_unix_listener(&public_socket).unwrap();
        let listener = bound.listener;
        let public_path = bound.public_path;
        let bind_path = bound.bind_path;

        assert_eq!(public_path, public_socket);
        assert_ne!(bind_path, public_socket);
        assert!(
            fs::symlink_metadata(&public_socket)
                .unwrap()
                .file_type()
                .is_symlink()
        );
        assert_eq!(perm_bits(bind_path.parent().unwrap()), 0o700);
        assert_eq!(perm_bits(&bind_path), 0o600);

        // Connecting through the public symlink must reach the bound listener.
        let acceptor = std::thread::spawn(move || listener.accept().map(|_| ()));
        drop(UnixStream::connect(&public_socket).unwrap());
        acceptor.join().unwrap().unwrap();

        cleanup_bound_socket(&public_path, &bind_path);
    }

    #[test]
    fn handle_connection_returns_promptly_when_shutdown_set_during_idle_read() {
        let daemon = Arc::new(daemon_with(DaemonConfig {
            request_timeout: Duration::from_secs(10),
            ..Default::default()
        }));

        // Keep the client end open but silent so the handler sits in its idle read.
        let (server_side, _client_side) = UnixStream::pair().expect("create socketpair");
        let handler_thread = {
            let daemon = Arc::clone(&daemon);
            std::thread::spawn(move || daemon.handle_connection(server_side))
        };

        std::thread::sleep(Duration::from_millis(100));

        let shutdown_requested_at = Instant::now();
        daemon.request_shutdown();

        let join_budget = Duration::from_secs(3);
        let joined = finished_within(&handler_thread, join_budget);
        assert!(
            joined,
            "handle_connection must observe shutdown within {join_budget:?}; \
             regression suggests the idle read is no longer short-polled"
        );

        let shutdown_latency = shutdown_requested_at.elapsed();
        assert!(
            shutdown_latency < Duration::from_secs(2),
            "shutdown latency {shutdown_latency:?} is too high; short-poll \
             interval is supposed to cap it near IDLE_SHUTDOWN_POLL (~250ms)"
        );

        let result = handler_thread
            .join()
            .expect("handle_connection thread panicked");
        assert!(
            result.is_ok(),
            "handler must return Ok on shutdown-during-idle; got {result:?}"
        );
    }

    #[test]
    fn handle_connection_returns_promptly_when_shutdown_set_during_partial_payload_read() {
        let daemon = Arc::new(daemon_with(DaemonConfig {
            request_timeout: Duration::from_secs(10),
            ..Default::default()
        }));

        // Send only a length prefix so the handler blocks waiting for the payload.
        let (server_side, mut client_side) = UnixStream::pair().expect("create socketpair");
        client_side
            .write_all(&4u32.to_be_bytes())
            .expect("write length prefix only");

        let handler_thread = {
            let daemon = Arc::clone(&daemon);
            std::thread::spawn(move || daemon.handle_connection(server_side))
        };

        std::thread::sleep(Duration::from_millis(100));

        let shutdown_requested_at = Instant::now();
        daemon.request_shutdown();

        let joined = finished_within(&handler_thread, Duration::from_secs(3));
        assert!(
            joined,
            "handle_connection must observe shutdown while waiting for a partial payload"
        );

        let shutdown_latency = shutdown_requested_at.elapsed();
        assert!(
            shutdown_latency < Duration::from_secs(2),
            "partial-payload shutdown latency {shutdown_latency:?} is too high"
        );

        let result = handler_thread
            .join()
            .expect("handle_connection thread panicked");
        assert!(
            result.is_ok(),
            "handler must return Ok on shutdown-during-partial-payload; got {result:?}"
        );
    }
}