use super::RagClient;
use crate::rag::embedding::EmbeddingProvider;
use crate::rag::indexer::{CodeChunk, FileWalker};
use crate::rag::types::{ChunkMetadata, IndexResponse};
use anyhow::{Context, Result};
use rayon::prelude::*;
use rmcp::{Peer, RoleServer, model::ProgressNotificationParam, model::ProgressToken};
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Instant;
use tokio_util::sync::CancellationToken;
macro_rules! check_cancelled {
($cancel_token:expr) => {
if $cancel_token.is_cancelled() {
tracing::info!("Indexing operation cancelled");
anyhow::bail!("Indexing was cancelled");
}
};
}
struct EmbeddingResult {
embeddings: Vec<Vec<f32>>,
successful_chunks: Vec<CodeChunk>,
errors: Vec<String>,
}
async fn generate_embeddings_with_cancellation(
client: &RagClient,
chunks: &[CodeChunk],
cancel_token: &CancellationToken,
peer: &Option<Peer<RoleServer>>,
progress_token: &Option<ProgressToken>,
progress_start: f64,
progress_end: f64,
) -> Result<EmbeddingResult> {
let batch_size = client.config.embedding.batch_size;
let timeout_secs = client.config.embedding.timeout_secs;
let check_interval = if client.config.embedding.cancellation_check_interval > 0 {
client.config.embedding.cancellation_check_interval
} else {
batch_size };
let mut all_embeddings = Vec::with_capacity(chunks.len());
let mut successful_chunks = Vec::with_capacity(chunks.len());
let mut errors = Vec::new();
let total_batches = chunks.len().div_ceil(batch_size);
let mut chunks_processed = 0;
for (batch_idx, chunk_batch) in chunks.chunks(batch_size).enumerate() {
if cancel_token.is_cancelled() {
tracing::info!(
"Embedding generation cancelled after {} chunks",
chunks_processed
);
anyhow::bail!("Indexing was cancelled");
}
let mut batch_embeddings = Vec::new();
let mut batch_successful_chunks = Vec::new();
for sub_batch in chunk_batch.chunks(check_interval) {
if cancel_token.is_cancelled() {
tracing::info!(
"Embedding generation cancelled during batch {} after {} chunks",
batch_idx,
chunks_processed
);
anyhow::bail!("Indexing was cancelled");
}
let texts: Vec<String> = sub_batch.iter().map(|c| c.content.clone()).collect();
let provider = client.embedding_provider.clone();
let embed_future = tokio::task::spawn_blocking(move || provider.embed_batch(texts));
match tokio::time::timeout(std::time::Duration::from_secs(timeout_secs), embed_future)
.await
{
Ok(Ok(Ok(embeddings))) => {
batch_embeddings.extend(embeddings);
batch_successful_chunks.extend(sub_batch.iter().cloned());
chunks_processed += sub_batch.len();
}
Ok(Ok(Err(e))) => {
errors.push(format!(
"Failed to generate embeddings for sub-batch: {}",
e
));
}
Ok(Err(e)) => {
errors.push(format!("Embedding task panicked: {}", e));
}
Err(_) => {
errors.push(format!(
"Embedding generation timed out after {} seconds",
timeout_secs
));
}
}
}
all_embeddings.extend(batch_embeddings);
successful_chunks.extend(batch_successful_chunks);
if let (Some(peer), Some(token)) = (peer, progress_token) {
let progress = progress_start
+ ((batch_idx + 1) as f64 / total_batches as f64) * (progress_end - progress_start);
let _ = peer
.notify_progress(ProgressNotificationParam {
progress_token: token.clone(),
progress,
total: Some(100.0),
message: Some(format!(
"Generating embeddings... {}/{} batches ({} chunks)",
batch_idx + 1,
total_batches,
chunks_processed
)),
})
.await;
}
}
Ok(EmbeddingResult {
embeddings: all_embeddings,
successful_chunks,
errors,
})
}
#[allow(clippy::too_many_arguments)]
pub async fn do_index(
client: &RagClient,
path: String,
project: Option<String>,
include_patterns: Vec<String>,
exclude_patterns: Vec<String>,
max_file_size: usize,
peer: Option<Peer<RoleServer>>,
progress_token: Option<ProgressToken>,
cancel_token: CancellationToken,
) -> Result<IndexResponse> {
let start = Instant::now();
let mut errors = Vec::new();
if let (Some(peer), Some(token)) = (&peer, &progress_token) {
let _ = peer
.notify_progress(ProgressNotificationParam {
progress_token: token.clone(),
progress: 0.0,
total: Some(100.0),
message: Some("Starting file walk...".into()),
})
.await;
}
let cancelled_flag = Arc::new(AtomicBool::new(false));
let cancelled_flag_clone = cancelled_flag.clone();
let cancel_token_clone = cancel_token.clone();
let _cancel_watcher = tokio::spawn(async move {
cancel_token_clone.cancelled().await;
cancelled_flag_clone.store(true, Ordering::Relaxed);
tracing::debug!("Cancellation flag set for file walker");
});
let walker = FileWalker::new(&path, max_file_size)
.with_project(project.clone())
.with_patterns(include_patterns.clone(), exclude_patterns.clone())
.with_cancellation_flag(cancelled_flag);
let files = tokio::task::spawn_blocking(move || walker.walk())
.await
.context("Failed to spawn file walker task")?
.context("Failed to walk directory")?;
let files_indexed = files.len();
check_cancelled!(cancel_token);
if let (Some(peer), Some(token)) = (&peer, &progress_token) {
let _ = peer
.notify_progress(ProgressNotificationParam {
progress_token: token.clone(),
progress: 20.0,
total: Some(100.0),
message: Some(format!("Found {} files, chunking...", files_indexed)),
})
.await;
}
let chunker = client.chunker.clone();
let all_chunks: Vec<_> = files
.par_iter()
.flat_map(|file| chunker.chunk_file(file))
.collect();
let chunks_created = all_chunks.len();
if let (Some(peer), Some(token)) = (&peer, &progress_token) {
let _ = peer
.notify_progress(ProgressNotificationParam {
progress_token: token.clone(),
progress: 40.0,
total: Some(100.0),
message: Some(format!(
"Created {} chunks, generating embeddings...",
chunks_created
)),
})
.await;
}
if all_chunks.is_empty() {
return Ok(IndexResponse {
mode: crate::rag::types::IndexingMode::Full,
files_indexed: 0,
chunks_created: 0,
embeddings_generated: 0,
duration_ms: start.elapsed().as_millis() as u64,
errors: vec!["No code chunks found to index".to_string()],
files_updated: 0,
files_removed: 0,
});
}
let embed_result = generate_embeddings_with_cancellation(
client,
&all_chunks,
&cancel_token,
&peer,
&progress_token,
40.0,
80.0,
)
.await?;
let all_embeddings = embed_result.embeddings;
let successful_chunks = embed_result.successful_chunks;
errors.extend(embed_result.errors);
let embeddings_generated = all_embeddings.len();
if let (Some(peer), Some(token)) = (&peer, &progress_token) {
let _ = peer
.notify_progress(ProgressNotificationParam {
progress_token: token.clone(),
progress: 85.0,
total: Some(100.0),
message: Some(format!(
"Storing {} embeddings in database...",
embeddings_generated
)),
})
.await;
}
let metadata: Vec<ChunkMetadata> = successful_chunks
.iter()
.map(|c| c.metadata.clone())
.collect();
let contents: Vec<String> = successful_chunks
.iter()
.map(|c| c.content.clone())
.collect();
debug_assert_eq!(
all_embeddings.len(),
metadata.len(),
"Embeddings and metadata count mismatch"
);
debug_assert_eq!(
all_embeddings.len(),
contents.len(),
"Embeddings and contents count mismatch"
);
check_cancelled!(cancel_token);
if !all_embeddings.is_empty() {
client
.vector_db
.store_embeddings(all_embeddings, metadata, contents, &path)
.await
.context("Failed to store embeddings")?;
}
if let (Some(peer), Some(token)) = (&peer, &progress_token) {
let _ = peer
.notify_progress(ProgressNotificationParam {
progress_token: token.clone(),
progress: 95.0,
total: Some(100.0),
message: Some("Saving cache...".into()),
})
.await;
}
let file_hashes: HashMap<String, String> = files
.iter()
.map(|f| (f.relative_path.clone(), f.hash.clone()))
.collect();
let mut cache = client.hash_cache.write().await;
cache.update_root(path.clone(), file_hashes);
if let Err(e) = cache.save(&client.cache_path) {
tracing::warn!("Failed to save hash cache: {}", e);
}
drop(cache);
if let (Some(peer), Some(token)) = (&peer, &progress_token) {
let _ = peer
.notify_progress(ProgressNotificationParam {
progress_token: token.clone(),
progress: 98.0,
total: Some(100.0),
message: Some("Flushing index to disk...".into()),
})
.await;
}
client
.vector_db
.flush()
.await
.map_err(|e| anyhow::anyhow!("Failed to flush index to disk: {}", e))?;
{
let mut cache = client.hash_cache.write().await;
cache.clear_dirty(&path);
if let Err(e) = cache.save(&client.cache_path) {
tracing::warn!("Failed to clear dirty flag after full index: {}", e);
}
tracing::debug!("Cleared dirty flag early (full index) for: {}", path);
}
if let (Some(peer), Some(token)) = (&peer, &progress_token) {
let _ = peer
.notify_progress(ProgressNotificationParam {
progress_token: token.clone(),
progress: 100.0,
total: Some(100.0),
message: Some("Indexing complete!".into()),
})
.await;
}
Ok(IndexResponse {
mode: crate::rag::types::IndexingMode::Full,
files_indexed,
chunks_created,
embeddings_generated,
duration_ms: start.elapsed().as_millis() as u64,
errors,
files_updated: 0,
files_removed: 0,
})
}
#[allow(clippy::too_many_arguments)]
pub async fn do_incremental_update(
client: &RagClient,
path: String,
project: Option<String>,
include_patterns: Vec<String>,
exclude_patterns: Vec<String>,
max_file_size: usize,
peer: Option<Peer<RoleServer>>,
progress_token: Option<ProgressToken>,
cancel_token: CancellationToken,
) -> Result<IndexResponse> {
let start = Instant::now();
if let (Some(peer), Some(token)) = (&peer, &progress_token) {
let _ = peer
.notify_progress(ProgressNotificationParam {
progress_token: token.clone(),
progress: 0.0,
total: Some(100.0),
message: Some("Checking for changes...".into()),
})
.await;
}
let cache = client.hash_cache.read().await;
let existing_hashes = cache.get_root(&path).cloned().unwrap_or_default();
drop(cache);
if let (Some(peer), Some(token)) = (&peer, &progress_token) {
let _ = peer
.notify_progress(ProgressNotificationParam {
progress_token: token.clone(),
progress: 10.0,
total: Some(100.0),
message: Some(format!(
"Found {} cached files, scanning directory...",
existing_hashes.len()
)),
})
.await;
}
let cancelled_flag = Arc::new(AtomicBool::new(false));
let cancelled_flag_clone = cancelled_flag.clone();
let cancel_token_clone = cancel_token.clone();
let _cancel_watcher = tokio::spawn(async move {
cancel_token_clone.cancelled().await;
cancelled_flag_clone.store(true, Ordering::Relaxed);
tracing::debug!("Cancellation flag set for file walker");
});
let walker = FileWalker::new(&path, max_file_size)
.with_project(project.clone())
.with_patterns(include_patterns.clone(), exclude_patterns.clone())
.with_cancellation_flag(cancelled_flag);
let current_files = tokio::task::spawn_blocking(move || walker.walk())
.await
.context("Failed to spawn file walker task")?
.context("Failed to walk directory")?;
check_cancelled!(cancel_token);
let mut files_added = 0;
let mut files_updated = 0;
let mut files_removed = 0;
let mut chunks_modified = 0;
if let (Some(peer), Some(token)) = (&peer, &progress_token) {
let _ = peer
.notify_progress(ProgressNotificationParam {
progress_token: token.clone(),
progress: 30.0,
total: Some(100.0),
message: Some(format!(
"Found {} files, comparing with cache...",
current_files.len()
)),
})
.await;
}
let mut new_hashes = HashMap::with_capacity(current_files.len());
let mut files_to_index = Vec::with_capacity(current_files.len());
for file in current_files {
new_hashes.insert(file.relative_path.clone(), file.hash.clone());
match existing_hashes.get(&file.relative_path) {
None => {
files_added += 1;
files_to_index.push(file);
}
Some(old_hash) if old_hash != &file.hash => {
if let Err(e) = client.vector_db.delete_by_file(&file.relative_path).await {
tracing::warn!("Failed to delete old embeddings: {}", e);
}
files_updated += 1;
files_to_index.push(file);
}
_ => {
}
}
}
for old_file in existing_hashes.keys() {
if !new_hashes.contains_key(old_file) {
files_removed += 1;
if let Err(e) = client.vector_db.delete_by_file(old_file).await {
tracing::warn!("Failed to delete embeddings for removed file: {}", e);
}
}
}
if let (Some(peer), Some(token)) = (&peer, &progress_token) {
let _ = peer
.notify_progress(ProgressNotificationParam {
progress_token: token.clone(),
progress: 50.0,
total: Some(100.0),
message: Some(format!(
"Processing {} changed files...",
files_to_index.len()
)),
})
.await;
}
let (embeddings_generated, embed_errors) = if !files_to_index.is_empty() {
let chunker = client.chunker.clone();
let all_chunks: Vec<_> = files_to_index
.par_iter()
.flat_map(|file| chunker.chunk_file(file))
.collect();
chunks_modified = all_chunks.len();
if let (Some(peer), Some(token)) = (&peer, &progress_token) {
let _ = peer
.notify_progress(ProgressNotificationParam {
progress_token: token.clone(),
progress: 60.0,
total: Some(100.0),
message: Some(format!(
"Created {} chunks, generating embeddings...",
chunks_modified
)),
})
.await;
}
let embed_result = generate_embeddings_with_cancellation(
client,
&all_chunks,
&cancel_token,
&peer,
&progress_token,
60.0,
85.0,
)
.await?;
let all_embeddings = embed_result.embeddings;
let successful_chunks = embed_result.successful_chunks;
if let (Some(peer), Some(token)) = (&peer, &progress_token) {
let _ = peer
.notify_progress(ProgressNotificationParam {
progress_token: token.clone(),
progress: 90.0,
total: Some(100.0),
message: Some(format!("Storing {} embeddings...", all_embeddings.len())),
})
.await;
}
check_cancelled!(cancel_token);
let metadata: Vec<ChunkMetadata> = successful_chunks
.iter()
.map(|c| c.metadata.clone())
.collect();
let contents: Vec<String> = successful_chunks
.iter()
.map(|c| c.content.clone())
.collect();
if !all_embeddings.is_empty() {
client
.vector_db
.store_embeddings(all_embeddings.clone(), metadata, contents, &path)
.await
.context("Failed to store embeddings")?;
}
(all_embeddings.len(), embed_result.errors)
} else {
(0, vec![])
};
for err in embed_errors {
tracing::warn!("Embedding error during incremental update: {}", err);
}
if let (Some(peer), Some(token)) = (&peer, &progress_token) {
let _ = peer
.notify_progress(ProgressNotificationParam {
progress_token: token.clone(),
progress: 95.0,
total: Some(100.0),
message: Some("Saving cache...".into()),
})
.await;
}
let mut cache = client.hash_cache.write().await;
cache.update_root(path.clone(), new_hashes);
if let Err(e) = cache.save(&client.cache_path) {
tracing::warn!("Failed to save hash cache: {}", e);
}
drop(cache);
if let (Some(peer), Some(token)) = (&peer, &progress_token) {
let _ = peer
.notify_progress(ProgressNotificationParam {
progress_token: token.clone(),
progress: 98.0,
total: Some(100.0),
message: Some("Flushing index to disk...".into()),
})
.await;
}
client
.vector_db
.flush()
.await
.context("Failed to flush index to disk")?;
{
let mut cache = client.hash_cache.write().await;
cache.clear_dirty(&path);
if let Err(e) = cache.save(&client.cache_path) {
tracing::warn!("Failed to clear dirty flag after incremental update: {}", e);
}
tracing::debug!("Cleared dirty flag early (incremental) for: {}", path);
}
if let (Some(peer), Some(token)) = (&peer, &progress_token) {
let _ = peer
.notify_progress(ProgressNotificationParam {
progress_token: token.clone(),
progress: 100.0,
total: Some(100.0),
message: Some("Incremental update complete!".into()),
})
.await;
}
Ok(IndexResponse {
mode: crate::rag::types::IndexingMode::Incremental,
files_indexed: files_added,
chunks_created: chunks_modified,
embeddings_generated,
duration_ms: start.elapsed().as_millis() as u64,
errors: vec![],
files_updated,
files_removed,
})
}
#[allow(clippy::too_many_arguments)]
pub async fn do_index_smart(
client: &RagClient,
path: String,
project: Option<String>,
include_patterns: Vec<String>,
exclude_patterns: Vec<String>,
max_file_size: usize,
peer: Option<Peer<RoleServer>>,
progress_token: Option<ProgressToken>,
cancel_token: CancellationToken,
) -> Result<IndexResponse> {
use super::IndexLockResult;
let lock_result = client.try_acquire_index_lock(&path).await?;
match lock_result {
IndexLockResult::WaitForResult(mut receiver) => {
tracing::info!(
"Waiting for existing indexing operation in this process to complete for: {}",
path
);
if let (Some(peer), Some(token)) = (&peer, &progress_token) {
let _ = peer
.notify_progress(ProgressNotificationParam {
progress_token: token.clone(),
progress: 0.0,
total: Some(100.0),
message: Some(
"Waiting for existing indexing operation to complete...".into(),
),
})
.await;
}
match receiver.recv().await {
Ok(result) => {
tracing::info!("Received result from existing indexing operation");
Ok(result)
}
Err(e) => {
Err(anyhow::anyhow!(
"Indexing operation failed or was cancelled: {}",
e
))
}
}
}
IndexLockResult::WaitForFilesystemLock(normalized_path) => {
tracing::info!(
"Another process is indexing {} - waiting for filesystem lock to be released",
normalized_path
);
if let (Some(peer), Some(token)) = (&peer, &progress_token) {
let _ = peer
.notify_progress(ProgressNotificationParam {
progress_token: token.clone(),
progress: 0.0,
total: Some(100.0),
message: Some("Waiting for another process to finish indexing...".into()),
})
.await;
}
use super::FsLockGuard;
use std::time::Duration;
let path_for_lock = normalized_path.clone();
let fs_lock_result = tokio::task::spawn_blocking(move || {
FsLockGuard::acquire_blocking(&path_for_lock, Duration::from_secs(30 * 60))
})
.await
.context("Filesystem lock blocking task panicked")??;
match fs_lock_result {
Some(_lock) => {
tracing::info!(
"Other process finished indexing {} - performing incremental check",
normalized_path
);
drop(_lock);
Ok(IndexResponse {
mode: crate::rag::types::IndexingMode::Incremental,
files_indexed: 0,
chunks_created: 0,
embeddings_generated: 0,
duration_ms: 0,
errors: vec![],
files_updated: 0,
files_removed: 0,
})
}
None => {
Err(anyhow::anyhow!(
"Timeout waiting for another process to finish indexing {} (30 minutes)",
normalized_path
))
}
}
}
IndexLockResult::Acquired(lock) => {
let result = do_index_smart_inner(
client,
path.clone(),
project,
include_patterns,
exclude_patterns,
max_file_size,
peer,
progress_token,
cancel_token,
)
.await;
match &result {
Ok(response) => {
lock.broadcast_result(response);
}
Err(e) => {
tracing::error!("Indexing failed for {}: {}", path, e);
let error_response = IndexResponse {
mode: crate::rag::types::IndexingMode::Full,
files_indexed: 0,
chunks_created: 0,
embeddings_generated: 0,
duration_ms: 0,
errors: vec![format!("Indexing failed: {}", e)],
files_updated: 0,
files_removed: 0,
};
lock.broadcast_result(&error_response);
}
}
lock.release().await;
result
}
}
}
const STALE_DIRTY_FLAG_TIMEOUT_SECS: u64 = 2 * 60 * 60;
#[derive(Debug)]
enum DirtyFlagValidation {
TrulyCorrupted { reason: String },
StaleFlag { age_secs: u64 },
IndexAppearsComplete {
cached_files: usize,
indexed_files: usize,
},
}
async fn validate_dirty_flag(
client: &RagClient,
normalized_path: &str,
) -> Result<DirtyFlagValidation> {
let (dirty_info_data, cached_files_count) = {
let cache = client.hash_cache.read().await;
let dirty_info = cache.get_dirty_info(normalized_path).cloned();
let cached_files_count = cache
.get_root(normalized_path)
.map(|h| h.len())
.unwrap_or(0);
(dirty_info, cached_files_count)
};
if let Some(ref info) = dirty_info_data {
let age = info.age_secs();
if info.is_stale(STALE_DIRTY_FLAG_TIMEOUT_SECS) {
return Ok(DirtyFlagValidation::StaleFlag { age_secs: age });
}
}
let indexed_count = client
.vector_db
.count_by_root_path(normalized_path)
.await
.unwrap_or(0);
if cached_files_count > 0 && indexed_count == 0 {
return Ok(DirtyFlagValidation::TrulyCorrupted {
reason: format!(
"Cache has {} files but vector DB has 0 embeddings",
cached_files_count
),
});
}
if cached_files_count == 0 && indexed_count == 0 {
return Ok(DirtyFlagValidation::StaleFlag {
age_secs: dirty_info_data.as_ref().map(|i| i.age_secs()).unwrap_or(0),
});
}
let indexed_files = client
.vector_db
.get_indexed_files(normalized_path)
.await
.unwrap_or_default();
let indexed_files_count = indexed_files.len();
if indexed_files_count > 0 && indexed_files_count >= cached_files_count * 8 / 10 {
return Ok(DirtyFlagValidation::IndexAppearsComplete {
cached_files: cached_files_count,
indexed_files: indexed_files_count,
});
}
Ok(DirtyFlagValidation::TrulyCorrupted {
reason: format!(
"Cached {} files but only {} files indexed ({}%)",
cached_files_count,
indexed_files_count,
if cached_files_count > 0 {
indexed_files_count * 100 / cached_files_count
} else {
0
}
),
})
}
#[allow(clippy::too_many_arguments)]
async fn do_index_smart_inner(
client: &RagClient,
path: String,
project: Option<String>,
include_patterns: Vec<String>,
exclude_patterns: Vec<String>,
max_file_size: usize,
peer: Option<Peer<RoleServer>>,
progress_token: Option<ProgressToken>,
cancel_token: CancellationToken,
) -> Result<IndexResponse> {
let normalized_path = RagClient::normalize_path(&path)?;
let is_dirty = {
let cache = client.hash_cache.read().await;
cache.is_dirty(&normalized_path)
};
let mut force_full_reindex = false;
if is_dirty {
tracing::info!(
"Index for '{}' is marked as dirty. Validating dirty flag...",
normalized_path
);
let validation = validate_dirty_flag(client, &normalized_path).await?;
match validation {
DirtyFlagValidation::TrulyCorrupted { reason } => {
tracing::warn!(
"Index for '{}' is truly corrupted: {}. Clearing and performing full reindex.",
normalized_path,
reason
);
if let (Some(peer), Some(token)) = (&peer, &progress_token) {
let _ = peer
.notify_progress(ProgressNotificationParam {
progress_token: token.clone(),
progress: 0.0,
total: Some(100.0),
message: Some(format!(
"Corrupted index detected ({}), clearing...",
reason
)),
})
.await;
}
if let Err(e) = clear_path_data(client, &normalized_path).await {
tracing::error!(
"Failed to clear corrupted index data for '{}': {}",
normalized_path,
e
);
}
let mut cache = client.hash_cache.write().await;
cache.remove_root(&normalized_path);
if let Err(e) = cache.save(&client.cache_path) {
tracing::warn!("Failed to save cache after clearing dirty state: {}", e);
}
drop(cache);
force_full_reindex = true;
}
DirtyFlagValidation::StaleFlag { age_secs } => {
tracing::info!(
"Dirty flag for '{}' is stale (age: {} seconds). Clearing flag and proceeding with incremental update.",
normalized_path,
age_secs
);
if let (Some(peer), Some(token)) = (&peer, &progress_token) {
let _ = peer
.notify_progress(ProgressNotificationParam {
progress_token: token.clone(),
progress: 0.0,
total: Some(100.0),
message: Some(format!(
"Stale dirty flag detected (age: {}s), clearing...",
age_secs
)),
})
.await;
}
let mut cache = client.hash_cache.write().await;
cache.clear_dirty(&normalized_path);
if let Err(e) = cache.save(&client.cache_path) {
tracing::warn!(
"Failed to save cache after clearing stale dirty flag: {}",
e
);
}
drop(cache);
}
DirtyFlagValidation::IndexAppearsComplete {
cached_files,
indexed_files,
} => {
tracing::info!(
"Index for '{}' appears complete despite dirty flag ({} cached files, {} indexed files). Clearing flag and proceeding with incremental update.",
normalized_path,
cached_files,
indexed_files
);
if let (Some(peer), Some(token)) = (&peer, &progress_token) {
let _ = peer
.notify_progress(ProgressNotificationParam {
progress_token: token.clone(),
progress: 0.0,
total: Some(100.0),
message: Some(
"Index appears complete, clearing stale dirty flag...".into(),
),
})
.await;
}
let mut cache = client.hash_cache.write().await;
cache.clear_dirty(&normalized_path);
if let Err(e) = cache.save(&client.cache_path) {
tracing::warn!("Failed to save cache after clearing dirty flag: {}", e);
}
drop(cache);
}
}
}
{
let mut cache = client.hash_cache.write().await;
cache.mark_dirty(&normalized_path);
if let Err(e) = cache.save(&client.cache_path) {
tracing::error!("Failed to save dirty flag: {}", e);
anyhow::bail!("Failed to mark index as dirty before indexing: {}", e);
}
tracing::debug!("Marked index as dirty for: {}", normalized_path);
}
let cache = client.hash_cache.read().await;
let has_existing_index = cache.get_root(&normalized_path).is_some();
drop(cache);
let result = if has_existing_index && !force_full_reindex {
tracing::info!(
"Existing index found for '{}' (normalized: '{}'), performing incremental update",
path,
normalized_path
);
do_incremental_update(
client,
normalized_path.clone(),
project,
include_patterns,
exclude_patterns,
max_file_size,
peer,
progress_token,
cancel_token,
)
.await
} else {
tracing::info!(
"No existing index found for '{}' (normalized: '{}') or force_full_reindex={}, performing full index",
path,
normalized_path,
force_full_reindex
);
do_index(
client,
normalized_path.clone(),
project,
include_patterns,
exclude_patterns,
max_file_size,
peer,
progress_token,
cancel_token,
)
.await
};
match &result {
Ok(_) => {
let mut cache = client.hash_cache.write().await;
cache.clear_dirty(&normalized_path);
if let Err(e) = cache.save(&client.cache_path) {
tracing::warn!(
"Failed to clear dirty flag after successful indexing: {}",
e
);
}
tracing::debug!("Cleared dirty flag for: {}", normalized_path);
}
Err(e) => {
tracing::warn!(
"Indexing failed or was cancelled for '{}', dirty flag remains set: {}",
normalized_path,
e
);
}
}
result
}
async fn clear_path_data(client: &RagClient, normalized_path: &str) -> Result<()> {
let cache = client.hash_cache.read().await;
let file_paths: Vec<String> = cache
.get_root(normalized_path)
.map(|hashes| hashes.keys().cloned().collect())
.unwrap_or_default();
drop(cache);
for file_path in file_paths {
if let Err(e) = client.vector_db.delete_by_file(&file_path).await {
tracing::warn!(
"Failed to delete embeddings for file '{}': {}",
file_path,
e
);
}
}
tracing::info!("Cleared indexed data for path: {}", normalized_path);
Ok(())
}
#[cfg(test)]
mod tests;