Skip to main content

coding_agent_search/daemon/
core.rs

1//! Daemon server core for the semantic model daemon.
2//!
3//! This module provides the server that listens on a Unix Domain Socket
4//! and handles embedding/reranking requests using loaded models.
5
6use std::ffi::OsString;
7use std::fs::{self, DirBuilder};
8use std::io::{Read, Write};
9use std::os::unix::fs::{DirBuilderExt, FileTypeExt, PermissionsExt};
10use std::os::unix::net::{UnixListener, UnixStream};
11use std::path::{Path, PathBuf};
12use std::sync::Arc;
13use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
14use std::time::{Duration, Instant};
15
16use fs2::FileExt;
17use parking_lot::RwLock;
18use tracing::{debug, error, info, warn};
19
20use super::daemon_run_lock_path;
21use super::models::ModelManager;
22use super::protocol::{
23    EmbedResponse, EmbeddingJobDetail, EmbeddingJobInfo, ErrorCode, ErrorResponse, FramedMessage,
24    HealthStatus, ModelInfo, PROTOCOL_VERSION, Request, RerankResponse, Response, StatusResponse,
25    decode_message, default_socket_path, encode_message,
26};
27use super::resource::ResourceMonitor;
28use super::worker::{EmbeddingJobConfig, EmbeddingWorker, EmbeddingWorkerHandle};
29
/// Outcome of binding the daemon's Unix socket.
///
/// `public_path` is the socket path that was requested. `bind_path` is where the listener
/// really lives; it differs from `public_path` only when the socket had to be placed in a
/// private runtime directory and published through a symlink.
struct BoundDaemonSocket {
    /// Listener bound at `bind_path`.
    listener: UnixListener,
    /// Socket path as requested by the caller.
    public_path: PathBuf,
    /// Filesystem path the listener is actually bound to.
    bind_path: PathBuf,
}
35
/// Create `path` (and any missing ancestors) with mode `0o700` and make sure the final
/// component ends up as a real, owner-only directory.
///
/// # Errors
/// Returns any I/O error from creating or inspecting the path, and `AlreadyExists` when the
/// path exists but is something other than a directory (a symlink included, because the
/// metadata is read without following links).
fn create_owner_only_dir_all(path: &Path) -> std::io::Result<()> {
    const OWNER_ONLY: u32 = 0o700;

    DirBuilder::new()
        .recursive(true)
        .mode(OWNER_ONLY)
        .create(path)?;

    // Inspect the path itself rather than its target: in a shared parent such as /tmp an
    // attacker could plant a symlink that the recursive create above silently accepts.
    let metadata = fs::symlink_metadata(path)?;
    if metadata.file_type().is_dir() {
        // Directories we just created already carry the right mode, so the chmod (and the
        // TOCTOU window it implies) only happens for pre-existing, looser directories.
        let current_mode = metadata.permissions().mode() & 0o777;
        if current_mode != OWNER_ONLY {
            fs::set_permissions(path, fs::Permissions::from_mode(OWNER_ONLY))?;
        }
        return Ok(());
    }

    Err(std::io::Error::new(
        std::io::ErrorKind::AlreadyExists,
        format!(
            "path exists but is not a regular directory: {}",
            path.display()
        ),
    ))
}
63
/// Report whether the directory that contains `path` is closed to group and others.
///
/// A bare relative file name such as `daemon.sock` has an *empty* parent component, which
/// denotes the current working directory; `.` is inspected in that case instead of the empty
/// path (which cannot be stat'ed).
///
/// # Returns
/// `Ok(true)` when the parent's mode has no group/other permission bits, `Ok(false)` when it
/// has some, or when `path` has no parent at all (for example `/`).
///
/// # Errors
/// Propagates the failure of reading the parent's metadata, and returns `InvalidInput` when
/// the parent is not a directory. The metadata is read without following symlinks, so a
/// symlinked parent is reported as "not a directory".
fn parent_dir_is_owner_only(path: &Path) -> std::io::Result<bool> {
    let parent = match path.parent() {
        None => return Ok(false),
        Some(dir) if dir.as_os_str().is_empty() => Path::new("."),
        Some(dir) => dir,
    };

    let metadata = fs::symlink_metadata(parent)?;
    if !metadata.file_type().is_dir() {
        return Err(std::io::Error::new(
            std::io::ErrorKind::InvalidInput,
            format!("socket parent is not a directory: {}", parent.display()),
        ));
    }

    // 0o077 masks every group and other permission bit.
    Ok(metadata.permissions().mode() & 0o077 == 0)
}
79
/// Derive the hidden sibling directory used to host the real socket when the requested
/// location is not private: `<parent>/.<socket file name>.runtime`.
///
/// # Errors
/// Returns `InvalidInput` when `socket_path` has no final file-name component.
fn private_runtime_dir_for_socket(socket_path: &Path) -> std::io::Result<PathBuf> {
    let Some(socket_name) = socket_path.file_name() else {
        return Err(std::io::Error::new(
            std::io::ErrorKind::InvalidInput,
            format!("socket path has no file name: {}", socket_path.display()),
        ));
    };

    let mut dir_name = OsString::new();
    for piece in [".".as_ref(), socket_name, ".runtime".as_ref()] {
        dir_name.push::<&std::ffi::OsStr>(piece);
    }

    let base = match socket_path.parent() {
        Some(dir) => dir,
        None => Path::new("."),
    };
    Ok(base.join(dir_name))
}
94
/// Delete a leftover socket (or symlink) at `path`, refusing to touch anything else.
///
/// A missing path is not an error. The entry is examined without following symlinks, so a
/// symlink is removed itself and its target is left alone.
///
/// # Errors
/// Returns `AlreadyExists` when `path` is neither a socket nor a symlink, and otherwise any
/// I/O error from inspecting or removing the entry.
fn remove_stale_socket_path(path: &Path) -> std::io::Result<()> {
    let metadata = match fs::symlink_metadata(path) {
        Ok(metadata) => metadata,
        Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(()),
        Err(err) => return Err(err),
    };

    let kind = metadata.file_type();
    let removable = kind.is_socket() || kind.is_symlink();
    if !removable {
        return Err(std::io::Error::new(
            std::io::ErrorKind::AlreadyExists,
            format!(
                "refusing to remove non-socket daemon path: {}",
                path.display()
            ),
        ));
    }

    fs::remove_file(path)
}
115
116fn bind_owner_only_unix_listener(socket_path: &Path) -> std::io::Result<BoundDaemonSocket> {
117    if let Some(parent) = socket_path.parent()
118        && !parent.exists()
119    {
120        create_owner_only_dir_all(parent)?;
121    }
122
123    let bind_path = if parent_dir_is_owner_only(socket_path)? {
124        socket_path.to_path_buf()
125    } else {
126        let runtime_dir = private_runtime_dir_for_socket(socket_path)?;
127        create_owner_only_dir_all(&runtime_dir)?;
128        runtime_dir.join("daemon.sock")
129    };
130
131    remove_stale_socket_path(&bind_path)?;
132    if bind_path != socket_path {
133        remove_stale_socket_path(socket_path)?;
134    }
135
136    let listener = UnixListener::bind(&bind_path)?;
137    fs::set_permissions(&bind_path, fs::Permissions::from_mode(0o600))?;
138
139    if bind_path != socket_path {
140        std::os::unix::fs::symlink(&bind_path, socket_path)?;
141    }
142
143    Ok(BoundDaemonSocket {
144        listener,
145        public_path: socket_path.to_path_buf(),
146        bind_path,
147    })
148}
149
150fn cleanup_bound_socket(public_path: &Path, bind_path: &Path) {
151    let _ = remove_stale_socket_path(public_path);
152    if bind_path != public_path {
153        let _ = remove_stale_socket_path(bind_path);
154    }
155}
156
/// Tunables for the daemon server.
///
/// Build it with [`Default`] for the stock values, or with `DaemonConfig::from_env` to let
/// `CASS_DAEMON_*` environment variables override them.
#[derive(Debug, Clone)]
pub struct DaemonConfig {
    /// Filesystem path of the Unix socket clients connect to.
    pub socket_path: PathBuf,
    /// Upper bound on simultaneously served connections; connections accepted beyond it are
    /// dropped immediately.
    pub max_connections: usize,
    /// Per-connection request timeout, used both as the socket write timeout and as the
    /// budget for waiting on request bytes.
    pub request_timeout: Duration,
    /// How long the daemon may sit without activity before it stops itself
    /// (zero disables idle shutdown).
    pub idle_timeout: Duration,
    /// Memory ceiling in bytes; exceeding it stops the daemon (zero means unlimited).
    pub memory_limit: u64,
    /// Process nice value (-20 to 19).
    pub nice_value: i32,
    /// I/O priority class (0-3).
    pub ionice_class: u32,
}
175
176impl Default for DaemonConfig {
177    fn default() -> Self {
178        Self {
179            socket_path: default_socket_path(),
180            max_connections: 16,
181            request_timeout: Duration::from_secs(60),
182            idle_timeout: Duration::from_secs(0), // Never shutdown by default
183            memory_limit: 0,                      // Unlimited
184            nice_value: 10,                       // Low priority
185            ionice_class: 2,                      // Best-effort
186        }
187    }
188}
189
190impl DaemonConfig {
191    /// Load config from environment variables.
192    pub fn from_env() -> Self {
193        let mut cfg = Self::default();
194
195        if let Ok(path) = dotenvy::var("CASS_DAEMON_SOCKET") {
196            cfg.socket_path = PathBuf::from(path);
197        }
198
199        if let Ok(val) = dotenvy::var("CASS_DAEMON_MAX_CONNECTIONS")
200            && let Ok(n) = val.parse()
201        {
202            cfg.max_connections = n;
203        }
204
205        if let Ok(val) = dotenvy::var("CASS_DAEMON_REQUEST_TIMEOUT_SECS")
206            && let Ok(secs) = val.parse()
207        {
208            cfg.request_timeout = Duration::from_secs(secs);
209        }
210
211        if let Ok(val) = dotenvy::var("CASS_DAEMON_IDLE_TIMEOUT_SECS")
212            && let Ok(secs) = val.parse()
213        {
214            cfg.idle_timeout = Duration::from_secs(secs);
215        }
216
217        if let Ok(val) = dotenvy::var("CASS_DAEMON_MEMORY_LIMIT")
218            && let Ok(bytes) = val.parse()
219        {
220            cfg.memory_limit = bytes;
221        }
222
223        if let Ok(val) = dotenvy::var("CASS_DAEMON_NICE")
224            && let Ok(n) = val.parse()
225        {
226            cfg.nice_value = n;
227        }
228
229        if let Ok(val) = dotenvy::var("CASS_DAEMON_IONICE_CLASS")
230            && let Ok(n) = val.parse()
231        {
232            cfg.ionice_class = n;
233        }
234
235        cfg
236    }
237}
238
239/// Daemon server state.
240pub struct ModelDaemon {
241    config: DaemonConfig,
242    models: Arc<ModelManager>,
243    resources: ResourceMonitor,
244    start_time: Instant,
245    total_requests: AtomicU64,
246    active_connections: AtomicU64,
247    shutdown: AtomicBool,
248    last_activity: RwLock<Instant>,
249    worker_handle: parking_lot::Mutex<Option<EmbeddingWorkerHandle>>,
250}
251
252impl ModelDaemon {
253    /// Create a new daemon with the given configuration.
254    pub fn new(config: DaemonConfig, models: ModelManager) -> Self {
255        Self {
256            config,
257            models: Arc::new(models),
258            resources: ResourceMonitor::new(),
259            start_time: Instant::now(),
260            total_requests: AtomicU64::new(0),
261            active_connections: AtomicU64::new(0),
262            shutdown: AtomicBool::new(false),
263            last_activity: RwLock::new(Instant::now()),
264            worker_handle: parking_lot::Mutex::new(None),
265        }
266    }
267
268    /// Create daemon with default config and models from data directory.
269    pub fn with_defaults(data_dir: &Path) -> Self {
270        let config = DaemonConfig::from_env();
271        let models = ModelManager::new(data_dir);
272        Self::new(config, models)
273    }
274
275    /// Get current uptime in seconds.
276    pub fn uptime_secs(&self) -> u64 {
277        self.start_time.elapsed().as_secs()
278    }
279
280    /// Check if daemon should shutdown due to idle timeout.
281    fn should_shutdown_idle(&self) -> bool {
282        if self.config.idle_timeout.is_zero() {
283            return false;
284        }
285        let last = *self.last_activity.read();
286        last.elapsed() > self.config.idle_timeout
287    }
288
289    /// Update last activity timestamp.
290    fn touch_activity(&self) {
291        *self.last_activity.write() = Instant::now();
292    }
293
294    /// Check whether configured memory limit is exceeded.
295    fn memory_limit_exceeded(&self) -> bool {
296        if self.config.memory_limit == 0 {
297            return false;
298        }
299        let memory_bytes = self.resources.memory_usage();
300        memory_bytes > self.config.memory_limit
301    }
302
303    /// Initialize the background embedding worker thread.
304    fn init_worker(&self) {
305        let (worker, handle) = EmbeddingWorker::new();
306        match std::thread::Builder::new()
307            .name("embedding-worker".into())
308            .spawn(move || worker.run())
309        {
310            Ok(_) => {
311                *self.worker_handle.lock() = Some(handle);
312                info!("Embedding worker initialized");
313            }
314            Err(e) => {
315                error!(
316                    error = %e,
317                    "Failed to spawn embedding worker - background jobs will be unavailable"
318                );
319                // Continue without worker - daemon can still handle other requests
320            }
321        }
322    }
323
324    /// Start the daemon server.
325    pub fn run(&self) -> std::io::Result<()> {
326        // Use a file lock to ensure only one daemon instance runs for this socket path
327        let lock_path = daemon_run_lock_path(&self.config.socket_path);
328
329        let lock_file = match std::fs::OpenOptions::new()
330            .read(true)
331            .write(true)
332            .create_new(true)
333            .open(&lock_path)
334        {
335            Ok(file) => file,
336            Err(e) if e.kind() == std::io::ErrorKind::AlreadyExists => {
337                // Prevent symlink attacks by refusing to open symlinks.
338                // TOCTOU window exists here but is significantly reduced.
339                if std::fs::symlink_metadata(&lock_path)?
340                    .file_type()
341                    .is_symlink()
342                {
343                    return Err(std::io::Error::new(
344                        std::io::ErrorKind::AlreadyExists,
345                        "refusing to open a symlink lock file",
346                    ));
347                }
348                std::fs::OpenOptions::new()
349                    .read(true)
350                    .write(true)
351                    .open(&lock_path)?
352            }
353            Err(e) => return Err(e),
354        };
355
356        // Acquire exclusive lock (non-blocking to fail fast if another daemon is already running)
357        if lock_file.try_lock_exclusive().is_err() {
358            warn!(
359                socket = %self.config.socket_path.display(),
360                "Another daemon is already running for this socket path"
361            );
362            return Err(std::io::Error::new(
363                std::io::ErrorKind::AddrInUse,
364                "Another daemon is already running",
365            ));
366        }
367
368        // Apply resource limits
369        if !self.resources.apply_nice(self.config.nice_value) {
370            warn!(
371                nice = self.config.nice_value,
372                "Failed to apply configured daemon nice value"
373            );
374        }
375        if !self.resources.apply_ionice(self.config.ionice_class) {
376            warn!(
377                ionice_class = self.config.ionice_class,
378                "Failed to apply configured daemon ionice class"
379            );
380        }
381
382        let BoundDaemonSocket {
383            listener,
384            public_path,
385            bind_path,
386        } = bind_owner_only_unix_listener(&self.config.socket_path)?;
387        listener.set_nonblocking(true)?;
388
389        info!(
390            socket = %self.config.socket_path.display(),
391            bound_socket = %bind_path.display(),
392            max_connections = self.config.max_connections,
393            "Daemon listening"
394        );
395
396        // Pre-warm models if available
397        info!("Pre-warming models...");
398        if let Err(e) = self.models.warm_embedder() {
399            warn!(error = %e, "Failed to pre-warm embedder");
400        }
401        if let Err(e) = self.models.warm_reranker() {
402            warn!(error = %e, "Failed to pre-warm reranker");
403        }
404        info!("Model pre-warming complete");
405
406        // Start background embedding worker
407        self.init_worker();
408
409        std::thread::scope(|s| {
410            loop {
411                // Check for shutdown
412                if self.shutdown.load(Ordering::SeqCst) {
413                    info!("Shutdown requested, stopping daemon");
414                    break;
415                }
416
417                // Check for idle shutdown
418                if self.should_shutdown_idle() {
419                    info!(
420                        idle_secs = self.config.idle_timeout.as_secs(),
421                        "Idle timeout reached, shutting down"
422                    );
423                    break;
424                }
425
426                // Enforce configured memory limit when enabled.
427                if self.memory_limit_exceeded() {
428                    let memory_bytes = self.resources.memory_usage();
429                    error!(
430                        memory_bytes = memory_bytes,
431                        memory_limit = self.config.memory_limit,
432                        "Daemon memory limit exceeded, shutting down"
433                    );
434                    break;
435                }
436
437                // Accept new connections
438                match listener.accept() {
439                    Ok((stream, _addr)) => {
440                        let active = self.active_connections.fetch_add(1, Ordering::SeqCst);
441                        if active >= self.config.max_connections as u64 {
442                            self.active_connections.fetch_sub(1, Ordering::SeqCst);
443                            warn!(
444                                active = active,
445                                max = self.config.max_connections,
446                                "Max connections reached, rejecting"
447                            );
448                            continue;
449                        }
450
451                        self.touch_activity();
452                        s.spawn(move || {
453                            if let Err(e) = self.handle_connection(stream) {
454                                debug!(error = %e, "Connection error");
455                            }
456                            self.active_connections.fetch_sub(1, Ordering::SeqCst);
457                        });
458                    }
459                    Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
460                        // No pending connections, sleep briefly
461                        std::thread::sleep(Duration::from_millis(10));
462                    }
463                    Err(e) => {
464                        error!(error = %e, "Accept error");
465                        std::thread::sleep(Duration::from_millis(100));
466                    }
467                }
468            }
469        });
470
471        // Shutdown embedding worker
472        let worker_handle = self.worker_handle.lock().take();
473        if let Some(handle) = worker_handle
474            && let Err(e) = handle.shutdown()
475        {
476            warn!(error = %e, "Failed to send shutdown to embedding worker");
477        }
478
479        // Cleanup
480        cleanup_bound_socket(&public_path, &bind_path);
481
482        info!("Daemon stopped");
483        Ok(())
484    }
485
486    fn read_frame_bytes_with_shutdown(
487        &self,
488        stream: &mut UnixStream,
489        buf: &mut [u8],
490        poll_timeout: Duration,
491        request_timeout: Duration,
492        reset_timeout_on_progress: bool,
493    ) -> std::io::Result<bool> {
494        if buf.is_empty() {
495            return Ok(true);
496        }
497
498        stream.set_read_timeout(Some(poll_timeout))?;
499        let started_at = Instant::now();
500        let mut last_progress_at = started_at;
501        let mut filled = 0usize;
502
503        loop {
504            if self.shutdown.load(Ordering::SeqCst) {
505                debug!("Shutdown requested, closing connection read");
506                return Ok(false);
507            }
508
509            match stream.read(&mut buf[filled..]) {
510                Ok(0) => {
511                    debug!("Client disconnected");
512                    return Ok(false);
513                }
514                Ok(n) => {
515                    filled += n;
516                    last_progress_at = Instant::now();
517                    if filled == buf.len() {
518                        return Ok(true);
519                    }
520                }
521                Err(e)
522                    if matches!(
523                        e.kind(),
524                        std::io::ErrorKind::WouldBlock | std::io::ErrorKind::TimedOut
525                    ) =>
526                {
527                    let timeout_started_at = if reset_timeout_on_progress {
528                        last_progress_at
529                    } else {
530                        started_at
531                    };
532                    if timeout_started_at.elapsed() >= request_timeout {
533                        debug!("Connection timed out");
534                        return Ok(false);
535                    }
536                }
537                Err(e) if e.kind() == std::io::ErrorKind::Interrupted => continue,
538                Err(e) => return Err(e),
539            }
540        }
541    }
542
543    /// Handle a single client connection.
544    fn handle_connection(&self, mut stream: UnixStream) -> std::io::Result<()> {
545        // Bounded idle-poll interval so `std::thread::scope` shutdown does
546        // not stall behind a client that opened the socket and never sent
547        // bytes. The configured `request_timeout` still bounds the total
548        // idle wait; this just breaks the single long blocking read into
549        // short chunks and checks `self.shutdown` between them.
550        const IDLE_SHUTDOWN_POLL: Duration = Duration::from_millis(250);
551        let request_timeout = self.config.request_timeout;
552        let idle_poll = IDLE_SHUTDOWN_POLL.min(request_timeout);
553        stream.set_write_timeout(Some(request_timeout))?;
554
555        loop {
556            // Idle read (length prefix): short-poll so shutdown cancels
557            // promptly. Track `filled` manually because `read_exact`
558            // discards partial bytes on timeout.
559            let mut len_buf = [0u8; 4];
560            if !self.read_frame_bytes_with_shutdown(
561                &mut stream,
562                &mut len_buf,
563                idle_poll,
564                request_timeout,
565                false,
566            )? {
567                return Ok(());
568            }
569
570            let len = u32::from_be_bytes(len_buf) as usize;
571            if len > 10 * 1024 * 1024 {
572                warn!(
573                    len = len,
574                    "Request too large (max 10MB), closing connection"
575                );
576                return Ok(());
577            }
578
579            // Payload read: bytes are in flight, so keep the timeout as an
580            // idle-progress budget while still short-polling shutdown.
581            let mut payload = vec![0u8; len];
582            if !self.read_frame_bytes_with_shutdown(
583                &mut stream,
584                &mut payload,
585                idle_poll,
586                request_timeout,
587                true,
588            )? {
589                return Ok(());
590            }
591
592            // Decode and handle request
593            let response = match decode_message::<Request>(&payload) {
594                Ok(msg) => {
595                    self.total_requests.fetch_add(1, Ordering::Relaxed);
596                    self.touch_activity();
597                    let response = self.handle_request(msg.request_id.clone(), msg.payload);
598                    FramedMessage::new(msg.request_id, response)
599                }
600                Err(e) => {
601                    warn!(error = %e, "Failed to decode request");
602                    FramedMessage::new(
603                        "error",
604                        Response::Error(ErrorResponse {
605                            code: ErrorCode::InvalidInput,
606                            message: format!("decode error: {}", e),
607                            retryable: false,
608                            retry_after_ms: None,
609                        }),
610                    )
611                }
612            };
613
614            // Send response
615            let encoded =
616                encode_message(&response).map_err(|e| std::io::Error::other(e.to_string()))?;
617            stream.write_all(&encoded)?;
618
619            // Check if this was a shutdown request
620            if matches!(response.payload, Response::Shutdown { .. }) {
621                return Ok(());
622            }
623        }
624    }
625
626    /// Handle a single request.
627    fn handle_request(&self, request_id: String, request: Request) -> Response {
628        let start = Instant::now();
629
630        match request {
631            Request::Health => Response::Health(HealthStatus {
632                uptime_secs: self.uptime_secs(),
633                version: PROTOCOL_VERSION,
634                ready: self.models.is_ready(),
635                memory_bytes: self.resources.memory_usage(),
636            }),
637
638            Request::Embed {
639                texts,
640                model,
641                dims: _,
642            } => {
643                debug!(
644                    request_id = %request_id,
645                    batch_size = texts.len(),
646                    model = %model,
647                    "Processing embed request"
648                );
649
650                match self.models.embed_batch(&texts) {
651                    Ok(embeddings) => Response::Embed(EmbedResponse {
652                        embeddings,
653                        model: self.models.embedder_id().to_string(),
654                        elapsed_ms: start.elapsed().as_millis() as u64,
655                    }),
656                    Err(e) => Response::Error(ErrorResponse {
657                        code: ErrorCode::ModelLoadFailed,
658                        message: e.to_string(),
659                        retryable: true,
660                        retry_after_ms: Some(1000),
661                    }),
662                }
663            }
664
665            Request::Rerank {
666                query,
667                documents,
668                model,
669            } => {
670                debug!(
671                    request_id = %request_id,
672                    doc_count = documents.len(),
673                    model = %model,
674                    "Processing rerank request"
675                );
676
677                match self.models.rerank(&query, &documents) {
678                    Ok(scores) => Response::Rerank(RerankResponse {
679                        scores,
680                        model: self.models.reranker_id().to_string(),
681                        elapsed_ms: start.elapsed().as_millis() as u64,
682                    }),
683                    Err(e) => Response::Error(ErrorResponse {
684                        code: ErrorCode::ModelLoadFailed,
685                        message: e.to_string(),
686                        retryable: true,
687                        retry_after_ms: Some(1000),
688                    }),
689                }
690            }
691
692            Request::Status => {
693                let embedder_info = ModelInfo {
694                    id: self.models.embedder_id().to_string(),
695                    name: self.models.embedder_name().to_string(),
696                    dimension: Some(self.models.embedder_dimension()),
697                    loaded: self.models.embedder_loaded(),
698                    memory_bytes: 0, // Would need model-specific tracking
699                };
700
701                let reranker_info = ModelInfo {
702                    id: self.models.reranker_id().to_string(),
703                    name: self.models.reranker_name().to_string(),
704                    dimension: None,
705                    loaded: self.models.reranker_loaded(),
706                    memory_bytes: 0,
707                };
708
709                Response::Status(StatusResponse {
710                    uptime_secs: self.uptime_secs(),
711                    version: PROTOCOL_VERSION,
712                    embedders: vec![embedder_info],
713                    rerankers: vec![reranker_info],
714                    memory_bytes: self.resources.memory_usage(),
715                    total_requests: self.total_requests.load(Ordering::Relaxed),
716                })
717            }
718
719            Request::SubmitEmbeddingJob {
720                db_path,
721                index_path,
722                two_tier,
723                fast_model,
724                quality_model,
725            } => {
726                let config = EmbeddingJobConfig {
727                    db_path,
728                    index_path,
729                    two_tier,
730                    fast_model,
731                    quality_model,
732                };
733                let worker_handle = self.worker_handle.lock().clone();
734                match worker_handle {
735                    Some(handle) => match handle.submit(config) {
736                        Ok(()) => Response::JobSubmitted {
737                            job_id: request_id.clone(),
738                            message: "embedding job submitted".to_string(),
739                        },
740                        Err(e) => Response::Error(ErrorResponse {
741                            code: ErrorCode::Internal,
742                            message: format!("failed to submit job: {e}"),
743                            retryable: true,
744                            retry_after_ms: Some(1000),
745                        }),
746                    },
747                    None => Response::Error(ErrorResponse {
748                        code: ErrorCode::Internal,
749                        message: "embedding worker not initialized".to_string(),
750                        retryable: true,
751                        retry_after_ms: Some(1000),
752                    }),
753                }
754            }
755
756            Request::EmbeddingJobStatus { db_path } => {
757                match crate::storage::sqlite::FrankenStorage::open(std::path::Path::new(&db_path)) {
758                    Ok(storage) => match storage.get_embedding_jobs(&db_path) {
759                        Ok(rows) => {
760                            let jobs = rows
761                                .into_iter()
762                                .map(|r| EmbeddingJobDetail {
763                                    job_id: r.id,
764                                    model_id: r.model_id,
765                                    status: r.status,
766                                    total_docs: r.total_docs,
767                                    completed_docs: r.completed_docs,
768                                    error_message: r.error_message,
769                                })
770                                .collect();
771                            Response::JobStatus(EmbeddingJobInfo { jobs })
772                        }
773                        Err(e) => Response::Error(ErrorResponse {
774                            code: ErrorCode::Internal,
775                            message: format!("failed to query jobs: {e}"),
776                            retryable: false,
777                            retry_after_ms: None,
778                        }),
779                    },
780                    Err(e) => Response::Error(ErrorResponse {
781                        code: ErrorCode::Internal,
782                        message: format!("failed to open database: {e}"),
783                        retryable: false,
784                        retry_after_ms: None,
785                    }),
786                }
787            }
788
789            Request::CancelEmbeddingJob { db_path, model_id } => {
790                // Send cancel to worker
791                let worker_handle = self.worker_handle.lock().clone();
792                if let Some(handle) = worker_handle
793                    && let Err(e) = handle.cancel(db_path.clone(), model_id.clone())
794                {
795                    warn!(error = %e, "Failed to send cancel to embedding worker");
796                }
797
798                // Also cancel in database
799                match crate::storage::sqlite::FrankenStorage::open(std::path::Path::new(&db_path)) {
800                    Ok(storage) => {
801                        match storage.cancel_embedding_jobs(&db_path, model_id.as_deref()) {
802                            Ok(count) => Response::JobCancelled {
803                                cancelled: count,
804                                message: format!("cancelled {count} job(s)"),
805                            },
806                            Err(e) => Response::Error(ErrorResponse {
807                                code: ErrorCode::Internal,
808                                message: format!("failed to cancel jobs: {e}"),
809                                retryable: false,
810                                retry_after_ms: None,
811                            }),
812                        }
813                    }
814                    Err(e) => Response::Error(ErrorResponse {
815                        code: ErrorCode::Internal,
816                        message: format!("failed to open database: {e}"),
817                        retryable: false,
818                        retry_after_ms: None,
819                    }),
820                }
821            }
822
823            Request::Shutdown => {
824                info!(request_id = %request_id, "Shutdown requested");
825                self.shutdown.store(true, Ordering::SeqCst);
826                Response::Shutdown {
827                    message: "daemon shutting down".to_string(),
828                }
829            }
830        }
831    }
832
833    /// Request the daemon to shutdown.
834    pub fn request_shutdown(&self) {
835        self.shutdown.store(true, Ordering::SeqCst);
836    }
837}
838
839#[cfg(test)]
840mod tests {
841    use super::*;
842    use std::path::PathBuf;
843    use tempfile::TempDir;
844
845    fn test_data_dir() -> PathBuf {
846        PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("tests/fixtures")
847    }
848
849    #[test]
850    fn test_config_defaults() {
851        let config = DaemonConfig::default();
852        assert_eq!(config.max_connections, 16);
853        assert_eq!(config.nice_value, 10);
854        assert_eq!(config.ionice_class, 2);
855    }
856
857    #[test]
858    fn test_daemon_uptime() {
859        let config = DaemonConfig::default();
860        let models = ModelManager::new(&test_data_dir());
861        let daemon = ModelDaemon::new(config, models);
862
863        // Uptime should be 0 or 1 second initially
864        let initial = daemon.uptime_secs();
865        std::thread::sleep(Duration::from_millis(50));
866        let after = daemon.uptime_secs();
867        // Uptime should not decrease
868        assert!(after >= initial);
869    }
870
871    #[test]
872    fn test_activity_tracking() {
873        let config = DaemonConfig::default();
874        let models = ModelManager::new(&test_data_dir());
875        let daemon = ModelDaemon::new(config, models);
876
877        let before = *daemon.last_activity.read();
878        std::thread::sleep(Duration::from_millis(10));
879        daemon.touch_activity();
880        let after = *daemon.last_activity.read();
881
882        assert!(after > before);
883    }
884
885    #[test]
886    fn test_shutdown_flag() {
887        let config = DaemonConfig::default();
888        let models = ModelManager::new(&test_data_dir());
889        let daemon = ModelDaemon::new(config, models);
890
891        assert!(!daemon.shutdown.load(Ordering::SeqCst));
892        daemon.request_shutdown();
893        assert!(daemon.shutdown.load(Ordering::SeqCst));
894    }
895
896    #[test]
897    fn test_idle_timeout_disabled_by_default() {
898        let config = DaemonConfig::default();
899        let models = ModelManager::new(&test_data_dir());
900        let daemon = ModelDaemon::new(config, models);
901
902        // With idle_timeout = 0, should never trigger idle shutdown
903        assert!(!daemon.should_shutdown_idle());
904    }
905
906    #[test]
907    fn test_daemon_run_lock_path_is_stable() {
908        let socket = PathBuf::from("/tmp/cass-semantic.sock");
909        assert_eq!(
910            daemon_run_lock_path(&socket),
911            PathBuf::from("/tmp/cass-semantic.spawnlock")
912        );
913    }
914
915    #[test]
916    fn test_owner_only_bind_uses_private_runtime_dir_for_public_parent() {
917        let temp_dir = TempDir::new().unwrap();
918        let public_dir = temp_dir.path().join("public");
919        fs::create_dir(&public_dir).unwrap();
920        fs::set_permissions(&public_dir, fs::Permissions::from_mode(0o777)).unwrap();
921        let public_socket = public_dir.join("daemon.sock");
922
923        let BoundDaemonSocket {
924            listener,
925            public_path,
926            bind_path,
927        } = bind_owner_only_unix_listener(&public_socket).unwrap();
928
929        assert_eq!(public_path, public_socket);
930        assert_ne!(bind_path, public_socket);
931        assert!(
932            fs::symlink_metadata(&public_socket)
933                .unwrap()
934                .file_type()
935                .is_symlink()
936        );
937
938        let runtime_dir = bind_path.parent().unwrap();
939        assert_eq!(
940            fs::symlink_metadata(runtime_dir)
941                .unwrap()
942                .permissions()
943                .mode()
944                & 0o777,
945            0o700
946        );
947        assert_eq!(
948            fs::symlink_metadata(&bind_path)
949                .unwrap()
950                .permissions()
951                .mode()
952                & 0o777,
953            0o600
954        );
955
956        let accept_thread = std::thread::spawn(move || listener.accept().map(|_| ()));
957        let client = UnixStream::connect(&public_socket).unwrap();
958        drop(client);
959        accept_thread.join().unwrap().unwrap();
960
961        cleanup_bound_socket(&public_path, &bind_path);
962    }
963
964    /// `coding_agent_session_search-a5z57`: before the short-poll fix,
965    /// an idle client holding a connection open without sending bytes
966    /// would pin `handle_connection` inside `read_exact` for the full
967    /// `request_timeout` — 60s in the default config. Because the
968    /// connection handlers run inside `std::thread::scope` in
969    /// `ModelDaemon::run`, shutdown could not complete until every
970    /// such handler bled out its idle read, so a single idle peer
971    /// made `systemctl stop` / SIGTERM feel like a 60-second hang.
972    ///
973    /// This test pins the fix contract: with `request_timeout` set to
974    /// a value much larger than the handler's effective shutdown
975    /// latency, setting `self.shutdown` must cause an idle handler to
976    /// return promptly (well under the configured timeout).
977    #[test]
978    fn handle_connection_returns_promptly_when_shutdown_set_during_idle_read() {
979        use std::os::unix::net::UnixStream;
980        use std::sync::Arc;
981        use std::time::Instant;
982
983        // 10s request_timeout is plenty big to catch a regression: if
984        // the handler falls back to the old single-blocking-read path,
985        // shutdown latency would be ~10s, not the sub-second target
986        // asserted below.
987        let config = DaemonConfig {
988            request_timeout: Duration::from_secs(10),
989            ..Default::default()
990        };
991        let models = ModelManager::new(&test_data_dir());
992        let daemon = Arc::new(ModelDaemon::new(config, models));
993
994        let (server_side, _client_side) = UnixStream::pair().expect("create socketpair");
995
996        // Drive handle_connection on the server side in a worker thread;
997        // client side stays open but sends nothing, emulating the idle
998        // peer that used to block shutdown.
999        let handler_daemon = Arc::clone(&daemon);
1000        let handler_thread =
1001            std::thread::spawn(move || handler_daemon.handle_connection(server_side));
1002
1003        // Let the handler settle into its idle read loop before
1004        // requesting shutdown (the first read poll arms at 250ms).
1005        std::thread::sleep(Duration::from_millis(100));
1006
1007        let shutdown_requested_at = Instant::now();
1008        daemon.request_shutdown();
1009
1010        // Join with a generous safety bound that is still well below
1011        // the 10s request_timeout — a regression to the old behavior
1012        // would exceed this.
1013        let join_budget = Duration::from_secs(3);
1014        let join_deadline = Instant::now() + join_budget;
1015        let mut joined = false;
1016        while Instant::now() < join_deadline {
1017            if handler_thread.is_finished() {
1018                joined = true;
1019                break;
1020            }
1021            std::thread::sleep(Duration::from_millis(25));
1022        }
1023
1024        assert!(
1025            joined,
1026            "handle_connection must observe shutdown within {join_budget:?}; \
1027             regression suggests the idle read is no longer short-polled"
1028        );
1029        let shutdown_latency = shutdown_requested_at.elapsed();
1030        assert!(
1031            shutdown_latency < Duration::from_secs(2),
1032            "shutdown latency {shutdown_latency:?} is too high; short-poll \
1033             interval is supposed to cap it near IDLE_SHUTDOWN_POLL (~250ms)"
1034        );
1035        let result = handler_thread
1036            .join()
1037            .expect("handle_connection thread panicked");
1038        assert!(
1039            result.is_ok(),
1040            "handler must return Ok on shutdown-during-idle; got {result:?}"
1041        );
1042    }
1043
1044    #[test]
1045    fn handle_connection_returns_promptly_when_shutdown_set_during_partial_payload_read() {
1046        use std::io::Write;
1047        use std::os::unix::net::UnixStream;
1048        use std::sync::Arc;
1049        use std::time::Instant;
1050
1051        let config = DaemonConfig {
1052            request_timeout: Duration::from_secs(10),
1053            ..Default::default()
1054        };
1055        let models = ModelManager::new(&test_data_dir());
1056        let daemon = Arc::new(ModelDaemon::new(config, models));
1057
1058        let (server_side, mut client_side) = UnixStream::pair().expect("create socketpair");
1059        client_side
1060            .write_all(&4u32.to_be_bytes())
1061            .expect("write length prefix only");
1062
1063        let handler_daemon = Arc::clone(&daemon);
1064        let handler_thread =
1065            std::thread::spawn(move || handler_daemon.handle_connection(server_side));
1066
1067        std::thread::sleep(Duration::from_millis(100));
1068
1069        let shutdown_requested_at = Instant::now();
1070        daemon.request_shutdown();
1071
1072        let join_budget = Duration::from_secs(3);
1073        let join_deadline = Instant::now() + join_budget;
1074        let mut joined = false;
1075        while Instant::now() < join_deadline {
1076            if handler_thread.is_finished() {
1077                joined = true;
1078                break;
1079            }
1080            std::thread::sleep(Duration::from_millis(25));
1081        }
1082
1083        assert!(
1084            joined,
1085            "handle_connection must observe shutdown while waiting for a partial payload"
1086        );
1087        let shutdown_latency = shutdown_requested_at.elapsed();
1088        assert!(
1089            shutdown_latency < Duration::from_secs(2),
1090            "partial-payload shutdown latency {shutdown_latency:?} is too high"
1091        );
1092        let result = handler_thread
1093            .join()
1094            .expect("handle_connection thread panicked");
1095        assert!(
1096            result.is_ok(),
1097            "handler must return Ok on shutdown-during-partial-payload; got {result:?}"
1098        );
1099    }
1100}