Skip to main content

directory_indexer/indexing/
engine.rs

1use log::{error, info, warn};
2use std::path::PathBuf;
3
4use crate::{
5    config::Config,
6    embedding::EmbeddingProvider,
7    error::{IndexerError, Result},
8    indexing::files::{FileInfo, FileScanner},
9    storage::{qdrant::VectorPoint, FileRecord, QdrantStore, SqliteStore},
10    utils::{calculate_file_hash, chunk_text, normalize_path},
11};
12
/// Coordinates directory indexing: scans files, splits their text into chunks, embeds each
/// chunk, and records the results in SQLite (file metadata) and Qdrant (chunk vectors).
pub struct IndexingEngine {
    /// Application configuration; chunk size and overlap are read from `config.indexing`.
    config: Config,
    /// Metadata store tracking indexed directories and per-file records (hash, mtime, chunks).
    sqlite_store: SqliteStore,
    /// Vector store holding one point per embedded chunk.
    vector_store: QdrantStore,
    /// Produces the embedding vector for each text chunk.
    embedding_provider: Box<dyn EmbeddingProvider>,
}
19
/// Counters summarizing one indexing run.
///
/// `Default` yields all-zero counters, which is the starting point for a run.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct IndexingStats {
    /// Directories that were scanned and indexed without a directory-level error.
    pub directories_processed: usize,
    /// Files that were read, chunked, and (re)indexed.
    pub files_processed: usize,
    /// Files that were not re-indexed because they were already up to date.
    pub files_skipped: usize,
    /// Files that failed to index. A directory that fails as a whole is also counted here
    /// as a single error.
    pub files_errored: usize,
    /// Text chunks produced for the (re)indexed files.
    pub chunks_created: usize,
}
28
29impl IndexingEngine {
30    pub async fn new(
31        config: Config,
32        sqlite_store: SqliteStore,
33        vector_store: QdrantStore,
34        embedding_provider: Box<dyn EmbeddingProvider>,
35    ) -> Result<Self> {
36        Ok(Self {
37            config,
38            sqlite_store,
39            vector_store,
40            embedding_provider,
41        })
42    }
43
44    /// Validates consistency between SQLite metadata and Qdrant vectors
45    /// If SQLite claims files are indexed but Qdrant collection is empty, clears SQLite state
46    pub async fn validate_state_consistency(&self) -> Result<()> {
47        info!("Validating state consistency between SQLite and Qdrant");
48
49        // Get SQLite stats to see if we have indexed files
50        let (_, file_count, _) = self.sqlite_store.get_stats()?;
51
52        // Get Qdrant collection info to see if we have vectors
53        let collection_info = self.vector_store.get_collection_info().await?;
54
55        info!(
56            "State check: SQLite has {file_count} files, Qdrant has {} vectors",
57            collection_info.points_count
58        );
59
60        // Check for state mismatch: SQLite has files but Qdrant has no vectors
61        if file_count > 0 && collection_info.points_count == 0 {
62            warn!(
63                "State mismatch detected: SQLite has {file_count} indexed files but Qdrant collection is empty. Clearing SQLite state to force re-indexing."
64            );
65
66            // Clear SQLite tracking state to force re-indexing
67            self.sqlite_store.clear_all_files()?;
68
69            info!("SQLite state cleared. Files will be re-indexed.");
70        } else {
71            info!("State consistency validated: no mismatch detected");
72        }
73
74        Ok(())
75    }
76
77    pub async fn index_directories(&self, paths: Vec<PathBuf>) -> Result<IndexingStats> {
78        info!("Starting indexing for {len} directories", len = paths.len());
79
80        let mut stats = IndexingStats {
81            directories_processed: 0,
82            files_processed: 0,
83            files_skipped: 0,
84            files_errored: 0,
85            chunks_created: 0,
86        };
87
88        for path in paths {
89            match self.index_directory(&path).await {
90                Ok(dir_stats) => {
91                    stats.directories_processed += 1;
92                    stats.files_processed += dir_stats.files_processed;
93                    stats.files_skipped += dir_stats.files_skipped;
94                    stats.files_errored += dir_stats.files_errored;
95                    stats.chunks_created += dir_stats.chunks_created;
96                }
97                Err(e) => {
98                    error!("Failed to index directory {path:?}: {e}");
99                    stats.files_errored += 1;
100                }
101            }
102        }
103
104        info!("Indexing completed: {stats:?}");
105        Ok(stats)
106    }
107
108    async fn index_directory(&self, path: &PathBuf) -> Result<IndexingStats> {
109        info!("Indexing directory: {path:?}");
110
111        let mut stats = IndexingStats {
112            directories_processed: 0,
113            files_processed: 0,
114            files_skipped: 0,
115            files_errored: 0,
116            chunks_created: 0,
117        };
118
119        // Add directory to SQLite for tracking (normalization happens in add_directory)
120        self.sqlite_store.add_directory(&path.to_string_lossy())?;
121
122        // Scan directory for files
123        let scanner = FileScanner::new();
124        let files = scanner.scan_directory(path).await?;
125
126        info!(
127            "Found {len} files to process in {path:?}",
128            len = files.len()
129        );
130
131        // Process each file
132        for file_info in files {
133            match self.process_file(&file_info).await {
134                Ok(chunks_count) => {
135                    stats.files_processed += 1;
136                    stats.chunks_created += chunks_count;
137                }
138                Err(e) => {
139                    error!("Failed to process file {:?}: {e}", file_info.path);
140                    stats.files_errored += 1;
141                }
142            }
143        }
144
145        // Update directory status (normalization happens in update_directory_status)
146        self.sqlite_store
147            .update_directory_status(&path.to_string_lossy(), "completed")?;
148
149        stats.directories_processed = 1;
150        Ok(stats)
151    }
152
153    async fn process_file(&self, file_info: &FileInfo) -> Result<usize> {
154        info!("Processing file: {:?}", file_info.path);
155
156        // Calculate file hash
157        let file_hash = calculate_file_hash(&file_info.path)?;
158
159        // Check if file already exists and is up to date (using normalized path for lookup)
160        let normalized_path = normalize_path(&file_info.path)?;
161        if let Some(existing) = self.sqlite_store.get_file_by_path(&normalized_path)? {
162            if existing.hash == file_hash
163                && existing.modified_time == (file_info.modified_time as i64)
164            {
165                info!("File unchanged, skipping: {:?}", file_info.path);
166                return Ok(0);
167            }
168
169            // File changed, remove old data (using normalized path)
170            self.vector_store
171                .delete_points_by_file(&normalized_path)
172                .await?;
173        }
174
175        // Extract and chunk content
176        let content = tokio::fs::read_to_string(&file_info.path)
177            .await
178            .map_err(|e| {
179                IndexerError::file_processing(format!(
180                    "Failed to read file {:?}: {e}",
181                    file_info.path
182                ))
183            })?;
184
185        let chunks = chunk_text(
186            &content,
187            self.config.indexing.chunk_size,
188            self.config.indexing.overlap,
189        );
190
191        if chunks.is_empty() {
192            info!("No chunks generated for file: {:?}", file_info.path);
193            return Ok(0);
194        }
195
196        // Generate embeddings for each chunk in batches to avoid overwhelming the service
197        let mut vector_points = Vec::new();
198        let batch_size = 10; // Process 10 chunks concurrently at a time
199
200        for (batch_num, chunk_batch) in chunks.chunks(batch_size).enumerate() {
201            // Create futures for current batch
202            let chunk_futures: Vec<_> = chunk_batch
203                .iter()
204                .enumerate()
205                .map(|(batch_idx, chunk_content)| {
206                    let embedding_provider = &self.embedding_provider;
207                    let chunk_content = chunk_content.clone();
208                    let global_chunk_id = batch_num * batch_size + batch_idx; // Calculate global chunk ID
209                    async move {
210                        match embedding_provider.generate_embedding(chunk_content).await {
211                            Ok(embedding) => Some((global_chunk_id, embedding)),
212                            Err(e) => {
213                                error!(
214                                    "Failed to generate embedding for chunk {global_chunk_id}: {e}"
215                                );
216                                None
217                            }
218                        }
219                    }
220                })
221                .collect();
222
223            // Execute current batch concurrently
224            let results = futures::future::join_all(chunk_futures).await;
225
226            // Process batch results
227            for (chunk_id, embedding) in results.into_iter().flatten() {
228                let point_id = uuid::Uuid::new_v4().to_string();
229                let point = VectorPoint {
230                    id: point_id,
231                    vector: embedding,
232                    file_path: normalized_path.clone(),
233                    chunk_id,
234                    parent_directories: file_info.parent_dirs.clone(),
235                };
236                vector_points.push(point);
237            }
238        }
239
240        // Store vectors in Qdrant
241        if !vector_points.is_empty() {
242            self.vector_store.upsert_points(vector_points).await?;
243        }
244
245        // Store file record in SQLite (using normalized path)
246        let file_record = FileRecord {
247            id: 0, // Will be set by database
248            path: normalized_path.clone(),
249            size: file_info.size as i64,
250            modified_time: file_info.modified_time as i64,
251            hash: file_hash,
252            parent_dirs: file_info.parent_dirs.clone(),
253            chunks_json: Some(serde_json::json!(chunks)),
254            errors_json: None,
255        };
256
257        self.sqlite_store.add_file(&file_record)?;
258
259        info!(
260            "Successfully processed file: {:?} ({len} chunks)",
261            file_info.path,
262            len = chunks.len()
263        );
264        Ok(chunks.len())
265    }
266
267    pub async fn update_file(&self, file_path: &PathBuf) -> Result<()> {
268        info!("Updating file: {file_path:?}");
269
270        // TODO: Implement file update logic
271        // This would include:
272        // 1. Remove old chunks from vector store
273        // 2. Re-process the file
274        // 3. Update SQLite and Qdrant
275
276        warn!("File update not yet implemented");
277        Ok(())
278    }
279
280    pub async fn remove_file(&self, file_path: &PathBuf) -> Result<()> {
281        info!("Removing file: {file_path:?}");
282
283        // TODO: Implement file removal logic
284        // This would include:
285        // 1. Remove from vector store
286        // 2. Remove from SQLite
287
288        warn!("File removal not yet implemented");
289        Ok(())
290    }
291}