directory_indexer/indexing/
engine.rs1use log::{error, info, warn};
2use std::path::PathBuf;
3
4use crate::{
5 config::Config,
6 embedding::EmbeddingProvider,
7 error::{IndexerError, Result},
8 indexing::files::{FileInfo, FileScanner},
9 storage::{qdrant::VectorPoint, FileRecord, QdrantStore, SqliteStore},
10 utils::{calculate_file_hash, chunk_text, normalize_path},
11};
12
/// Coordinates indexing of directories: scans files, splits their text into chunks, requests
/// an embedding for each chunk, and writes the results to both the SQLite metadata store and
/// the Qdrant vector store.
pub struct IndexingEngine {
    /// Engine configuration; `indexing.chunk_size` and `indexing.overlap` drive text chunking.
    config: Config,
    /// Metadata store for indexed directories and per-file records (hash, mtime, chunk list).
    sqlite_store: SqliteStore,
    /// Vector store holding one point per embedded chunk, each tagged with the normalized path
    /// of the file it came from.
    vector_store: QdrantStore,
    /// Produces the embedding vector for each text chunk.
    embedding_provider: Box<dyn EmbeddingProvider>,
}
19
/// Counters summarising an indexing run, as returned by `IndexingEngine::index_directories`.
///
/// `IndexingStats::default()` yields an all-zero instance, which is the natural starting point
/// for accumulating per-directory results.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub struct IndexingStats {
    /// Number of directories whose indexing completed without a directory-level error.
    pub directories_processed: usize,
    /// Number of files counted as processed during the run.
    pub files_processed: usize,
    /// Number of files skipped rather than re-indexed.
    pub files_skipped: usize,
    /// Number of files whose processing failed; `index_directories` also adds one for each
    /// directory that failed to index as a whole.
    pub files_errored: usize,
    /// Total number of text chunks produced for the processed files.
    pub chunks_created: usize,
}
28
29impl IndexingEngine {
30 pub async fn new(
31 config: Config,
32 sqlite_store: SqliteStore,
33 vector_store: QdrantStore,
34 embedding_provider: Box<dyn EmbeddingProvider>,
35 ) -> Result<Self> {
36 Ok(Self {
37 config,
38 sqlite_store,
39 vector_store,
40 embedding_provider,
41 })
42 }
43
44 pub async fn validate_state_consistency(&self) -> Result<()> {
47 info!("Validating state consistency between SQLite and Qdrant");
48
49 let (_, file_count, _) = self.sqlite_store.get_stats()?;
51
52 let collection_info = self.vector_store.get_collection_info().await?;
54
55 info!(
56 "State check: SQLite has {file_count} files, Qdrant has {} vectors",
57 collection_info.points_count
58 );
59
60 if file_count > 0 && collection_info.points_count == 0 {
62 warn!(
63 "State mismatch detected: SQLite has {file_count} indexed files but Qdrant collection is empty. Clearing SQLite state to force re-indexing."
64 );
65
66 self.sqlite_store.clear_all_files()?;
68
69 info!("SQLite state cleared. Files will be re-indexed.");
70 } else {
71 info!("State consistency validated: no mismatch detected");
72 }
73
74 Ok(())
75 }
76
77 pub async fn index_directories(&self, paths: Vec<PathBuf>) -> Result<IndexingStats> {
78 info!("Starting indexing for {len} directories", len = paths.len());
79
80 let mut stats = IndexingStats {
81 directories_processed: 0,
82 files_processed: 0,
83 files_skipped: 0,
84 files_errored: 0,
85 chunks_created: 0,
86 };
87
88 for path in paths {
89 match self.index_directory(&path).await {
90 Ok(dir_stats) => {
91 stats.directories_processed += 1;
92 stats.files_processed += dir_stats.files_processed;
93 stats.files_skipped += dir_stats.files_skipped;
94 stats.files_errored += dir_stats.files_errored;
95 stats.chunks_created += dir_stats.chunks_created;
96 }
97 Err(e) => {
98 error!("Failed to index directory {path:?}: {e}");
99 stats.files_errored += 1;
100 }
101 }
102 }
103
104 info!("Indexing completed: {stats:?}");
105 Ok(stats)
106 }
107
108 async fn index_directory(&self, path: &PathBuf) -> Result<IndexingStats> {
109 info!("Indexing directory: {path:?}");
110
111 let mut stats = IndexingStats {
112 directories_processed: 0,
113 files_processed: 0,
114 files_skipped: 0,
115 files_errored: 0,
116 chunks_created: 0,
117 };
118
119 self.sqlite_store.add_directory(&path.to_string_lossy())?;
121
122 let scanner = FileScanner::new();
124 let files = scanner.scan_directory(path).await?;
125
126 info!(
127 "Found {len} files to process in {path:?}",
128 len = files.len()
129 );
130
131 for file_info in files {
133 match self.process_file(&file_info).await {
134 Ok(chunks_count) => {
135 stats.files_processed += 1;
136 stats.chunks_created += chunks_count;
137 }
138 Err(e) => {
139 error!("Failed to process file {:?}: {e}", file_info.path);
140 stats.files_errored += 1;
141 }
142 }
143 }
144
145 self.sqlite_store
147 .update_directory_status(&path.to_string_lossy(), "completed")?;
148
149 stats.directories_processed = 1;
150 Ok(stats)
151 }
152
153 async fn process_file(&self, file_info: &FileInfo) -> Result<usize> {
154 info!("Processing file: {:?}", file_info.path);
155
156 let file_hash = calculate_file_hash(&file_info.path)?;
158
159 let normalized_path = normalize_path(&file_info.path)?;
161 if let Some(existing) = self.sqlite_store.get_file_by_path(&normalized_path)? {
162 if existing.hash == file_hash
163 && existing.modified_time == (file_info.modified_time as i64)
164 {
165 info!("File unchanged, skipping: {:?}", file_info.path);
166 return Ok(0);
167 }
168
169 self.vector_store
171 .delete_points_by_file(&normalized_path)
172 .await?;
173 }
174
175 let content = tokio::fs::read_to_string(&file_info.path)
177 .await
178 .map_err(|e| {
179 IndexerError::file_processing(format!(
180 "Failed to read file {:?}: {e}",
181 file_info.path
182 ))
183 })?;
184
185 let chunks = chunk_text(
186 &content,
187 self.config.indexing.chunk_size,
188 self.config.indexing.overlap,
189 );
190
191 if chunks.is_empty() {
192 info!("No chunks generated for file: {:?}", file_info.path);
193 return Ok(0);
194 }
195
196 let mut vector_points = Vec::new();
198 let batch_size = 10; for (batch_num, chunk_batch) in chunks.chunks(batch_size).enumerate() {
201 let chunk_futures: Vec<_> = chunk_batch
203 .iter()
204 .enumerate()
205 .map(|(batch_idx, chunk_content)| {
206 let embedding_provider = &self.embedding_provider;
207 let chunk_content = chunk_content.clone();
208 let global_chunk_id = batch_num * batch_size + batch_idx; async move {
210 match embedding_provider.generate_embedding(chunk_content).await {
211 Ok(embedding) => Some((global_chunk_id, embedding)),
212 Err(e) => {
213 error!(
214 "Failed to generate embedding for chunk {global_chunk_id}: {e}"
215 );
216 None
217 }
218 }
219 }
220 })
221 .collect();
222
223 let results = futures::future::join_all(chunk_futures).await;
225
226 for (chunk_id, embedding) in results.into_iter().flatten() {
228 let point_id = uuid::Uuid::new_v4().to_string();
229 let point = VectorPoint {
230 id: point_id,
231 vector: embedding,
232 file_path: normalized_path.clone(),
233 chunk_id,
234 parent_directories: file_info.parent_dirs.clone(),
235 };
236 vector_points.push(point);
237 }
238 }
239
240 if !vector_points.is_empty() {
242 self.vector_store.upsert_points(vector_points).await?;
243 }
244
245 let file_record = FileRecord {
247 id: 0, path: normalized_path.clone(),
249 size: file_info.size as i64,
250 modified_time: file_info.modified_time as i64,
251 hash: file_hash,
252 parent_dirs: file_info.parent_dirs.clone(),
253 chunks_json: Some(serde_json::json!(chunks)),
254 errors_json: None,
255 };
256
257 self.sqlite_store.add_file(&file_record)?;
258
259 info!(
260 "Successfully processed file: {:?} ({len} chunks)",
261 file_info.path,
262 len = chunks.len()
263 );
264 Ok(chunks.len())
265 }
266
267 pub async fn update_file(&self, file_path: &PathBuf) -> Result<()> {
268 info!("Updating file: {file_path:?}");
269
270 warn!("File update not yet implemented");
277 Ok(())
278 }
279
280 pub async fn remove_file(&self, file_path: &PathBuf) -> Result<()> {
281 info!("Removing file: {file_path:?}");
282
283 warn!("File removal not yet implemented");
289 Ok(())
290 }
291}