1use crate::core::config::Config;
2use crate::core::error::Result;
3use crate::core::project::{Project, ProjectEntry, Registry};
4use crate::daemon::cache::QueryCache;
5use crate::daemon::events::{DaemonEvent, EventBroadcaster, FileAction};
6use crate::daemon::protocol::{Method, ProjectInfo, Request, Response, ResponseResult};
7use crate::daemon::watcher::WatcherManager;
8use crate::index::{IndexSearcher, IndexWriter, TantivyIndex};
9use crate::parse::{chunk_file, walk_project};
10use crate::search::SearchResponse;
11use parking_lot::{Mutex, RwLock};
12use std::collections::HashMap;
13use std::path::PathBuf;
14use std::sync::Arc;
15use std::time::{Duration, Instant, SystemTime};
16use tokio::io::{AsyncBufReadExt, AsyncRead, AsyncWrite, AsyncWriteExt, BufReader};
17use tokio::sync::broadcast;
18use tracing::{debug, info, warn};
19
20#[cfg(unix)]
21use tokio::net::UnixListener;
22
23#[cfg(windows)]
24use tokio::net::TcpListener;
25
/// Shared state of the daemon, handed to every connection handler and to the
/// file-watcher loop.
pub struct DaemonState {
    /// Registry of known projects; populated from `Registry::load()` in `DaemonState::new`.
    pub registry: RwLock<Registry>,
    /// Opened index searchers, keyed by the project path rendered as a string.
    pub searchers: RwLock<HashMap<String, IndexSearcher>>,
    /// Cache of search responses; cleared per project whenever that project changes.
    pub cache: RwLock<QueryCache>,
    /// Manager for the file watchers that feed incremental indexing.
    pub watcher: Mutex<WatcherManager>,
    /// Sender used to tell the server accept loop to stop.
    pub shutdown: broadcast::Sender<()>,
    /// Broadcaster for daemon events that are streamed to subscribed connections.
    pub events: EventBroadcaster,
}
34
35impl Default for DaemonState {
36 fn default() -> Self {
37 Self::new()
38 }
39}
40
41impl DaemonState {
42 pub fn new() -> Self {
43 let (shutdown, _) = broadcast::channel(1);
44 Self {
45 registry: RwLock::new(Registry::load().unwrap_or_default()),
46 searchers: RwLock::new(HashMap::new()),
47 cache: RwLock::new(QueryCache::new()),
48 watcher: Mutex::new(WatcherManager::new()),
49 shutdown,
50 events: EventBroadcaster::default(),
51 }
52 }
53
54 pub fn subscribe_events(&self) -> broadcast::Receiver<DaemonEvent> {
56 self.events.subscribe()
57 }
58
59 pub fn invalidate_project(&self, project_path: &PathBuf) {
61 let path_str = project_path.to_string_lossy().to_string();
62
63 {
65 let mut searchers = self.searchers.write();
66 searchers.remove(&path_str);
67 }
68
69 {
71 let mut cache = self.cache.write();
72 cache.clear_project(&path_str);
73 }
74
75 debug!(project = %path_str, "Invalidated caches after incremental update");
76 }
77}
78
79async fn run_watcher_loop(state: Arc<DaemonState>) {
81 info!("Starting file watcher for incremental indexing");
82
83 {
85 let registry = state.registry.read();
86 let mut watcher = state.watcher.lock();
87 for entry in registry.list() {
88 if entry.watching {
89 if let Err(e) = watcher.watch(entry.path.clone()) {
90 warn!(project = %entry.path.display(), error = %e, "Failed to watch project");
91 } else {
92 info!(project = %entry.path.display(), "Watching for changes");
93 }
94 }
95 }
96 }
97
98 loop {
100 let state_clone = Arc::clone(&state);
102
103 let updated_projects = tokio::task::spawn_blocking(move || {
105 let mut watcher = state_clone.watcher.lock();
106 watcher.process_events_sync()
107 })
108 .await
109 .unwrap_or_default();
110
111 for (project_path, update_result) in updated_projects {
113 state.invalidate_project(&project_path);
114
115 for path in &update_result.changed_paths {
117 state
118 .events
119 .file_changed(&project_path, path, FileAction::Modified);
120 }
121 for path in &update_result.deleted_paths {
122 state
123 .events
124 .file_changed(&project_path, path, FileAction::Deleted);
125 }
126
127 state.events.broadcast(DaemonEvent::ReindexComplete {
129 project: project_path.to_string_lossy().to_string(),
130 files: update_result.files_reindexed + update_result.files_deleted,
131 symbols: 0, dead: 0, duration_ms: update_result.elapsed_ms,
134 });
135
136 debug!(
137 project = %project_path.display(),
138 files = update_result.files_reindexed + update_result.files_deleted,
139 elapsed_ms = update_result.elapsed_ms,
140 "Emitted reindex events"
141 );
142 }
143
144 tokio::time::sleep(Duration::from_millis(500)).await;
146 }
147}
148
149#[cfg(unix)]
151pub async fn run_server() -> Result<()> {
152 let socket_path = Config::socket_path()?;
153
154 if socket_path.exists() {
156 std::fs::remove_file(&socket_path)?;
157 }
158
159 let listener = UnixListener::bind(&socket_path)?;
160 let state = Arc::new(DaemonState::new());
161
162 info!("Daemon starting...");
163
164 let watcher_state = Arc::clone(&state);
166 tokio::spawn(async move {
167 run_watcher_loop(watcher_state).await;
168 });
169
170 let mut shutdown_rx = state.shutdown.subscribe();
171
172 info!("Daemon ready, listening for connections");
173
174 loop {
175 tokio::select! {
176 result = listener.accept() => {
177 match result {
178 Ok((stream, _)) => {
179 let state = Arc::clone(&state);
180 tokio::spawn(async move {
181 if let Err(e) = handle_connection(stream, state).await {
182 eprintln!("Connection error: {}", e);
183 }
184 });
185 }
186 Err(e) => {
187 eprintln!("Accept error: {}", e);
188 }
189 }
190 }
191 _ = shutdown_rx.recv() => {
192 break;
193 }
194 }
195 }
196
197 if socket_path.exists() {
199 let _ = std::fs::remove_file(&socket_path);
200 }
201
202 Ok(())
203}
204
/// Runs the daemon on a loopback TCP port until a shutdown signal arrives.
///
/// Binds `127.0.0.1:<Config::daemon_port()>`, writes the port number to
/// `Config::port_path()`, spawns the watcher loop, and then serves each accepted
/// connection on its own task. The port file is removed again on the way out.
///
/// # Errors
/// Fails if the listener cannot be bound (reported as `Error::DaemonError`), or if
/// the port file path cannot be resolved or written.
#[cfg(windows)]
pub async fn run_server() -> Result<()> {
    let port = Config::daemon_port();
    let addr = format!("127.0.0.1:{}", port);

    let listener =
        TcpListener::bind(&addr)
            .await
            .map_err(|e| crate::core::error::Error::DaemonError {
                message: format!("Failed to bind to {}: {}", addr, e),
            })?;

    // The port is written only after a successful bind.
    let port_path = Config::port_path()?;
    std::fs::write(&port_path, port.to_string())?;

    let state = Arc::new(DaemonState::new());

    info!("Daemon starting...");

    let watcher_state = Arc::clone(&state);
    tokio::spawn(async move {
        run_watcher_loop(watcher_state).await;
    });

    let mut shutdown_rx = state.shutdown.subscribe();

    info!("Daemon ready, listening on {}", addr);

    loop {
        tokio::select! {
            result = listener.accept() => {
                match result {
                    Ok((stream, _)) => {
                        let state = Arc::clone(&state);
                        tokio::spawn(async move {
                            // Logged through `tracing` like the rest of the daemon;
                            // `eprintln!` would panic if stderr is not writable.
                            if let Err(e) = handle_connection(stream, state).await {
                                warn!(error = %e, "Connection error");
                            }
                        });
                    }
                    Err(e) => {
                        warn!(error = %e, "Accept error");
                    }
                }
            }
            // Any outcome of `recv` (a message or a channel error) ends the loop.
            _ = shutdown_rx.recv() => {
                break;
            }
        }
    }

    // Best-effort cleanup; a failure here does not change the result.
    if port_path.exists() {
        let _ = std::fs::remove_file(&port_path);
    }

    Ok(())
}
266
267async fn handle_connection<S>(stream: S, state: Arc<DaemonState>) -> Result<()>
269where
270 S: AsyncRead + AsyncWrite + Unpin,
271{
272 let (reader, mut writer) = tokio::io::split(stream);
273 let mut reader = BufReader::new(reader);
274 let mut line = String::new();
275
276 while reader.read_line(&mut line).await? > 0 {
277 let request: Request = match serde_json::from_str(&line) {
278 Ok(r) => r,
279 Err(e) => {
280 let response = Response {
281 id: "error".to_string(),
282 result: ResponseResult::Error {
283 message: e.to_string(),
284 },
285 };
286 let json = serde_json::to_string(&response)? + "\n";
287 writer.write_all(json.as_bytes()).await?;
288 line.clear();
289 continue;
290 }
291 };
292
293 if matches!(request.method, Method::Subscribe) {
295 let request_id = request.id.clone();
296
297 let response = Response {
299 id: request_id.clone(),
300 result: ResponseResult::Subscribed,
301 };
302 let json = serde_json::to_string(&response)? + "\n";
303 writer.write_all(json.as_bytes()).await?;
304
305 let mut event_rx = state.subscribe_events();
307 loop {
308 match event_rx.recv().await {
309 Ok(event) => {
310 let response = Response {
311 id: request_id.clone(),
312 result: ResponseResult::Event(event),
313 };
314 let json = serde_json::to_string(&response)? + "\n";
315 if writer.write_all(json.as_bytes()).await.is_err() {
316 break; }
318 }
319 Err(broadcast::error::RecvError::Lagged(n)) => {
320 warn!("Event subscriber lagged by {} messages", n);
321 }
322 Err(broadcast::error::RecvError::Closed) => {
323 break; }
325 }
326 }
327 break;
328 }
329
330 let response = handle_request(request, &state).await;
331 let json = serde_json::to_string(&response)? + "\n";
332 writer.write_all(json.as_bytes()).await?;
333
334 if matches!(response.result, ResponseResult::Stop { success: true }) {
336 let _ = state.shutdown.send(());
337 break;
338 }
339
340 line.clear();
341 }
342
343 Ok(())
344}
345
346async fn handle_request(request: Request, state: &DaemonState) -> Response {
347 let result = match request.method {
348 Method::Search {
349 query,
350 project,
351 limit,
352 } => handle_search(&query, &project, limit, state).await,
353
354 Method::Index { project, force } => handle_index(&project, force, state).await,
355
356 Method::IndexWatch { project } => handle_index_watch(&project, state).await,
357
358 Method::Status => handle_status(state),
359
360 Method::List => handle_list(state),
361
362 Method::Forget { project } => handle_forget(&project, state).await,
363
364 Method::Stop => ResponseResult::Stop { success: true },
365
366 Method::Subscribe => ResponseResult::Subscribed,
368 };
369
370 Response {
371 id: request.id,
372 result,
373 }
374}
375
376async fn handle_search(
377 query: &str,
378 project_path: &str,
379 limit: usize,
380 state: &DaemonState,
381) -> ResponseResult {
382 let start = Instant::now();
383 let path = PathBuf::from(project_path);
384
385 let cache_key = format!("{}:{}:{}", project_path, query, limit);
387 {
388 let mut cache = state.cache.write();
389 if let Some(cached) = cache.get(&cache_key) {
390 return ResponseResult::Search(cached.clone());
391 }
392 }
393
394 let searcher = {
396 let searchers = state.searchers.read();
397 searchers.get(project_path).cloned()
398 };
399
400 let searcher = match searcher {
401 Some(s) => s,
402 None => {
403 match IndexSearcher::open(&path) {
405 Ok(s) => {
406 let mut searchers = state.searchers.write();
407 searchers.insert(project_path.to_string(), s.clone());
408 s
409 }
410 Err(_) => {
411 match do_index(&path, false, state).await {
413 Ok(_) => match IndexSearcher::open(&path) {
414 Ok(s) => {
415 let mut searchers = state.searchers.write();
416 searchers.insert(project_path.to_string(), s.clone());
417 s
418 }
419 Err(e) => {
420 return ResponseResult::Error {
421 message: e.to_string(),
422 }
423 }
424 },
425 Err(e) => {
426 return ResponseResult::Error {
427 message: e.to_string(),
428 }
429 }
430 }
431 }
432 }
433 }
434 };
435
436 match searcher.search(query, limit) {
438 Ok(results) => {
439 let elapsed = start.elapsed();
440 let response = SearchResponse {
441 results,
442 query: query.to_string(),
443 elapsed_ms: elapsed.as_secs_f64() * 1000.0,
444 project: project_path.to_string(),
445 };
446
447 {
449 let mut cache = state.cache.write();
450 cache.put(cache_key, response.clone());
451 }
452
453 ResponseResult::Search(response)
454 }
455 Err(e) => ResponseResult::Error {
456 message: e.to_string(),
457 },
458 }
459}
460
461async fn handle_index(project_path: &str, force: bool, state: &DaemonState) -> ResponseResult {
462 let path = PathBuf::from(project_path);
463 match do_index(&path, force, state).await {
464 Ok((file_count, chunk_count, elapsed_ms)) => ResponseResult::Index {
465 project: project_path.to_string(),
466 file_count,
467 chunk_count,
468 elapsed_ms,
469 },
470 Err(e) => ResponseResult::Error {
471 message: e.to_string(),
472 },
473 }
474}
475
476async fn do_index(
477 path: &PathBuf,
478 _force: bool,
479 state: &DaemonState,
480) -> Result<(usize, usize, f64)> {
481 let start = Instant::now();
482
483 let files = walk_project(path)?;
485 let file_count = files.len();
486
487 let index = TantivyIndex::open_or_create(path)?;
488 let mut writer = IndexWriter::new(&index)?;
489 let mut chunk_count = 0;
490
491 for file in &files {
492 let chunks = chunk_file(&file.path, &file.content);
493 for chunk in chunks {
494 writer.add_chunk(&chunk)?;
495 chunk_count += 1;
496 }
497 }
498
499 writer.commit()?;
500
501 let elapsed = start.elapsed();
502
503 {
505 let project = Project::from_path(path)?;
506 let entry = ProjectEntry {
507 path: path.clone(),
508 name: project.name,
509 indexed_at: SystemTime::now(),
510 file_count,
511 chunk_count,
512 watching: false,
513 };
514
515 let mut registry = state.registry.write();
516 registry.upsert(entry);
517 let _ = registry.save();
518 }
519
520 {
522 let mut searchers = state.searchers.write();
523 searchers.remove(&path.to_string_lossy().to_string());
524 }
525
526 {
528 let mut cache = state.cache.write();
529 cache.clear_project(&path.to_string_lossy());
530 }
531
532 Ok((file_count, chunk_count, elapsed.as_secs_f64() * 1000.0))
533}
534
535async fn handle_index_watch(project_path: &str, state: &DaemonState) -> ResponseResult {
536 let path = PathBuf::from(project_path);
537
538 let result = handle_index(project_path, false, state).await;
540
541 {
543 let mut watcher = state.watcher.lock();
544 if let Err(e) = watcher.watch(path.clone()) {
545 warn!(project = %project_path, error = %e, "Failed to start watcher");
546 } else {
547 info!(project = %project_path, "Started watching for changes");
548 }
549 }
550
551 {
553 let mut registry = state.registry.write();
554 registry.set_watching(&path, true);
555 let _ = registry.save();
556 }
557
558 result
559}
560
561fn handle_status(state: &DaemonState) -> ResponseResult {
562 let registry = state.registry.read();
563 let projects: Vec<ProjectInfo> = registry
564 .list()
565 .iter()
566 .map(|e| ProjectInfo {
567 path: e.path.to_string_lossy().to_string(),
568 name: e.name.clone(),
569 chunk_count: e.chunk_count,
570 watching: e.watching,
571 })
572 .collect();
573
574 ResponseResult::Status {
575 running: true,
576 pid: std::process::id(),
577 projects,
578 }
579}
580
581fn handle_list(state: &DaemonState) -> ResponseResult {
582 let registry = state.registry.read();
583 let projects: Vec<ProjectInfo> = registry
584 .list()
585 .iter()
586 .map(|e| ProjectInfo {
587 path: e.path.to_string_lossy().to_string(),
588 name: e.name.clone(),
589 chunk_count: e.chunk_count,
590 watching: e.watching,
591 })
592 .collect();
593
594 ResponseResult::List { projects }
595}
596
597async fn handle_forget(project_path: &str, state: &DaemonState) -> ResponseResult {
598 let path = PathBuf::from(project_path);
599
600 {
602 let mut registry = state.registry.write();
603 registry.remove(&path);
604 let _ = registry.save();
605 }
606
607 {
609 let mut searchers = state.searchers.write();
610 searchers.remove(project_path);
611 }
612
613 if let Ok(index_dir) = Config::index_dir(&path) {
615 if index_dir.exists() {
616 let _ = std::fs::remove_dir_all(&index_dir);
617 }
618 }
619
620 {
622 let mut cache = state.cache.write();
623 cache.clear_project(project_path);
624 }
625
626 ResponseResult::Forget {
627 project: project_path.to_string(),
628 success: true,
629 }
630}