Skip to main content

codesearch/index/
manager.rs

1//! Index management module with auto-refresh and file watching support.
2//!
3//! This module provides a unified interface for both MCP and HTTP server
4//! to manage index lifecycle: initial load/refresh and background file watching.
5//!
6//! # Multi-instance Support
7//!
8//! When multiple processes need to access the same database (e.g., two terminal windows
9//! in the same directory), this module supports:
10//!
11//! - **Writer mode**: First instance gets write access with file watching enabled
12//! - **Readonly mode**: Subsequent instances open in readonly mode (no writes, no watcher)
13//!
14//! A lock file (`.writer.lock`) in the database directory indicates an active writer.
15//!
16#![allow(dead_code)]
17
18use crate::cache::{normalize_path, normalize_path_str};
19use crate::constants::{DB_DIR_NAME, DEFAULT_FSW_DEBOUNCE_MS, FILE_META_DB_NAME, WRITER_LOCK_FILE};
20use crate::embed::ModelType;
21use crate::fts::FtsStore;
22use crate::vectordb::VectorStore;
23use crate::watch::{FileEvent, FileWatcher};
24use std::collections::HashSet;
25use std::fs::File;
26use std::path::{Path, PathBuf};
27use std::sync::Arc;
28use tokio::sync::{Mutex, RwLock};
29use tokio_util::sync::CancellationToken;
30use tracing::{debug, error, info, warn};
31
32// Import Result from the parent module
33use super::Result;
34
/// Quiet period, in milliseconds, before buffered file-watcher events are flushed.
///
/// The watcher loop processes its buffered events once the buffers are non-empty and
/// at least this long has passed since the most recent event arrived. The same timer is
/// reset after every flush.
const FSW_BATCH_FLUSH_MS: u64 = 2_000;
40
41// === Lock File Management ===
42
43/// Check if the database is currently locked by another process.
44///
45/// Returns `true` if another process has the write lock.
46pub fn is_database_locked(db_path: &Path) -> bool {
47    use fs2::FileExt;
48
49    let lock_path = db_path.join(WRITER_LOCK_FILE);
50    if !lock_path.exists() {
51        return false;
52    }
53
54    // Try to acquire an exclusive lock on the file
55    // If we can't, another process holds the lock
56    match File::options().read(true).write(true).open(&lock_path) {
57        Ok(file) => {
58            // try_lock_exclusive returns Ok(()) if we got the lock, Err if not
59            match file.try_lock_exclusive() {
60                Ok(()) => {
61                    // We got the lock, so it wasn't locked. Release it.
62                    let _ = file.unlock();
63                    false
64                }
65                Err(_) => {
66                    // Could not acquire lock - another process has it
67                    true
68                }
69            }
70        }
71        Err(_) => {
72            // If we can't open the file, assume it's not locked
73            // (file might not exist or permissions issue)
74            false
75        }
76    }
77}
78
79/// Acquire the writer lock for the database.
80///
81/// Returns the lock file handle (keep it open to hold the lock).
82/// Returns `None` if the lock is already held by another process.
83pub fn acquire_writer_lock(db_path: &Path) -> Option<File> {
84    use fs2::FileExt;
85
86    let lock_path = db_path.join(WRITER_LOCK_FILE);
87
88    // Create or open the lock file
89    let file = match File::options()
90        .read(true)
91        .write(true)
92        .create(true)
93        .truncate(false)
94        .open(&lock_path)
95    {
96        Ok(f) => f,
97        Err(e) => {
98            warn!("Failed to open lock file: {}", e);
99            return None;
100        }
101    };
102
103    // Try to acquire exclusive lock (non-blocking)
104    match file.try_lock_exclusive() {
105        Ok(()) => {
106            // Successfully acquired lock
107            debug!("🔒 Writer lock acquired");
108            Some(file)
109        }
110        Err(e) => {
111            // Failed to acquire lock - another process holds it
112            debug!("🔒 Failed to acquire writer lock: {}", e);
113            None
114        }
115    }
116}
117
118/// Release the writer lock (done automatically when File is dropped)
119#[allow(dead_code)]
120pub fn release_writer_lock(_lock: File) {
121    // Lock is released automatically when the File is dropped
122    debug!("🔓 Writer lock released");
123}
124
125/// Shared stores for concurrent access between MCP service and file watcher.
126///
127/// Uses RwLock to allow multiple concurrent readers (searches) with exclusive writer (indexing).
128pub struct SharedStores {
129    pub vector_store: Arc<RwLock<VectorStore>>,
130    pub fts_store: Arc<RwLock<FtsStore>>,
131    /// Lock file handle (Some = we have writer lock, None = readonly mode)
132    #[allow(dead_code)]
133    writer_lock: Option<File>,
134    /// Whether this instance is in readonly mode
135    pub readonly: bool,
136}
137
138impl SharedStores {
139    /// Create new shared stores from the database path (read-write mode).
140    ///
141    /// This acquires a writer lock. If another process already has the lock,
142    /// this will fail with an error.
143    pub fn new(db_path: &Path, dimensions: usize) -> Result<Self> {
144        // Try to acquire writer lock
145        let lock = acquire_writer_lock(db_path);
146        if lock.is_none() {
147            return Err(anyhow::anyhow!(
148                "Database is locked by another process. Use new_readonly() instead."
149            ));
150        }
151
152        let vector_store = VectorStore::new(db_path, dimensions)?;
153        let fts_store = FtsStore::new_with_writer(db_path)?;
154
155        info!("📦 SharedStores created in read-write mode");
156
157        Ok(Self {
158            vector_store: Arc::new(RwLock::new(vector_store)),
159            fts_store: Arc::new(RwLock::new(fts_store)),
160            writer_lock: lock,
161            readonly: false,
162        })
163    }
164
165    /// Create shared stores in readonly mode (for secondary instances).
166    ///
167    /// This does not acquire any locks and cannot write to the database.
168    /// File watching is not supported in readonly mode.
169    pub fn new_readonly(db_path: &Path, dimensions: usize) -> Result<Self> {
170        let vector_store = VectorStore::open_readonly(db_path, dimensions)?;
171        let fts_store = FtsStore::new(db_path)?; // Read-only without writer
172
173        info!("📦 SharedStores created in readonly mode");
174
175        Ok(Self {
176            vector_store: Arc::new(RwLock::new(vector_store)),
177            fts_store: Arc::new(RwLock::new(fts_store)),
178            writer_lock: None,
179            readonly: true,
180        })
181    }
182
183    /// Try to create shared stores, falling back to readonly mode if locked.
184    ///
185    /// Returns (SharedStores, is_readonly) tuple.
186    pub fn new_or_readonly(db_path: &Path, dimensions: usize) -> Result<(Self, bool)> {
187        // First, check if locked
188        if is_database_locked(db_path) {
189            info!("🔒 Database is locked by another process, opening in readonly mode...");
190            let stores = Self::new_readonly(db_path, dimensions)?;
191            return Ok((stores, true));
192        }
193
194        // Try to create in write mode
195        match Self::new(db_path, dimensions) {
196            Ok(stores) => Ok((stores, false)),
197            Err(e) => {
198                // If failed to acquire lock, try readonly
199                if e.to_string().contains("locked") {
200                    info!("🔒 Failed to acquire lock, opening in readonly mode...");
201                    let stores = Self::new_readonly(db_path, dimensions)?;
202                    Ok((stores, true))
203                } else {
204                    Err(e)
205                }
206            }
207        }
208    }
209}
210
211/// Index manager that handles index lifecycle and file watching.
212///
213/// Provides two-phase initialization:
214/// 1. `new()` - Load or refresh index at startup
215/// 2. `start_file_watcher()` - Start background file watching
216pub struct IndexManager {
217    /// Path to the codebase to index
218    codebase_path: PathBuf,
219    /// Path to the database
220    db_path: PathBuf,
221    /// File watcher instance
222    watcher: Arc<Mutex<FileWatcher>>,
223    /// Shared stores for concurrent access
224    stores: Arc<SharedStores>,
225}
226
227impl IndexManager {
228    /// Create a new index manager with shared stores.
229    ///
230    /// This is the **first method call** - should be called at server startup.
231    ///
232    /// # Arguments
233    /// * `codebase_path` - Path to the codebase to index
234    /// * `stores` - Shared stores for concurrent access (created by caller)
235    ///
236    /// # Returns
237    /// * `Result<Self>` - Index manager instance or error
238    ///
239    /// # Behavior
240    /// - Checks if index exists and is up-to-date
241    /// - **ERROR if index doesn't exist** - user must run `codesearch index add` first
242    /// - If index exists, performs incremental refresh
243    /// - Logs all operations with detailed info
244    ///
245    /// # Errors
246    /// - Returns error if index doesn't exist (user must create index first)
247    pub async fn new<P: AsRef<Path>>(codebase_path: P, stores: Arc<SharedStores>) -> Result<Self> {
248        let path_buf = codebase_path.as_ref().to_path_buf();
249        let db_path = path_buf.join(DB_DIR_NAME);
250
251        info!("🔍 Initializing index manager for: {}", path_buf.display());
252
253        // Check if index exists
254        let needs_initial = Self::needs_initial_indexing(&path_buf).await?;
255
256        if needs_initial {
257            // Index doesn't exist - ERROR, don't auto-create
258            let error_msg = format!(
259                "❌ No index found for: {}\n\n\
260                 💡 To create an index, run one of these commands:\n\
261                 • For local index:  codesearch index add\n\
262                 • For global index: codesearch index add -g\n\n\
263                 Then start the server again.",
264                path_buf.display()
265            );
266            return Err(anyhow::anyhow!(error_msg));
267        }
268
269        // Index exists, perform incremental refresh
270        info!("🔄 Index exists, performing incremental refresh...");
271        Self::perform_incremental_refresh(&path_buf).await?;
272
273        // Create file watcher (but don't start it yet)
274        debug!("👀 Creating file watcher...");
275        let watcher = FileWatcher::new(path_buf.clone());
276        let watcher = Arc::new(Mutex::new(watcher));
277
278        info!("✅ Index manager initialized successfully");
279
280        Ok(Self {
281            codebase_path: path_buf,
282            db_path,
283            watcher,
284            stores,
285        })
286    }
287
288    /// Get a reference to the shared stores (for CodesearchService)
289    pub fn stores(&self) -> Arc<SharedStores> {
290        self.stores.clone()
291    }
292
293    /// Create a new index manager WITHOUT performing incremental refresh.
294    ///
295    /// Use this when the caller has already performed the refresh (e.g., MCP server).
296    /// This avoids FTS lock conflicts by allowing the caller to control when the
297    /// refresh happens relative to SharedStores creation.
298    ///
299    /// # Arguments
300    /// * `codebase_path` - Path to the codebase to index
301    /// * `stores` - Shared stores for concurrent access (created by caller)
302    pub async fn new_without_refresh<P: AsRef<Path>>(
303        codebase_path: P,
304        stores: Arc<SharedStores>,
305    ) -> Result<Self> {
306        let path_buf = codebase_path.as_ref().to_path_buf();
307        let db_path = path_buf.join(DB_DIR_NAME);
308
309        info!(
310            "🔍 Initializing index manager (no refresh) for: {}",
311            path_buf.display()
312        );
313
314        // Check if index exists
315        let needs_initial = Self::needs_initial_indexing(&path_buf).await?;
316
317        if needs_initial {
318            // Index doesn't exist - ERROR, don't auto-create
319            let error_msg = format!(
320                "❌ No index found for: {}\n\n\
321                 💡 To create an index, run one of these commands:\n\
322                 • For local index:  codesearch index add\n\
323                 • For global index: codesearch index add -g\n\n\
324                 Then start the server again.",
325                path_buf.display()
326            );
327            return Err(anyhow::anyhow!(error_msg));
328        }
329
330        // Create file watcher (but don't start it yet)
331        debug!("👀 Creating file watcher...");
332        let watcher = FileWatcher::new(path_buf.clone());
333        let watcher = Arc::new(Mutex::new(watcher));
334
335        info!("✅ Index manager initialized successfully (refresh skipped)");
336
337        Ok(Self {
338            codebase_path: path_buf,
339            db_path,
340            watcher,
341            stores,
342        })
343    }
344
345    /// Perform incremental refresh using shared stores.
346    ///
347    /// This checks for changed/deleted files since last index and updates
348    /// the index accordingly. Uses the shared stores to avoid lock conflicts.
349    pub async fn perform_incremental_refresh_with_stores(
350        codebase_path: &Path,
351        db_path: &Path,
352        stores: &SharedStores,
353    ) -> Result<()> {
354        use crate::cache::FileMetaStore;
355        use crate::chunker::SemanticChunker;
356        use crate::embed::EmbeddingService;
357        use crate::file::FileWalker;
358
359        info!("🔄 Performing incremental refresh with shared stores...");
360        let start = std::time::Instant::now();
361
362        // Read model metadata
363        let metadata_path = db_path.join("metadata.json");
364        let (model_name, dimensions) = if metadata_path.exists() {
365            let content = std::fs::read_to_string(&metadata_path)?;
366            let json: serde_json::Value = serde_json::from_str(&content)?;
367            let model = json
368                .get("model")
369                .and_then(|v| v.as_str())
370                .unwrap_or("minilm-l6-q");
371            let dims = json
372                .get("dimensions")
373                .and_then(|v| v.as_u64())
374                .unwrap_or(384) as usize;
375            (model.to_string(), dims)
376        } else {
377            return Err(anyhow::anyhow!("No metadata.json found in database"));
378        };
379
380        // Load FileMetaStore
381        let mut file_meta_store = FileMetaStore::load_or_create(db_path, &model_name, dimensions)?;
382
383        // Walk files
384        let walker = FileWalker::new(codebase_path.to_path_buf());
385        let (files, _stats) = walker.walk()?;
386
387        // Find changed and deleted files
388        let mut changed_files = Vec::new();
389        let mut unchanged_count = 0;
390
391        for file in &files {
392            let (needs_reindex, _old_chunk_ids) = file_meta_store.check_file(&file.path)?;
393            if needs_reindex {
394                changed_files.push(file.clone());
395                debug!("📝 File changed: {}", file.path.display());
396            } else {
397                unchanged_count += 1;
398            }
399        }
400
401        // Find deleted files
402        let deleted_files = file_meta_store.find_deleted_files();
403
404        info!(
405            "   Unchanged: {}, Changed: {}, Deleted: {}",
406            unchanged_count,
407            changed_files.len(),
408            deleted_files.len()
409        );
410
411        // If no changes, we're done
412        if changed_files.is_empty() && deleted_files.is_empty() {
413            info!("✅ Index is up to date!");
414            return Ok(());
415        }
416
417        // Delete chunks for deleted files
418        for (file_path, chunk_ids) in &deleted_files {
419            if !chunk_ids.is_empty() {
420                debug!("🗑️  Deleting {} chunks for: {}", chunk_ids.len(), file_path);
421
422                // Delete from vector store
423                {
424                    let mut store = stores.vector_store.write().await;
425                    store.delete_chunks(chunk_ids)?;
426                }
427
428                // Delete from FTS
429                {
430                    let mut fts_store = stores.fts_store.write().await;
431                    for chunk_id in chunk_ids {
432                        fts_store.delete_chunk(*chunk_id)?;
433                    }
434                }
435            }
436            file_meta_store.remove_file(Path::new(file_path));
437        }
438
439        // Delete old chunks for changed files
440        for file in &changed_files {
441            let (_, old_chunk_ids) = file_meta_store.check_file(&file.path)?;
442            if !old_chunk_ids.is_empty() {
443                debug!(
444                    "🔄 Deleting {} old chunks for: {}",
445                    old_chunk_ids.len(),
446                    file.path.display()
447                );
448
449                // Delete from vector store
450                {
451                    let mut store = stores.vector_store.write().await;
452                    store.delete_chunks(&old_chunk_ids)?;
453                }
454
455                // Delete from FTS
456                {
457                    let mut fts_store = stores.fts_store.write().await;
458                    for chunk_id in &old_chunk_ids {
459                        fts_store.delete_chunk(*chunk_id)?;
460                    }
461                }
462            }
463        }
464
465        // Commit FTS deletions
466        {
467            let mut fts_store = stores.fts_store.write().await;
468            fts_store.commit()?;
469        }
470
471        // Chunk changed files
472        if !changed_files.is_empty() {
473            info!("🔄 Processing {} changed files...", changed_files.len());
474
475            let mut chunker = SemanticChunker::new(100, 2000, 10);
476            let mut all_chunks = Vec::new();
477
478            for file in &changed_files {
479                let content = match std::fs::read_to_string(&file.path) {
480                    Ok(c) => c,
481                    Err(_) => continue,
482                };
483                let chunks = chunker.chunk_semantic(file.language, &file.path, &content)?;
484                all_chunks.extend(chunks);
485            }
486
487            if !all_chunks.is_empty() {
488                // Embed chunks
489                info!("📦 Embedding {} chunks...", all_chunks.len());
490                let cache_dir = crate::constants::get_global_models_cache_dir()?;
491                let mut embedding_service = EmbeddingService::with_cache_dir(
492                    ModelType::default(),
493                    Some(cache_dir.as_path()),
494                )?;
495                let embedded_chunks = embedding_service.embed_chunks(all_chunks)?;
496
497                // Insert into vector store
498                let chunk_ids = {
499                    let mut store = stores.vector_store.write().await;
500                    let ids = store.insert_chunks_with_ids(embedded_chunks.clone())?;
501                    store.build_index()?;
502                    ids
503                };
504
505                // Insert into FTS
506                {
507                    let mut fts_store = stores.fts_store.write().await;
508                    for (chunk, chunk_id) in embedded_chunks.iter().zip(chunk_ids.iter()) {
509                        let path_str = chunk.chunk.path.to_string();
510                        let signature = chunk.chunk.signature.as_deref();
511                        let kind = format!("{:?}", chunk.chunk.kind);
512                        fts_store.add_chunk(
513                            *chunk_id,
514                            &chunk.chunk.content,
515                            &path_str,
516                            signature,
517                            &kind,
518                        )?;
519                    }
520                    fts_store.commit()?;
521                }
522
523                // Update file metadata
524                // Group chunks by file path (normalize for consistent lookup)
525                let mut chunks_by_file: std::collections::HashMap<String, Vec<u32>> =
526                    std::collections::HashMap::new();
527                for (chunk, chunk_id) in embedded_chunks.iter().zip(chunk_ids.iter()) {
528                    chunks_by_file
529                        .entry(normalize_path_str(&chunk.chunk.path))
530                        .or_default()
531                        .push(*chunk_id);
532                }
533
534                for file in &changed_files {
535                    let path_str = normalize_path(&file.path);
536                    if let Some(ids) = chunks_by_file.get(&path_str) {
537                        file_meta_store.update_file(&file.path, ids.clone())?;
538                    }
539                }
540
541                info!("✅ Indexed {} chunks", embedded_chunks.len());
542            }
543        }
544
545        // Save file metadata
546        file_meta_store.save(db_path)?;
547
548        let elapsed = start.elapsed();
549        info!(
550            "✅ Incremental refresh completed in {:.2}s",
551            elapsed.as_secs_f64()
552        );
553
554        Ok(())
555    }
556
557    /// Start the file system watcher (begin collecting events) without starting the processing loop.
558    ///
559    /// Call this BEFORE a long-running operation (like incremental refresh) to capture
560    /// file changes that happen during that operation. Then call `start_file_watcher()`
561    /// afterwards to begin processing the buffered events.
562    pub async fn start_watching(&self) -> Result<()> {
563        let mut w = self.watcher.lock().await;
564        if !w.is_started() {
565            w.start(DEFAULT_FSW_DEBOUNCE_MS)?;
566            info!("👀 File watcher pre-started (collecting events)");
567        }
568        Ok(())
569    }
570
571    /// Start the background file watcher.
572    ///
573    /// This is the **second method call** - should be called after `new()`.
574    /// Spawns a background task that watches for file changes and refreshes the index.
575    ///
576    /// # Arguments
577    /// * `cancel_token` - Cancellation token for graceful shutdown
578    ///
579    /// # Returns
580    /// * `Result<()>` - Success or error
581    ///
582    /// # Behavior
583    /// - Spawns a detached background task
584    /// - Watches for file modifications, deletions, and renames
585    /// - **Batches events** to avoid overhead with rapid changes
586    /// - Flushes batch when no new events for FSW_BATCH_FLUSH_MS
587    /// - Logs all file system events and refresh operations
588    /// - Continues running even if individual refresh operations fail
589    /// - Stops gracefully when the cancellation token is cancelled
590    pub async fn start_file_watcher(&self, cancel_token: CancellationToken) -> Result<()> {
591        let path = self.codebase_path.clone();
592        let db_path = self.db_path.clone();
593        let watcher = self.watcher.clone();
594        let stores = self.stores.clone();
595
596        info!("🚀 Starting background file watcher...");
597
598        // Spawn background task
599        tokio::spawn(async move {
600            info!("👀 File watcher task started for: {}", path.display());
601
602            // Start the watcher inside the task (if not already started by start_watching)
603            {
604                let mut w = watcher.lock().await;
605                if !w.is_started() {
606                    if let Err(e) = w.start(DEFAULT_FSW_DEBOUNCE_MS) {
607                        error!("❌ Failed to start file watcher: {}", e);
608                        return;
609                    }
610                } else {
611                    debug!("👀 File watcher already started (pre-started), skipping init");
612                }
613            }
614
615            // Event buffers - use HashSet to deduplicate
616            let mut files_to_index: HashSet<PathBuf> = HashSet::new();
617            let mut files_to_remove: HashSet<PathBuf> = HashSet::new();
618            let mut last_event_time = std::time::Instant::now();
619            let flush_duration = std::time::Duration::from_millis(FSW_BATCH_FLUSH_MS);
620
621            loop {
622                // Check if shutdown was requested
623                if cancel_token.is_cancelled() {
624                    info!("🛑 File watcher received shutdown signal, stopping...");
625                    break;
626                }
627
628                // Poll for new events
629                let events = watcher.lock().await.poll_events();
630                let now = std::time::Instant::now();
631
632                if !events.is_empty() {
633                    // Log which files are being buffered
634                    for event in &events {
635                        match event {
636                            FileEvent::Modified(p) => debug!("  📄 Buffered: {}", p.display()),
637                            FileEvent::Deleted(p) => {
638                                debug!("  🗑️  Buffered delete: {}", p.display())
639                            }
640                            FileEvent::Renamed(old, new) => debug!(
641                                "  📝 Buffered rename: {} -> {}",
642                                old.display(),
643                                new.display()
644                            ),
645                        }
646                    }
647                    debug!("📥 Buffered {} file event(s)", events.len());
648                    last_event_time = now;
649
650                    // Add events to buffers
651                    for event in events {
652                        match event {
653                            FileEvent::Modified(p) => {
654                                // If file was marked for removal, cancel that
655                                files_to_remove.remove(&p);
656                                files_to_index.insert(p);
657                            }
658                            FileEvent::Deleted(p) => {
659                                // If file was marked for indexing, cancel that
660                                files_to_index.remove(&p);
661                                files_to_remove.insert(p);
662                            }
663                            FileEvent::Renamed(old_p, new_p) => {
664                                // Remove old path, index new path
665                                files_to_index.remove(&old_p);
666                                files_to_remove.insert(old_p);
667                                files_to_remove.remove(&new_p);
668                                files_to_index.insert(new_p);
669                            }
670                        }
671                    }
672                }
673
674                // Check if we should flush the buffer
675                let has_buffered_events = !files_to_index.is_empty() || !files_to_remove.is_empty();
676                let time_since_last_event = now.duration_since(last_event_time);
677
678                if has_buffered_events && time_since_last_event >= flush_duration {
679                    // Flush the buffer
680                    let to_index: Vec<PathBuf> = files_to_index.drain().collect();
681                    let to_remove: Vec<PathBuf> = files_to_remove.drain().collect();
682
683                    info!(
684                        "📦 Flushing batch: {} to index, {} to remove",
685                        to_index.len(),
686                        to_remove.len()
687                    );
688
689                    // Process batch using shared stores
690                    if let Err(e) = Self::process_batch_with_stores(
691                        &path, &db_path, &stores, to_index, to_remove,
692                    )
693                    .await
694                    {
695                        error!("❌ Batch processing failed: {}", e);
696                    }
697
698                    // Reset timer
699                    last_event_time = now;
700                }
701
702                // Sleep to avoid busy-waiting, but wake up immediately on shutdown
703                tokio::select! {
704                    _ = tokio::time::sleep(tokio::time::Duration::from_millis(100)) => {}
705                    _ = cancel_token.cancelled() => {
706                        info!("🛑 File watcher received shutdown signal during sleep, stopping...");
707                        break;
708                    }
709                }
710            }
711
712            info!("✅ File watcher stopped cleanly");
713        });
714
715        info!("✅ File watcher background task spawned");
716
717        Ok(())
718    }
719
720    /// Process a batch of file events using shared stores.
721    /// This is more efficient than processing files one by one.
722    async fn process_batch_with_stores(
723        codebase_path: &Path,
724        db_path: &Path,
725        stores: &SharedStores,
726        files_to_index: Vec<PathBuf>,
727        files_to_remove: Vec<PathBuf>,
728    ) -> Result<()> {
729        use crate::output::set_quiet;
730
731        let start = std::time::Instant::now();
732
733        // Enable quiet mode during FSW batch processing to suppress verbose embedding output
734        set_quiet(true);
735
736        // First, remove deleted files
737        for file_path in &files_to_remove {
738            debug!("🗑️  Removing: {}", file_path.display());
739            if let Err(e) =
740                Self::remove_file_from_index_with_stores(codebase_path, db_path, stores, file_path)
741                    .await
742            {
743                warn!("⚠️  Failed to remove {}: {}", file_path.display(), e);
744            }
745
746            // Also handle directory deletion: on Windows, rm -rf of a directory may only
747            // produce a Remove event for the directory itself, not for individual files.
748            // Find all tracked files under this path prefix and remove them too.
749            {
750                use crate::cache::FileMetaStore;
751
752                // Load FileMetaStore from disk to query tracked files
753                let metadata_path = db_path.join("metadata.json");
754                if metadata_path.exists() {
755                    if let Ok(metadata_str) = std::fs::read_to_string(&metadata_path) {
756                        if let Ok(metadata) =
757                            serde_json::from_str::<serde_json::Value>(&metadata_str)
758                        {
759                            let dimensions =
760                                metadata["dimensions"].as_u64().unwrap_or(384) as usize;
761                            let model_name = metadata["model"].as_str().unwrap_or("minilm-l6-q");
762
763                            if let Ok(file_meta_store) =
764                                FileMetaStore::load_or_create(db_path, model_name, dimensions)
765                            {
766                                // Normalize the directory prefix for consistent matching
767                                // (tracked files are normalized to forward slashes)
768                                let dir_prefix = normalize_path(file_path);
769                                let dir_prefix_slash = if dir_prefix.ends_with('/') {
770                                    dir_prefix.clone()
771                                } else {
772                                    format!("{}/", dir_prefix)
773                                };
774
775                                let files_under_dir: Vec<String> = file_meta_store
776                                    .tracked_files()
777                                    .filter(|f| f.starts_with(&dir_prefix_slash))
778                                    .cloned()
779                                    .collect();
780
781                                if !files_under_dir.is_empty() {
782                                    info!(
783                                        "🗑️  Directory deleted: {} ({} files under it)",
784                                        file_path.display(),
785                                        files_under_dir.len()
786                                    );
787                                    for tracked_file in &files_under_dir {
788                                        let tracked_path = PathBuf::from(tracked_file);
789                                        if let Err(e) = Self::remove_file_from_index_with_stores(
790                                            codebase_path,
791                                            db_path,
792                                            stores,
793                                            &tracked_path,
794                                        )
795                                        .await
796                                        {
797                                            warn!(
798                                                "⚠️  Failed to remove {}: {}",
799                                                tracked_path.display(),
800                                                e
801                                            );
802                                        }
803                                    }
804                                }
805                            }
806                        }
807                    }
808                }
809            }
810        }
811
812        // Rebuild vector index after removals so deleted chunks are excluded from search results.
813        // index_single_file_with_stores already calls build_index() per file, but when a batch
814        // contains ONLY removals (no additions), the index would never be rebuilt without this.
815        if !files_to_remove.is_empty() {
816            let mut store = stores.vector_store.write().await;
817            store.build_index()?;
818        }
819
820        // Then, index modified/new files
821        for file_path in &files_to_index {
822            debug!("📄 Indexing: {}", file_path.display());
823            if let Err(e) =
824                Self::index_single_file_with_stores(codebase_path, db_path, stores, file_path).await
825            {
826                warn!("⚠️  Failed to index {}: {}", file_path.display(), e);
827            }
828        }
829
830        // Disable quiet mode after batch processing is complete
831        set_quiet(false);
832
833        let elapsed = start.elapsed();
834        info!(
835            "✅ Batch complete: {} indexed, {} removed in {:.2}s",
836            files_to_index.len(),
837            files_to_remove.len(),
838            elapsed.as_secs_f64()
839        );
840
841        Ok(())
842    }
843
844    /// Check if initial indexing is needed.
845    async fn needs_initial_indexing(path: &Path) -> Result<bool> {
846        // Check for DB_DIR_NAME directory (the only correct path)
847        let db_path = path.join(DB_DIR_NAME);
848        let meta_db_path = db_path.join(FILE_META_DB_NAME);
849
850        if !meta_db_path.exists() {
851            debug!(
852                "📂 File metadata database not found at: {}",
853                meta_db_path.display()
854            );
855            return Ok(true);
856        }
857
858        // Check if database is empty or corrupted
859        // This is a simplified check - in production you might want more sophisticated checks
860        Ok(false)
861    }
862
863    /// Perform initial full indexing.
864    #[allow(dead_code)]
865    async fn perform_initial_indexing(path: &Path) -> Result<()> {
866        info!("🔨 Performing full indexing (this may take a while)...");
867        let start = std::time::Instant::now();
868
869        // Call the index function from the parent module
870        // Parameters: path, dry_run, force, global, model
871        super::index(
872            Some(path.to_path_buf()),
873            false,
874            false,
875            false,
876            None,
877            CancellationToken::new(),
878        )
879        .await?;
880
881        let elapsed = start.elapsed();
882        info!(
883            "✅ Full indexing completed in {:.2}s",
884            elapsed.as_secs_f64()
885        );
886
887        Ok(())
888    }
889
890    /// Perform incremental index refresh.
891    async fn perform_incremental_refresh(path: &Path) -> Result<()> {
892        info!("🔄 Performing incremental index refresh...");
893        let start = std::time::Instant::now();
894
895        // Call the quiet index function from the parent module (no CLI output)
896        // For incremental refresh, we use force=false which enables incremental mode
897        super::index_quiet(Some(path.to_path_buf()), false, CancellationToken::new()).await?;
898
899        let elapsed = start.elapsed();
900        info!(
901            "✅ Incremental refresh completed in {:.2}s",
902            elapsed.as_secs_f64()
903        );
904
905        Ok(())
906    }
907
908    /// Index a single file (for FSW events).
909    /// This is much faster than a full incremental refresh.
910    async fn index_single_file(codebase_path: &Path, file_path: &Path) -> Result<()> {
911        use crate::cache::FileMetaStore;
912        use crate::chunker::{Chunker, SemanticChunker};
913        use crate::embed::EmbeddingService;
914        use crate::file::Language;
915        use crate::fts::FtsStore;
916        use crate::vectordb::VectorStore;
917
918        let db_path = codebase_path.join(DB_DIR_NAME);
919
920        // Check if file exists and is indexable
921        if !file_path.exists() {
922            debug!("File no longer exists, skipping: {}", file_path.display());
923            return Ok(());
924        }
925
926        let language = Language::from_path(file_path);
927        if !language.is_indexable() {
928            debug!("File not indexable, skipping: {}", file_path.display());
929            return Ok(());
930        }
931
932        // Read file content
933        let content = match std::fs::read_to_string(file_path) {
934            Ok(c) => c,
935            Err(e) => {
936                warn!("Failed to read file {}: {}", file_path.display(), e);
937                return Ok(());
938            }
939        };
940
941        // First, remove old chunks for this file
942        Self::remove_file_from_index(codebase_path, file_path).await?;
943
944        // Chunk the file
945        let chunker = SemanticChunker::new(100, 4000, 2);
946        let chunks = chunker.chunk_file(file_path, &content)?;
947
948        if chunks.is_empty() {
949            debug!("No chunks created for file: {}", file_path.display());
950            return Ok(());
951        }
952
953        debug!(
954            "Created {} chunks for file: {}",
955            chunks.len(),
956            file_path.display()
957        );
958
959        // Generate embeddings
960        let cache_dir = crate::constants::get_global_models_cache_dir()?;
961        let mut embedding_service =
962            EmbeddingService::with_cache_dir(ModelType::default(), Some(cache_dir.as_path()))?;
963        let embedded_chunks = embedding_service.embed_chunks(chunks)?;
964
965        // Load metadata to get dimensions
966        let metadata_path = db_path.join("metadata.json");
967        let metadata: serde_json::Value =
968            serde_json::from_str(&std::fs::read_to_string(&metadata_path)?)?;
969        let dimensions = metadata["dimensions"].as_u64().unwrap_or(384) as usize;
970
971        // Open stores
972        let mut store = VectorStore::new(&db_path, dimensions)?;
973        let mut fts_store = FtsStore::new_with_writer(&db_path)?;
974
975        // Insert chunks
976        let chunk_ids = store.insert_chunks_with_ids(embedded_chunks.clone())?;
977
978        // Rebuild the vector index after inserting new chunks
979        store.build_index()?;
980
981        // Add to FTS
982        for (chunk, chunk_id) in embedded_chunks.iter().zip(chunk_ids.iter()) {
983            let path_str = chunk.chunk.path.to_string();
984            let signature = chunk.chunk.signature.as_deref();
985            let kind = format!("{:?}", chunk.chunk.kind);
986            fts_store.add_chunk(*chunk_id, &chunk.chunk.content, &path_str, signature, &kind)?;
987        }
988        fts_store.commit()?;
989
990        // Update file metadata
991        let model_name = metadata["model"].as_str().unwrap_or("minilm-l6-q");
992        let mut file_meta_store = FileMetaStore::load_or_create(&db_path, model_name, dimensions)?;
993        file_meta_store.update_file(file_path, chunk_ids)?;
994        file_meta_store.save(&db_path)?;
995
996        info!(
997            "✅ Indexed {} ({} chunks)",
998            file_path.display(),
999            embedded_chunks.len()
1000        );
1001
1002        Ok(())
1003    }
1004
1005    /// Remove a file from the index (for FSW delete events).
1006    async fn remove_file_from_index(codebase_path: &Path, file_path: &Path) -> Result<()> {
1007        use crate::cache::FileMetaStore;
1008        use crate::fts::FtsStore;
1009        use crate::vectordb::VectorStore;
1010
1011        let db_path = codebase_path.join(DB_DIR_NAME);
1012
1013        // Load metadata to get dimensions and model
1014        let metadata_path = db_path.join("metadata.json");
1015        if !metadata_path.exists() {
1016            debug!("No metadata found, skipping removal");
1017            return Ok(());
1018        }
1019        let metadata: serde_json::Value =
1020            serde_json::from_str(&std::fs::read_to_string(&metadata_path)?)?;
1021        let dimensions = metadata["dimensions"].as_u64().unwrap_or(384) as usize;
1022        let model_name = metadata["model"].as_str().unwrap_or("minilm-l6-q");
1023
1024        // Load file metadata to get chunk IDs
1025        let mut file_meta_store = FileMetaStore::load_or_create(&db_path, model_name, dimensions)?;
1026
1027        // Get chunk IDs from file metadata directly (not check_file which reads from disk)
1028        // The file is already deleted, so we can't read mtime/size/hash
1029        let meta = file_meta_store.remove_file(file_path);
1030        let chunk_ids = match meta {
1031            Some(m) if !m.chunk_ids.is_empty() => m.chunk_ids,
1032            Some(_) => {
1033                debug!("No chunks to remove for file: {}", file_path.display());
1034                file_meta_store.save(&db_path)?;
1035                return Ok(());
1036            }
1037            None => {
1038                debug!("No metadata found for file: {}", file_path.display());
1039                return Ok(());
1040            }
1041        };
1042
1043        debug!(
1044            "Removing {} chunks for file: {}",
1045            chunk_ids.len(),
1046            file_path.display()
1047        );
1048
1049        // Open stores
1050        let mut store = VectorStore::new(&db_path, dimensions)?;
1051        let mut fts_store = FtsStore::new_with_writer(&db_path)?;
1052
1053        // Delete chunks from vector store and FTS
1054        for chunk_id in &chunk_ids {
1055            store.delete_chunks(&[*chunk_id])?;
1056            fts_store.delete_chunk(*chunk_id)?;
1057        }
1058
1059        // Rebuild vector index so deleted chunks are excluded from search results
1060        store.build_index()?;
1061        fts_store.commit()?;
1062
1063        // Save file metadata (remove_file was already called above)
1064        file_meta_store.save(&db_path)?;
1065
1066        info!(
1067            "✅ Removed {} chunks for {}",
1068            chunk_ids.len(),
1069            file_path.display()
1070        );
1071
1072        Ok(())
1073    }
1074
1075    /// Index a single file using shared stores (for FSW events).
1076    /// This version uses the shared stores to avoid LMDB conflicts.
1077    async fn index_single_file_with_stores(
1078        codebase_path: &Path,
1079        db_path: &Path,
1080        stores: &SharedStores,
1081        file_path: &Path,
1082    ) -> Result<()> {
1083        use crate::cache::FileMetaStore;
1084        use crate::chunker::{Chunker, SemanticChunker};
1085        use crate::embed::EmbeddingService;
1086        use crate::file::Language;
1087
1088        // Check if file exists and is indexable
1089        if !file_path.exists() {
1090            debug!("File no longer exists, skipping: {}", file_path.display());
1091            return Ok(());
1092        }
1093
1094        let language = Language::from_path(file_path);
1095        if !language.is_indexable() {
1096            debug!("File not indexable, skipping: {}", file_path.display());
1097            return Ok(());
1098        }
1099
1100        // Read file content
1101        let content = match std::fs::read_to_string(file_path) {
1102            Ok(c) => c,
1103            Err(e) => {
1104                warn!("Failed to read file {}: {}", file_path.display(), e);
1105                return Ok(());
1106            }
1107        };
1108
1109        // First, remove old chunks for this file
1110        Self::remove_file_from_index_with_stores(codebase_path, db_path, stores, file_path).await?;
1111
1112        // Chunk the file
1113        let chunker = SemanticChunker::new(100, 4000, 2);
1114        let chunks = chunker.chunk_file(file_path, &content)?;
1115
1116        if chunks.is_empty() {
1117            debug!("No chunks created for file: {}", file_path.display());
1118            return Ok(());
1119        }
1120
1121        debug!(
1122            "Created {} chunks for file: {}",
1123            chunks.len(),
1124            file_path.display()
1125        );
1126
1127        // Generate embeddings
1128        let cache_dir = crate::constants::get_global_models_cache_dir()?;
1129        let mut embedding_service =
1130            EmbeddingService::with_cache_dir(ModelType::default(), Some(cache_dir.as_path()))?;
1131        let embedded_chunks = embedding_service.embed_chunks(chunks)?;
1132
1133        // Load metadata to get model name
1134        let metadata_path = db_path.join("metadata.json");
1135        let metadata: serde_json::Value =
1136            serde_json::from_str(&std::fs::read_to_string(&metadata_path)?)?;
1137        let dimensions = metadata["dimensions"].as_u64().unwrap_or(384) as usize;
1138        let model_name = metadata["model"].as_str().unwrap_or("minilm-l6-q");
1139
1140        // Use shared stores with write lock
1141        let chunk_ids = {
1142            let mut store = stores.vector_store.write().await;
1143            let chunk_ids = store.insert_chunks_with_ids(embedded_chunks.clone())?;
1144            // Rebuild the vector index after inserting new chunks
1145            store.build_index()?;
1146            chunk_ids
1147        };
1148
1149        // Add to FTS with write lock
1150        {
1151            let mut fts_store = stores.fts_store.write().await;
1152            for (chunk, chunk_id) in embedded_chunks.iter().zip(chunk_ids.iter()) {
1153                let path_str = chunk.chunk.path.to_string();
1154                let signature = chunk.chunk.signature.as_deref();
1155                let kind = format!("{:?}", chunk.chunk.kind);
1156                fts_store.add_chunk(
1157                    *chunk_id,
1158                    &chunk.chunk.content,
1159                    &path_str,
1160                    signature,
1161                    &kind,
1162                )?;
1163            }
1164            fts_store.commit()?;
1165        }
1166
1167        // Update file metadata (separate store, not shared)
1168        let mut file_meta_store = FileMetaStore::load_or_create(db_path, model_name, dimensions)?;
1169        file_meta_store.update_file(file_path, chunk_ids)?;
1170        file_meta_store.save(db_path)?;
1171
1172        info!(
1173            "✅ Indexed {} ({} chunks)",
1174            file_path.display(),
1175            embedded_chunks.len()
1176        );
1177
1178        Ok(())
1179    }
1180
1181    /// Remove a file from the index using shared stores (for FSW delete events).
1182    /// This version uses the shared stores to avoid LMDB conflicts.
1183    async fn remove_file_from_index_with_stores(
1184        _codebase_path: &Path,
1185        db_path: &Path,
1186        stores: &SharedStores,
1187        file_path: &Path,
1188    ) -> Result<()> {
1189        use crate::cache::FileMetaStore;
1190
1191        // Load metadata to get dimensions and model
1192        let metadata_path = db_path.join("metadata.json");
1193        if !metadata_path.exists() {
1194            debug!("No metadata found, skipping removal");
1195            return Ok(());
1196        }
1197        let metadata: serde_json::Value =
1198            serde_json::from_str(&std::fs::read_to_string(&metadata_path)?)?;
1199        let dimensions = metadata["dimensions"].as_u64().unwrap_or(384) as usize;
1200        let model_name = metadata["model"].as_str().unwrap_or("minilm-l6-q");
1201
1202        // Load file metadata to get chunk IDs
1203        let mut file_meta_store = FileMetaStore::load_or_create(db_path, model_name, dimensions)?;
1204
1205        // Get chunk IDs from file metadata directly (not check_file which reads from disk)
1206        // The file is already deleted, so we can't read mtime/size/hash
1207        let meta = file_meta_store.remove_file(file_path);
1208        let chunk_ids = match meta {
1209            Some(m) if !m.chunk_ids.is_empty() => m.chunk_ids,
1210            Some(_) => {
1211                debug!("No chunks to remove for file: {}", file_path.display());
1212                file_meta_store.save(db_path)?;
1213                return Ok(());
1214            }
1215            None => {
1216                debug!("No metadata found for file: {}", file_path.display());
1217                return Ok(());
1218            }
1219        };
1220
1221        debug!(
1222            "Removing {} chunks for file: {}",
1223            chunk_ids.len(),
1224            file_path.display()
1225        );
1226
1227        // Delete chunks from vector store with write lock
1228        {
1229            let mut store = stores.vector_store.write().await;
1230            for chunk_id in &chunk_ids {
1231                store.delete_chunks(&[*chunk_id])?;
1232            }
1233        }
1234
1235        // Delete from FTS with write lock
1236        {
1237            let mut fts_store = stores.fts_store.write().await;
1238            for chunk_id in &chunk_ids {
1239                fts_store.delete_chunk(*chunk_id)?;
1240            }
1241            fts_store.commit()?;
1242        }
1243
1244        // Save file metadata (remove_file was already called above)
1245        file_meta_store.save(db_path)?;
1246
1247        info!(
1248            "✅ Removed {} chunks for {}",
1249            chunk_ids.len(),
1250            file_path.display()
1251        );
1252
1253        Ok(())
1254    }
1255}
1256
#[cfg(test)]
mod tests {
    #[allow(unused_imports)]
    use super::*;

    /// Placeholder test: it only prints the path a real test would use.
    ///
    /// Exercising the index manager for real needs a temporary codebase with
    /// an existing index, which is not set up here, so nothing is asserted.
    #[tokio::test]
    async fn test_index_manager_creation() {
        // Location a future test fixture would live at; it is never created.
        let test_path = std::env::temp_dir().join("test_codesearch");

        println!("Test path: {}", test_path.display());
        println!("Expected: Index manager creation will fail (no test codebase)");
    }
}