greppy/daemon/
server.rs

use crate::core::config::Config;
use crate::core::error::Result;
use crate::core::project::{Project, ProjectEntry, Registry};
use crate::daemon::cache::QueryCache;
use crate::daemon::protocol::{Method, ProjectInfo, Request, Response, ResponseResult};
use crate::daemon::watcher::WatcherManager;
use crate::index::{IndexSearcher, IndexWriter, TantivyIndex};
use crate::parse::{chunk_file, walk_project};
use crate::search::SearchResponse;
use parking_lot::{Mutex, RwLock};
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::{Duration, Instant, SystemTime};
use tokio::io::{AsyncBufReadExt, AsyncRead, AsyncWrite, AsyncWriteExt, BufReader};
use tokio::sync::broadcast;
use tracing::{debug, info, warn};

#[cfg(unix)]
use tokio::net::UnixListener;

#[cfg(windows)]
use tokio::net::TcpListener;
/// Shared, thread-safe state for the daemon process.
///
/// All fields use interior mutability so one instance can be shared across
/// connection-handler tasks behind a single `Arc`.
pub struct DaemonState {
    /// Registry of known projects, loaded from disk at startup.
    pub registry: RwLock<Registry>,
    /// Cached index searchers, keyed by the project path's lossy string form.
    pub searchers: RwLock<HashMap<String, IndexSearcher>>,
    /// Query-result cache; cleared per project after (re)indexing.
    pub cache: RwLock<QueryCache>,
    /// File-system watcher driving incremental re-indexing.
    pub watcher: Mutex<WatcherManager>,
    /// Broadcast channel used to signal server shutdown (sent on `Stop`).
    pub shutdown: broadcast::Sender<()>,
}
32
/// `Default` simply delegates to [`DaemonState::new`].
impl Default for DaemonState {
    fn default() -> Self {
        Self::new()
    }
}
38
39impl DaemonState {
40    pub fn new() -> Self {
41        let (shutdown, _) = broadcast::channel(1);
42        Self {
43            registry: RwLock::new(Registry::load().unwrap_or_default()),
44            searchers: RwLock::new(HashMap::new()),
45            cache: RwLock::new(QueryCache::new()),
46            watcher: Mutex::new(WatcherManager::new()),
47            shutdown,
48        }
49    }
50
51    /// Invalidate searcher cache for a project (called after incremental update)
52    pub fn invalidate_project(&self, project_path: &PathBuf) {
53        let path_str = project_path.to_string_lossy().to_string();
54
55        // Remove cached searcher so it gets reloaded on next search
56        {
57            let mut searchers = self.searchers.write();
58            searchers.remove(&path_str);
59        }
60
61        // Clear query cache for this project
62        {
63            let mut cache = self.cache.write();
64            cache.clear_project(&path_str);
65        }
66
67        debug!(project = %path_str, "Invalidated caches after incremental update");
68    }
69}
70
71/// Background watcher loop - runs independently, doesn't block requests
72async fn run_watcher_loop(state: Arc<DaemonState>) {
73    info!("Starting file watcher for incremental indexing");
74
75    // Start watching all previously registered projects
76    {
77        let registry = state.registry.read();
78        let mut watcher = state.watcher.lock();
79        for entry in registry.list() {
80            if entry.watching {
81                if let Err(e) = watcher.watch(entry.path.clone()) {
82                    warn!(project = %entry.path.display(), error = %e, "Failed to watch project");
83                } else {
84                    info!(project = %entry.path.display(), "Watching for changes");
85                }
86            }
87        }
88    }
89
90    // Process events loop - use blocking task to avoid Send issues with MutexGuard
91    loop {
92        // Clone state for the blocking task
93        let state_clone = Arc::clone(&state);
94
95        // Process events in a blocking task (parking_lot mutex is not Send across await)
96        let updated_projects = tokio::task::spawn_blocking(move || {
97            let mut watcher = state_clone.watcher.lock();
98            watcher.process_events_sync()
99        })
100        .await
101        .unwrap_or_default();
102
103        // Invalidate caches for updated projects
104        for project_path in updated_projects {
105            state.invalidate_project(&project_path);
106        }
107
108        // Sleep to prevent busy-waiting - watcher debounces internally
109        tokio::time::sleep(Duration::from_millis(500)).await;
110    }
111}
112
113/// Run the daemon server (Unix: Unix sockets)
114#[cfg(unix)]
115pub async fn run_server() -> Result<()> {
116    let socket_path = Config::socket_path()?;
117
118    // Remove old socket if exists
119    if socket_path.exists() {
120        std::fs::remove_file(&socket_path)?;
121    }
122
123    let listener = UnixListener::bind(&socket_path)?;
124    let state = Arc::new(DaemonState::new());
125
126    info!("Daemon starting...");
127
128    // Start watcher background task - processes file changes without blocking requests
129    let watcher_state = Arc::clone(&state);
130    tokio::spawn(async move {
131        run_watcher_loop(watcher_state).await;
132    });
133
134    let mut shutdown_rx = state.shutdown.subscribe();
135
136    info!("Daemon ready, listening for connections");
137
138    loop {
139        tokio::select! {
140            result = listener.accept() => {
141                match result {
142                    Ok((stream, _)) => {
143                        let state = Arc::clone(&state);
144                        tokio::spawn(async move {
145                            if let Err(e) = handle_connection(stream, state).await {
146                                eprintln!("Connection error: {}", e);
147                            }
148                        });
149                    }
150                    Err(e) => {
151                        eprintln!("Accept error: {}", e);
152                    }
153                }
154            }
155            _ = shutdown_rx.recv() => {
156                break;
157            }
158        }
159    }
160
161    // Cleanup
162    if socket_path.exists() {
163        let _ = std::fs::remove_file(&socket_path);
164    }
165
166    Ok(())
167}
168
169/// Run the daemon server (Windows: TCP on localhost)
170#[cfg(windows)]
171pub async fn run_server() -> Result<()> {
172    let port = Config::daemon_port();
173    let addr = format!("127.0.0.1:{}", port);
174
175    let listener =
176        TcpListener::bind(&addr)
177            .await
178            .map_err(|e| crate::core::error::Error::DaemonError {
179                message: format!("Failed to bind to {}: {}", addr, e),
180            })?;
181
182    // Write port to file so clients know which port to connect to
183    let port_path = Config::port_path()?;
184    std::fs::write(&port_path, port.to_string())?;
185
186    let state = Arc::new(DaemonState::new());
187
188    info!("Daemon starting...");
189
190    // Start watcher background task - processes file changes without blocking requests
191    let watcher_state = Arc::clone(&state);
192    tokio::spawn(async move {
193        run_watcher_loop(watcher_state).await;
194    });
195
196    let mut shutdown_rx = state.shutdown.subscribe();
197
198    info!("Daemon ready, listening on {}", addr);
199
200    loop {
201        tokio::select! {
202            result = listener.accept() => {
203                match result {
204                    Ok((stream, _)) => {
205                        let state = Arc::clone(&state);
206                        tokio::spawn(async move {
207                            if let Err(e) = handle_connection(stream, state).await {
208                                eprintln!("Connection error: {}", e);
209                            }
210                        });
211                    }
212                    Err(e) => {
213                        eprintln!("Accept error: {}", e);
214                    }
215                }
216            }
217            _ = shutdown_rx.recv() => {
218                break;
219            }
220        }
221    }
222
223    // Cleanup: remove port file
224    if port_path.exists() {
225        let _ = std::fs::remove_file(&port_path);
226    }
227
228    Ok(())
229}
230
231/// Handle a connection from any stream type
232async fn handle_connection<S>(stream: S, state: Arc<DaemonState>) -> Result<()>
233where
234    S: AsyncRead + AsyncWrite + Unpin,
235{
236    let (reader, mut writer) = tokio::io::split(stream);
237    let mut reader = BufReader::new(reader);
238    let mut line = String::new();
239
240    while reader.read_line(&mut line).await? > 0 {
241        let request: Request = match serde_json::from_str(&line) {
242            Ok(r) => r,
243            Err(e) => {
244                let response = Response {
245                    id: "error".to_string(),
246                    result: ResponseResult::Error {
247                        message: e.to_string(),
248                    },
249                };
250                let json = serde_json::to_string(&response)? + "\n";
251                writer.write_all(json.as_bytes()).await?;
252                line.clear();
253                continue;
254            }
255        };
256
257        let response = handle_request(request, &state).await;
258        let json = serde_json::to_string(&response)? + "\n";
259        writer.write_all(json.as_bytes()).await?;
260
261        // Check for stop command
262        if matches!(response.result, ResponseResult::Stop { success: true }) {
263            let _ = state.shutdown.send(());
264            break;
265        }
266
267        line.clear();
268    }
269
270    Ok(())
271}
272
273async fn handle_request(request: Request, state: &DaemonState) -> Response {
274    let result = match request.method {
275        Method::Search {
276            query,
277            project,
278            limit,
279        } => handle_search(&query, &project, limit, state).await,
280
281        Method::Index { project, force } => handle_index(&project, force, state).await,
282
283        Method::IndexWatch { project } => handle_index_watch(&project, state).await,
284
285        Method::Status => handle_status(state),
286
287        Method::List => handle_list(state),
288
289        Method::Forget { project } => handle_forget(&project, state).await,
290
291        Method::Stop => ResponseResult::Stop { success: true },
292    };
293
294    Response {
295        id: request.id,
296        result,
297    }
298}
299
300async fn handle_search(
301    query: &str,
302    project_path: &str,
303    limit: usize,
304    state: &DaemonState,
305) -> ResponseResult {
306    let start = Instant::now();
307    let path = PathBuf::from(project_path);
308
309    // Check cache
310    let cache_key = format!("{}:{}:{}", project_path, query, limit);
311    {
312        let mut cache = state.cache.write();
313        if let Some(cached) = cache.get(&cache_key) {
314            return ResponseResult::Search(cached.clone());
315        }
316    }
317
318    // Get or create searcher
319    let searcher = {
320        let searchers = state.searchers.read();
321        searchers.get(project_path).cloned()
322    };
323
324    let searcher = match searcher {
325        Some(s) => s,
326        None => {
327            // Try to open existing index
328            match IndexSearcher::open(&path) {
329                Ok(s) => {
330                    let mut searchers = state.searchers.write();
331                    searchers.insert(project_path.to_string(), s.clone());
332                    s
333                }
334                Err(_) => {
335                    // Need to index first
336                    match do_index(&path, false, state).await {
337                        Ok(_) => match IndexSearcher::open(&path) {
338                            Ok(s) => {
339                                let mut searchers = state.searchers.write();
340                                searchers.insert(project_path.to_string(), s.clone());
341                                s
342                            }
343                            Err(e) => {
344                                return ResponseResult::Error {
345                                    message: e.to_string(),
346                                }
347                            }
348                        },
349                        Err(e) => {
350                            return ResponseResult::Error {
351                                message: e.to_string(),
352                            }
353                        }
354                    }
355                }
356            }
357        }
358    };
359
360    // Search
361    match searcher.search(query, limit) {
362        Ok(results) => {
363            let elapsed = start.elapsed();
364            let response = SearchResponse {
365                results,
366                query: query.to_string(),
367                elapsed_ms: elapsed.as_secs_f64() * 1000.0,
368                project: project_path.to_string(),
369            };
370
371            // Cache result
372            {
373                let mut cache = state.cache.write();
374                cache.put(cache_key, response.clone());
375            }
376
377            ResponseResult::Search(response)
378        }
379        Err(e) => ResponseResult::Error {
380            message: e.to_string(),
381        },
382    }
383}
384
385async fn handle_index(project_path: &str, force: bool, state: &DaemonState) -> ResponseResult {
386    let path = PathBuf::from(project_path);
387    match do_index(&path, force, state).await {
388        Ok((file_count, chunk_count, elapsed_ms)) => ResponseResult::Index {
389            project: project_path.to_string(),
390            file_count,
391            chunk_count,
392            elapsed_ms,
393        },
394        Err(e) => ResponseResult::Error {
395            message: e.to_string(),
396        },
397    }
398}
399
400async fn do_index(
401    path: &PathBuf,
402    _force: bool,
403    state: &DaemonState,
404) -> Result<(usize, usize, f64)> {
405    let start = Instant::now();
406
407    // Walk and chunk files
408    let files = walk_project(path)?;
409    let file_count = files.len();
410
411    let index = TantivyIndex::open_or_create(path)?;
412    let mut writer = IndexWriter::new(&index)?;
413    let mut chunk_count = 0;
414
415    for file in &files {
416        let chunks = chunk_file(&file.path, &file.content);
417        for chunk in chunks {
418            writer.add_chunk(&chunk)?;
419            chunk_count += 1;
420        }
421    }
422
423    writer.commit()?;
424
425    let elapsed = start.elapsed();
426
427    // Update registry
428    {
429        let project = Project::from_path(path)?;
430        let entry = ProjectEntry {
431            path: path.clone(),
432            name: project.name,
433            indexed_at: SystemTime::now(),
434            file_count,
435            chunk_count,
436            watching: false,
437        };
438
439        let mut registry = state.registry.write();
440        registry.upsert(entry);
441        let _ = registry.save();
442    }
443
444    // Reload searcher
445    {
446        let mut searchers = state.searchers.write();
447        searchers.remove(&path.to_string_lossy().to_string());
448    }
449
450    // Clear cache for this project
451    {
452        let mut cache = state.cache.write();
453        cache.clear_project(&path.to_string_lossy());
454    }
455
456    Ok((file_count, chunk_count, elapsed.as_secs_f64() * 1000.0))
457}
458
459async fn handle_index_watch(project_path: &str, state: &DaemonState) -> ResponseResult {
460    let path = PathBuf::from(project_path);
461
462    // First index
463    let result = handle_index(project_path, false, state).await;
464
465    // Start watching this project
466    {
467        let mut watcher = state.watcher.lock();
468        if let Err(e) = watcher.watch(path.clone()) {
469            warn!(project = %project_path, error = %e, "Failed to start watcher");
470        } else {
471            info!(project = %project_path, "Started watching for changes");
472        }
473    }
474
475    // Mark as watching in registry
476    {
477        let mut registry = state.registry.write();
478        registry.set_watching(&path, true);
479        let _ = registry.save();
480    }
481
482    result
483}
484
485fn handle_status(state: &DaemonState) -> ResponseResult {
486    let registry = state.registry.read();
487    let projects: Vec<ProjectInfo> = registry
488        .list()
489        .iter()
490        .map(|e| ProjectInfo {
491            path: e.path.to_string_lossy().to_string(),
492            name: e.name.clone(),
493            chunk_count: e.chunk_count,
494            watching: e.watching,
495        })
496        .collect();
497
498    ResponseResult::Status {
499        running: true,
500        pid: std::process::id(),
501        projects,
502    }
503}
504
505fn handle_list(state: &DaemonState) -> ResponseResult {
506    let registry = state.registry.read();
507    let projects: Vec<ProjectInfo> = registry
508        .list()
509        .iter()
510        .map(|e| ProjectInfo {
511            path: e.path.to_string_lossy().to_string(),
512            name: e.name.clone(),
513            chunk_count: e.chunk_count,
514            watching: e.watching,
515        })
516        .collect();
517
518    ResponseResult::List { projects }
519}
520
521async fn handle_forget(project_path: &str, state: &DaemonState) -> ResponseResult {
522    let path = PathBuf::from(project_path);
523
524    // Remove from registry
525    {
526        let mut registry = state.registry.write();
527        registry.remove(&path);
528        let _ = registry.save();
529    }
530
531    // Remove searcher
532    {
533        let mut searchers = state.searchers.write();
534        searchers.remove(project_path);
535    }
536
537    // Delete index directory
538    if let Ok(index_dir) = Config::index_dir(&path) {
539        if index_dir.exists() {
540            let _ = std::fs::remove_dir_all(&index_dir);
541        }
542    }
543
544    // Clear cache
545    {
546        let mut cache = state.cache.write();
547        cache.clear_project(project_path);
548    }
549
550    ResponseResult::Forget {
551        project: project_path.to_string(),
552        success: true,
553    }
554}