greppy/daemon/server.rs

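//! Daemon server: accepts newline-delimited JSON requests over a Unix domain
//! socket (localhost TCP on Windows), dispatches them to handlers, and streams
//! daemon events to subscribed clients.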
use crate::core::config::Config;
use crate::core::error::Result;
use crate::core::project::{Project, ProjectEntry, Registry};
use crate::daemon::cache::QueryCache;
use crate::daemon::events::{DaemonEvent, EventBroadcaster, FileAction};
use crate::daemon::protocol::{Method, ProjectInfo, Request, Response, ResponseResult};
use crate::daemon::watcher::WatcherManager;
use crate::index::{IndexSearcher, IndexWriter, TantivyIndex};
use crate::parse::{chunk_file, walk_project};
use crate::search::SearchResponse;
use parking_lot::{Mutex, RwLock};
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::{Duration, Instant, SystemTime};
use tokio::io::{AsyncBufReadExt, AsyncRead, AsyncWrite, AsyncWriteExt, BufReader};
use tokio::sync::broadcast;
use tracing::{debug, info, warn};

#[cfg(unix)]
use tokio::net::UnixListener;

#[cfg(windows)]
use tokio::net::TcpListener;

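/// Shared daemon state: project registry, per-project searchers, query cache,
/// file watcher, shutdown channel, and event broadcaster.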
pub struct DaemonState {
    pub registry: RwLock<Registry>,
    pub searchers: RwLock<HashMap<String, IndexSearcher>>,
    pub cache: RwLock<QueryCache>,
    pub watcher: Mutex<WatcherManager>,
    pub shutdown: broadcast::Sender<()>,
    pub events: EventBroadcaster,
}

impl Default for DaemonState {
    fn default() -> Self {
        Self::new()
    }
}

impl DaemonState {
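    /// Create daemon state, loading the persisted project registry (or an empty one).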
    pub fn new() -> Self {
        let (shutdown, _) = broadcast::channel(1);
        Self {
            registry: RwLock::new(Registry::load().unwrap_or_default()),
            searchers: RwLock::new(HashMap::new()),
            cache: RwLock::new(QueryCache::new()),
            watcher: Mutex::new(WatcherManager::new()),
            shutdown,
            events: EventBroadcaster::default(),
        }
    }

    /// Subscribe to daemon events
    pub fn subscribe_events(&self) -> broadcast::Receiver<DaemonEvent> {
        self.events.subscribe()
    }

    /// Invalidate searcher cache for a project (called after incremental update)
    pub fn invalidate_project(&self, project_path: &PathBuf) {
        let path_str = project_path.to_string_lossy().to_string();

        // Remove cached searcher so it gets reloaded on next search
        {
            let mut searchers = self.searchers.write();
            searchers.remove(&path_str);
        }

        // Clear query cache for this project
        {
            let mut cache = self.cache.write();
            cache.clear_project(&path_str);
        }

        debug!(project = %path_str, "Invalidated caches after incremental update");
    }
}

/// Background watcher loop - runs independently, doesn't block requests
async fn run_watcher_loop(state: Arc<DaemonState>) {
    info!("Starting file watcher for incremental indexing");

    // Start watching all previously registered projects
    {
        let registry = state.registry.read();
        let mut watcher = state.watcher.lock();
        for entry in registry.list() {
            if entry.watching {
                if let Err(e) = watcher.watch(entry.path.clone()) {
                    warn!(project = %entry.path.display(), error = %e, "Failed to watch project");
                } else {
                    info!(project = %entry.path.display(), "Watching for changes");
                }
            }
        }
    }

    // Process events loop - use blocking task to avoid Send issues with MutexGuard
    loop {
        // Clone state for the blocking task
        let state_clone = Arc::clone(&state);

        // Process events in a blocking task (parking_lot mutex is not Send across await)
        let updated_projects = tokio::task::spawn_blocking(move || {
            let mut watcher = state_clone.watcher.lock();
            watcher.process_events_sync()
        })
        .await
        .unwrap_or_default();

        // Invalidate caches and emit events for updated projects
        for (project_path, update_result) in updated_projects {
            state.invalidate_project(&project_path);

            // Emit file change events for each changed file
            for path in &update_result.changed_paths {
                state
                    .events
                    .file_changed(&project_path, path, FileAction::Modified);
            }
            for path in &update_result.deleted_paths {
                state
                    .events
                    .file_changed(&project_path, path, FileAction::Deleted);
            }

            // Emit reindex complete event (incremental update)
            state.events.broadcast(DaemonEvent::ReindexComplete {
                project: project_path.to_string_lossy().to_string(),
                files: update_result.files_reindexed + update_result.files_deleted,
                symbols: 0, // Unknown without reloading full index
                dead: 0,    // Unknown without reloading full index
                duration_ms: update_result.elapsed_ms,
            });

            debug!(
                project = %project_path.display(),
                files = update_result.files_reindexed + update_result.files_deleted,
                elapsed_ms = update_result.elapsed_ms,
                "Emitted reindex events"
            );
        }

        // Sleep to prevent busy-waiting - watcher debounces internally
        tokio::time::sleep(Duration::from_millis(500)).await;
    }
}

/// Run the daemon server (Unix: Unix domain socket)
#[cfg(unix)]
pub async fn run_server() -> Result<()> {
    let socket_path = Config::socket_path()?;

    // Remove a stale socket file if one exists
    if socket_path.exists() {
        std::fs::remove_file(&socket_path)?;
    }

    let listener = UnixListener::bind(&socket_path)?;
    let state = Arc::new(DaemonState::new());

    info!("Daemon starting...");

    // Start watcher background task - processes file changes without blocking requests
    let watcher_state = Arc::clone(&state);
    tokio::spawn(async move {
        run_watcher_loop(watcher_state).await;
    });

    let mut shutdown_rx = state.shutdown.subscribe();

    info!("Daemon ready, listening for connections");

    loop {
        tokio::select! {
            result = listener.accept() => {
                match result {
                    Ok((stream, _)) => {
                        let state = Arc::clone(&state);
                        tokio::spawn(async move {
                            if let Err(e) = handle_connection(stream, state).await {
                                eprintln!("Connection error: {}", e);
                            }
                        });
                    }
                    Err(e) => {
                        eprintln!("Accept error: {}", e);
                    }
                }
            }
            _ = shutdown_rx.recv() => {
                break;
            }
        }
    }

    // Cleanup: remove the socket file
    if socket_path.exists() {
        let _ = std::fs::remove_file(&socket_path);
    }

    Ok(())
}

/// Run the daemon server (Windows: TCP on localhost)
#[cfg(windows)]
pub async fn run_server() -> Result<()> {
    let port = Config::daemon_port();
    let addr = format!("127.0.0.1:{}", port);

    let listener =
        TcpListener::bind(&addr)
            .await
            .map_err(|e| crate::core::error::Error::DaemonError {
                message: format!("Failed to bind to {}: {}", addr, e),
            })?;

    // Write port to file so clients know which port to connect to
    let port_path = Config::port_path()?;
    std::fs::write(&port_path, port.to_string())?;

    let state = Arc::new(DaemonState::new());

    info!("Daemon starting...");

    // Start watcher background task - processes file changes without blocking requests
    let watcher_state = Arc::clone(&state);
    tokio::spawn(async move {
        run_watcher_loop(watcher_state).await;
    });

    let mut shutdown_rx = state.shutdown.subscribe();

    info!("Daemon ready, listening on {}", addr);

    loop {
        tokio::select! {
            result = listener.accept() => {
                match result {
                    Ok((stream, _)) => {
                        let state = Arc::clone(&state);
                        tokio::spawn(async move {
                            if let Err(e) = handle_connection(stream, state).await {
                                eprintln!("Connection error: {}", e);
                            }
                        });
                    }
                    Err(e) => {
                        eprintln!("Accept error: {}", e);
                    }
                }
            }
            _ = shutdown_rx.recv() => {
                break;
            }
        }
    }

    // Cleanup: remove port file
    if port_path.exists() {
        let _ = std::fs::remove_file(&port_path);
    }

    Ok(())
}

/// Handle a connection from any stream type
async fn handle_connection<S>(stream: S, state: Arc<DaemonState>) -> Result<()>
where
    S: AsyncRead + AsyncWrite + Unpin,
{
    let (reader, mut writer) = tokio::io::split(stream);
    let mut reader = BufReader::new(reader);
    let mut line = String::new();

    while reader.read_line(&mut line).await? > 0 {
        let request: Request = match serde_json::from_str(&line) {
            Ok(r) => r,
            Err(e) => {
                let response = Response {
                    id: "error".to_string(),
                    result: ResponseResult::Error {
                        message: e.to_string(),
                    },
                };
                let json = serde_json::to_string(&response)? + "\n";
                writer.write_all(json.as_bytes()).await?;
                line.clear();
                continue;
            }
        };

        // Special handling for Subscribe - starts streaming events
        if matches!(request.method, Method::Subscribe) {
            let request_id = request.id.clone();

            // Send initial confirmation
            let response = Response {
                id: request_id.clone(),
                result: ResponseResult::Subscribed,
            };
            let json = serde_json::to_string(&response)? + "\n";
            writer.write_all(json.as_bytes()).await?;

            // Stream events until connection closes
            let mut event_rx = state.subscribe_events();
            loop {
                match event_rx.recv().await {
                    Ok(event) => {
                        let response = Response {
                            id: request_id.clone(),
                            result: ResponseResult::Event(event),
                        };
                        let json = serde_json::to_string(&response)? + "\n";
                        if writer.write_all(json.as_bytes()).await.is_err() {
                            break; // Client disconnected
                        }
                    }
                    Err(broadcast::error::RecvError::Lagged(n)) => {
                        warn!("Event subscriber lagged by {} messages", n);
                    }
                    Err(broadcast::error::RecvError::Closed) => {
                        break; // Channel closed
                    }
                }
            }
            break;
        }

        let response = handle_request(request, &state).await;
        let json = serde_json::to_string(&response)? + "\n";
        writer.write_all(json.as_bytes()).await?;

        // Check for stop command
        if matches!(response.result, ResponseResult::Stop { success: true }) {
            let _ = state.shutdown.send(());
            break;
        }

        line.clear();
    }

    Ok(())
}

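/// Dispatch a parsed request to its handler and wrap the result in a `Response`
/// carrying the request id.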
async fn handle_request(request: Request, state: &DaemonState) -> Response {
    let result = match request.method {
        Method::Search {
            query,
            project,
            limit,
        } => handle_search(&query, &project, limit, state).await,

        Method::Index { project, force } => handle_index(&project, force, state).await,

        Method::IndexWatch { project } => handle_index_watch(&project, state).await,

        Method::Status => handle_status(state),

        Method::List => handle_list(state),

        Method::Forget { project } => handle_forget(&project, state).await,

        Method::Stop => ResponseResult::Stop { success: true },

        // Subscribe is handled specially in handle_connection
        Method::Subscribe => ResponseResult::Subscribed,
    };

    Response {
        id: request.id,
        result,
    }
}

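/// Serve a search request: return a cached response when available, otherwise
/// open (or build) the project's index searcher, run the query, and cache the result.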
async fn handle_search(
    query: &str,
    project_path: &str,
    limit: usize,
    state: &DaemonState,
) -> ResponseResult {
    let start = Instant::now();
    let path = PathBuf::from(project_path);

    // Check cache
    let cache_key = format!("{}:{}:{}", project_path, query, limit);
    {
        let mut cache = state.cache.write();
        if let Some(cached) = cache.get(&cache_key) {
            return ResponseResult::Search(cached.clone());
        }
    }

    // Get or create searcher
    let searcher = {
        let searchers = state.searchers.read();
        searchers.get(project_path).cloned()
    };

    let searcher = match searcher {
        Some(s) => s,
        None => {
            // Try to open existing index
            match IndexSearcher::open(&path) {
                Ok(s) => {
                    let mut searchers = state.searchers.write();
                    searchers.insert(project_path.to_string(), s.clone());
                    s
                }
                Err(_) => {
                    // Need to index first
                    match do_index(&path, false, state).await {
                        Ok(_) => match IndexSearcher::open(&path) {
                            Ok(s) => {
                                let mut searchers = state.searchers.write();
                                searchers.insert(project_path.to_string(), s.clone());
                                s
                            }
                            Err(e) => {
                                return ResponseResult::Error {
                                    message: e.to_string(),
                                }
                            }
                        },
                        Err(e) => {
                            return ResponseResult::Error {
                                message: e.to_string(),
                            }
                        }
                    }
                }
            }
        }
    };

    // Search
    match searcher.search(query, limit) {
        Ok(results) => {
            let elapsed = start.elapsed();
            let response = SearchResponse {
                results,
                query: query.to_string(),
                elapsed_ms: elapsed.as_secs_f64() * 1000.0,
                project: project_path.to_string(),
            };

            // Cache result
            {
                let mut cache = state.cache.write();
                cache.put(cache_key, response.clone());
            }

            ResponseResult::Search(response)
        }
        Err(e) => ResponseResult::Error {
            message: e.to_string(),
        },
    }
}

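/// (Re)index a project and report its file/chunk counts and elapsed time.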
async fn handle_index(project_path: &str, force: bool, state: &DaemonState) -> ResponseResult {
    let path = PathBuf::from(project_path);
    match do_index(&path, force, state).await {
        Ok((file_count, chunk_count, elapsed_ms)) => ResponseResult::Index {
            project: project_path.to_string(),
            file_count,
            chunk_count,
            elapsed_ms,
        },
        Err(e) => ResponseResult::Error {
            message: e.to_string(),
        },
    }
}

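/// Walk and chunk every file in the project, write the chunks to the Tantivy
/// index, update the registry entry, and drop the project's cached searcher and
/// query results. The `_force` flag is currently unused.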
async fn do_index(
    path: &PathBuf,
    _force: bool,
    state: &DaemonState,
) -> Result<(usize, usize, f64)> {
    let start = Instant::now();

    // Walk and chunk files
    let files = walk_project(path)?;
    let file_count = files.len();

    let index = TantivyIndex::open_or_create(path)?;
    let mut writer = IndexWriter::new(&index)?;
    let mut chunk_count = 0;

    for file in &files {
        let chunks = chunk_file(&file.path, &file.content);
        for chunk in chunks {
            writer.add_chunk(&chunk)?;
            chunk_count += 1;
        }
    }

    writer.commit()?;

    let elapsed = start.elapsed();

    // Update registry
    {
        let project = Project::from_path(path)?;
        let entry = ProjectEntry {
            path: path.clone(),
            name: project.name,
            indexed_at: SystemTime::now(),
            file_count,
            chunk_count,
            watching: false,
        };

        let mut registry = state.registry.write();
        registry.upsert(entry);
        let _ = registry.save();
    }

    // Drop the cached searcher so it is reloaded on the next search
    {
        let mut searchers = state.searchers.write();
        searchers.remove(&path.to_string_lossy().to_string());
    }

    // Clear cache for this project
    {
        let mut cache = state.cache.write();
        cache.clear_project(&path.to_string_lossy());
    }

    Ok((file_count, chunk_count, elapsed.as_secs_f64() * 1000.0))
}

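/// Index a project, then start watching it for changes and mark it as watched
/// in the registry.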
async fn handle_index_watch(project_path: &str, state: &DaemonState) -> ResponseResult {
    let path = PathBuf::from(project_path);

    // First index
    let result = handle_index(project_path, false, state).await;

    // Start watching this project
    {
        let mut watcher = state.watcher.lock();
        if let Err(e) = watcher.watch(path.clone()) {
            warn!(project = %project_path, error = %e, "Failed to start watcher");
        } else {
            info!(project = %project_path, "Started watching for changes");
        }
    }

    // Mark as watching in registry
    {
        let mut registry = state.registry.write();
        registry.set_watching(&path, true);
        let _ = registry.save();
    }

    result
}

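/// Report daemon status: PID and all registered projects.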
fn handle_status(state: &DaemonState) -> ResponseResult {
    let registry = state.registry.read();
    let projects: Vec<ProjectInfo> = registry
        .list()
        .iter()
        .map(|e| ProjectInfo {
            path: e.path.to_string_lossy().to_string(),
            name: e.name.clone(),
            chunk_count: e.chunk_count,
            watching: e.watching,
        })
        .collect();

    ResponseResult::Status {
        running: true,
        pid: std::process::id(),
        projects,
    }
}

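/// List all registered projects.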
fn handle_list(state: &DaemonState) -> ResponseResult {
    let registry = state.registry.read();
    let projects: Vec<ProjectInfo> = registry
        .list()
        .iter()
        .map(|e| ProjectInfo {
            path: e.path.to_string_lossy().to_string(),
            name: e.name.clone(),
            chunk_count: e.chunk_count,
            watching: e.watching,
        })
        .collect();

    ResponseResult::List { projects }
}

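/// Forget a project: remove its registry entry, cached searcher, on-disk index
/// directory, and cached query results.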
async fn handle_forget(project_path: &str, state: &DaemonState) -> ResponseResult {
    let path = PathBuf::from(project_path);

    // Remove from registry
    {
        let mut registry = state.registry.write();
        registry.remove(&path);
        let _ = registry.save();
    }

    // Remove searcher
    {
        let mut searchers = state.searchers.write();
        searchers.remove(project_path);
    }

    // Delete index directory
    if let Ok(index_dir) = Config::index_dir(&path) {
        if index_dir.exists() {
            let _ = std::fs::remove_dir_all(&index_dir);
        }
    }

    // Clear cache
    {
        let mut cache = state.cache.write();
        cache.clear_project(project_path);
    }

    ResponseResult::Forget {
        project: project_path.to_string(),
        success: true,
    }
}