1use crate::core::config::Config;
2use crate::core::error::Result;
3use crate::core::project::{Project, ProjectEntry, Registry};
4use crate::daemon::cache::QueryCache;
5use crate::daemon::protocol::{Method, ProjectInfo, Request, Response, ResponseResult};
6use crate::daemon::watcher::WatcherManager;
7use crate::index::{IndexSearcher, IndexWriter, TantivyIndex};
8use crate::parse::{chunk_file, walk_project};
9use crate::search::SearchResponse;
10use parking_lot::{Mutex, RwLock};
11use std::collections::HashMap;
12use std::path::PathBuf;
13use std::sync::Arc;
14use std::time::{Duration, Instant, SystemTime};
15use tokio::io::{AsyncBufReadExt, AsyncRead, AsyncWrite, AsyncWriteExt, BufReader};
16use tokio::sync::broadcast;
17use tracing::{debug, info, warn};
18
19#[cfg(unix)]
20use tokio::net::UnixListener;
21
22#[cfg(windows)]
23use tokio::net::TcpListener;
24
25pub struct DaemonState {
26 pub registry: RwLock<Registry>,
27 pub searchers: RwLock<HashMap<String, IndexSearcher>>,
28 pub cache: RwLock<QueryCache>,
29 pub watcher: Mutex<WatcherManager>,
30 pub shutdown: broadcast::Sender<()>,
31}
32
33impl Default for DaemonState {
34 fn default() -> Self {
35 Self::new()
36 }
37}
38
39impl DaemonState {
40 pub fn new() -> Self {
41 let (shutdown, _) = broadcast::channel(1);
42 Self {
43 registry: RwLock::new(Registry::load().unwrap_or_default()),
44 searchers: RwLock::new(HashMap::new()),
45 cache: RwLock::new(QueryCache::new()),
46 watcher: Mutex::new(WatcherManager::new()),
47 shutdown,
48 }
49 }
50
51 pub fn invalidate_project(&self, project_path: &PathBuf) {
53 let path_str = project_path.to_string_lossy().to_string();
54
55 {
57 let mut searchers = self.searchers.write();
58 searchers.remove(&path_str);
59 }
60
61 {
63 let mut cache = self.cache.write();
64 cache.clear_project(&path_str);
65 }
66
67 debug!(project = %path_str, "Invalidated caches after incremental update");
68 }
69}
70
71async fn run_watcher_loop(state: Arc<DaemonState>) {
73 info!("Starting file watcher for incremental indexing");
74
75 {
77 let registry = state.registry.read();
78 let mut watcher = state.watcher.lock();
79 for entry in registry.list() {
80 if entry.watching {
81 if let Err(e) = watcher.watch(entry.path.clone()) {
82 warn!(project = %entry.path.display(), error = %e, "Failed to watch project");
83 } else {
84 info!(project = %entry.path.display(), "Watching for changes");
85 }
86 }
87 }
88 }
89
90 loop {
92 let state_clone = Arc::clone(&state);
94
95 let updated_projects = tokio::task::spawn_blocking(move || {
97 let mut watcher = state_clone.watcher.lock();
98 watcher.process_events_sync()
99 })
100 .await
101 .unwrap_or_default();
102
103 for project_path in updated_projects {
105 state.invalidate_project(&project_path);
106 }
107
108 tokio::time::sleep(Duration::from_millis(500)).await;
110 }
111}
112
113#[cfg(unix)]
115pub async fn run_server() -> Result<()> {
116 let socket_path = Config::socket_path()?;
117
118 if socket_path.exists() {
120 std::fs::remove_file(&socket_path)?;
121 }
122
123 let listener = UnixListener::bind(&socket_path)?;
124 let state = Arc::new(DaemonState::new());
125
126 info!("Daemon starting...");
127
128 let watcher_state = Arc::clone(&state);
130 tokio::spawn(async move {
131 run_watcher_loop(watcher_state).await;
132 });
133
134 let mut shutdown_rx = state.shutdown.subscribe();
135
136 info!("Daemon ready, listening for connections");
137
138 loop {
139 tokio::select! {
140 result = listener.accept() => {
141 match result {
142 Ok((stream, _)) => {
143 let state = Arc::clone(&state);
144 tokio::spawn(async move {
145 if let Err(e) = handle_connection(stream, state).await {
146 eprintln!("Connection error: {}", e);
147 }
148 });
149 }
150 Err(e) => {
151 eprintln!("Accept error: {}", e);
152 }
153 }
154 }
155 _ = shutdown_rx.recv() => {
156 break;
157 }
158 }
159 }
160
161 if socket_path.exists() {
163 let _ = std::fs::remove_file(&socket_path);
164 }
165
166 Ok(())
167}
168
169#[cfg(windows)]
171pub async fn run_server() -> Result<()> {
172 let port = Config::daemon_port();
173 let addr = format!("127.0.0.1:{}", port);
174
175 let listener =
176 TcpListener::bind(&addr)
177 .await
178 .map_err(|e| crate::core::error::Error::DaemonError {
179 message: format!("Failed to bind to {}: {}", addr, e),
180 })?;
181
182 let port_path = Config::port_path()?;
184 std::fs::write(&port_path, port.to_string())?;
185
186 let state = Arc::new(DaemonState::new());
187
188 info!("Daemon starting...");
189
190 let watcher_state = Arc::clone(&state);
192 tokio::spawn(async move {
193 run_watcher_loop(watcher_state).await;
194 });
195
196 let mut shutdown_rx = state.shutdown.subscribe();
197
198 info!("Daemon ready, listening on {}", addr);
199
200 loop {
201 tokio::select! {
202 result = listener.accept() => {
203 match result {
204 Ok((stream, _)) => {
205 let state = Arc::clone(&state);
206 tokio::spawn(async move {
207 if let Err(e) = handle_connection(stream, state).await {
208 eprintln!("Connection error: {}", e);
209 }
210 });
211 }
212 Err(e) => {
213 eprintln!("Accept error: {}", e);
214 }
215 }
216 }
217 _ = shutdown_rx.recv() => {
218 break;
219 }
220 }
221 }
222
223 if port_path.exists() {
225 let _ = std::fs::remove_file(&port_path);
226 }
227
228 Ok(())
229}
230
231async fn handle_connection<S>(stream: S, state: Arc<DaemonState>) -> Result<()>
233where
234 S: AsyncRead + AsyncWrite + Unpin,
235{
236 let (reader, mut writer) = tokio::io::split(stream);
237 let mut reader = BufReader::new(reader);
238 let mut line = String::new();
239
240 while reader.read_line(&mut line).await? > 0 {
241 let request: Request = match serde_json::from_str(&line) {
242 Ok(r) => r,
243 Err(e) => {
244 let response = Response {
245 id: "error".to_string(),
246 result: ResponseResult::Error {
247 message: e.to_string(),
248 },
249 };
250 let json = serde_json::to_string(&response)? + "\n";
251 writer.write_all(json.as_bytes()).await?;
252 line.clear();
253 continue;
254 }
255 };
256
257 let response = handle_request(request, &state).await;
258 let json = serde_json::to_string(&response)? + "\n";
259 writer.write_all(json.as_bytes()).await?;
260
261 if matches!(response.result, ResponseResult::Stop { success: true }) {
263 let _ = state.shutdown.send(());
264 break;
265 }
266
267 line.clear();
268 }
269
270 Ok(())
271}
272
273async fn handle_request(request: Request, state: &DaemonState) -> Response {
274 let result = match request.method {
275 Method::Search {
276 query,
277 project,
278 limit,
279 } => handle_search(&query, &project, limit, state).await,
280
281 Method::Index { project, force } => handle_index(&project, force, state).await,
282
283 Method::IndexWatch { project } => handle_index_watch(&project, state).await,
284
285 Method::Status => handle_status(state),
286
287 Method::List => handle_list(state),
288
289 Method::Forget { project } => handle_forget(&project, state).await,
290
291 Method::Stop => ResponseResult::Stop { success: true },
292 };
293
294 Response {
295 id: request.id,
296 result,
297 }
298}
299
300async fn handle_search(
301 query: &str,
302 project_path: &str,
303 limit: usize,
304 state: &DaemonState,
305) -> ResponseResult {
306 let start = Instant::now();
307 let path = PathBuf::from(project_path);
308
309 let cache_key = format!("{}:{}:{}", project_path, query, limit);
311 {
312 let mut cache = state.cache.write();
313 if let Some(cached) = cache.get(&cache_key) {
314 return ResponseResult::Search(cached.clone());
315 }
316 }
317
318 let searcher = {
320 let searchers = state.searchers.read();
321 searchers.get(project_path).cloned()
322 };
323
324 let searcher = match searcher {
325 Some(s) => s,
326 None => {
327 match IndexSearcher::open(&path) {
329 Ok(s) => {
330 let mut searchers = state.searchers.write();
331 searchers.insert(project_path.to_string(), s.clone());
332 s
333 }
334 Err(_) => {
335 match do_index(&path, false, state).await {
337 Ok(_) => match IndexSearcher::open(&path) {
338 Ok(s) => {
339 let mut searchers = state.searchers.write();
340 searchers.insert(project_path.to_string(), s.clone());
341 s
342 }
343 Err(e) => {
344 return ResponseResult::Error {
345 message: e.to_string(),
346 }
347 }
348 },
349 Err(e) => {
350 return ResponseResult::Error {
351 message: e.to_string(),
352 }
353 }
354 }
355 }
356 }
357 }
358 };
359
360 match searcher.search(query, limit) {
362 Ok(results) => {
363 let elapsed = start.elapsed();
364 let response = SearchResponse {
365 results,
366 query: query.to_string(),
367 elapsed_ms: elapsed.as_secs_f64() * 1000.0,
368 project: project_path.to_string(),
369 };
370
371 {
373 let mut cache = state.cache.write();
374 cache.put(cache_key, response.clone());
375 }
376
377 ResponseResult::Search(response)
378 }
379 Err(e) => ResponseResult::Error {
380 message: e.to_string(),
381 },
382 }
383}
384
385async fn handle_index(project_path: &str, force: bool, state: &DaemonState) -> ResponseResult {
386 let path = PathBuf::from(project_path);
387 match do_index(&path, force, state).await {
388 Ok((file_count, chunk_count, elapsed_ms)) => ResponseResult::Index {
389 project: project_path.to_string(),
390 file_count,
391 chunk_count,
392 elapsed_ms,
393 },
394 Err(e) => ResponseResult::Error {
395 message: e.to_string(),
396 },
397 }
398}
399
400async fn do_index(
401 path: &PathBuf,
402 _force: bool,
403 state: &DaemonState,
404) -> Result<(usize, usize, f64)> {
405 let start = Instant::now();
406
407 let files = walk_project(path)?;
409 let file_count = files.len();
410
411 let index = TantivyIndex::open_or_create(path)?;
412 let mut writer = IndexWriter::new(&index)?;
413 let mut chunk_count = 0;
414
415 for file in &files {
416 let chunks = chunk_file(&file.path, &file.content);
417 for chunk in chunks {
418 writer.add_chunk(&chunk)?;
419 chunk_count += 1;
420 }
421 }
422
423 writer.commit()?;
424
425 let elapsed = start.elapsed();
426
427 {
429 let project = Project::from_path(path)?;
430 let entry = ProjectEntry {
431 path: path.clone(),
432 name: project.name,
433 indexed_at: SystemTime::now(),
434 file_count,
435 chunk_count,
436 watching: false,
437 };
438
439 let mut registry = state.registry.write();
440 registry.upsert(entry);
441 let _ = registry.save();
442 }
443
444 {
446 let mut searchers = state.searchers.write();
447 searchers.remove(&path.to_string_lossy().to_string());
448 }
449
450 {
452 let mut cache = state.cache.write();
453 cache.clear_project(&path.to_string_lossy());
454 }
455
456 Ok((file_count, chunk_count, elapsed.as_secs_f64() * 1000.0))
457}
458
459async fn handle_index_watch(project_path: &str, state: &DaemonState) -> ResponseResult {
460 let path = PathBuf::from(project_path);
461
462 let result = handle_index(project_path, false, state).await;
464
465 {
467 let mut watcher = state.watcher.lock();
468 if let Err(e) = watcher.watch(path.clone()) {
469 warn!(project = %project_path, error = %e, "Failed to start watcher");
470 } else {
471 info!(project = %project_path, "Started watching for changes");
472 }
473 }
474
475 {
477 let mut registry = state.registry.write();
478 registry.set_watching(&path, true);
479 let _ = registry.save();
480 }
481
482 result
483}
484
485fn handle_status(state: &DaemonState) -> ResponseResult {
486 let registry = state.registry.read();
487 let projects: Vec<ProjectInfo> = registry
488 .list()
489 .iter()
490 .map(|e| ProjectInfo {
491 path: e.path.to_string_lossy().to_string(),
492 name: e.name.clone(),
493 chunk_count: e.chunk_count,
494 watching: e.watching,
495 })
496 .collect();
497
498 ResponseResult::Status {
499 running: true,
500 pid: std::process::id(),
501 projects,
502 }
503}
504
505fn handle_list(state: &DaemonState) -> ResponseResult {
506 let registry = state.registry.read();
507 let projects: Vec<ProjectInfo> = registry
508 .list()
509 .iter()
510 .map(|e| ProjectInfo {
511 path: e.path.to_string_lossy().to_string(),
512 name: e.name.clone(),
513 chunk_count: e.chunk_count,
514 watching: e.watching,
515 })
516 .collect();
517
518 ResponseResult::List { projects }
519}
520
521async fn handle_forget(project_path: &str, state: &DaemonState) -> ResponseResult {
522 let path = PathBuf::from(project_path);
523
524 {
526 let mut registry = state.registry.write();
527 registry.remove(&path);
528 let _ = registry.save();
529 }
530
531 {
533 let mut searchers = state.searchers.write();
534 searchers.remove(project_path);
535 }
536
537 if let Ok(index_dir) = Config::index_dir(&path) {
539 if index_dir.exists() {
540 let _ = std::fs::remove_dir_all(&index_dir);
541 }
542 }
543
544 {
546 let mut cache = state.cache.write();
547 cache.clear_project(project_path);
548 }
549
550 ResponseResult::Forget {
551 project: project_path.to_string(),
552 success: true,
553 }
554}