1use std::collections::HashMap;
6use std::path::Path;
7use std::sync::Arc;
8
9use dashmap::DashMap;
10
11use crate::chunker::{Chunker, DefaultChunker};
12use crate::config::{EngineConfig, WorkspaceConfig};
13use crate::embeddings::{create_embedding_text, Embedder, HashEmbedder};
14use crate::error::{Error, Result};
15use crate::ignore::FileWalker;
16use crate::search::{LexicalIndex, SearchQuery, SemanticIndex};
17use crate::storage::{DiskStorage, Storage, WorkspaceMetadata};
18use crate::types::{
19 Chunk, Document, DocumentId, IndexPhase, IndexProgress, Language, SearchResult, WorkspaceId,
20 WorkspaceStats,
21};
22
/// Callback invoked with [`IndexProgress`] updates during indexing.
/// Bounded `Send + Sync` so it can be reported across threads/tasks.
pub type ProgressCallback = Box<dyn Fn(IndexProgress) + Send + Sync>;
25
/// Top-level engine coordinating file walking, chunking, embedding,
/// index construction, persistence, and search across workspaces.
pub struct Engine {
    /// Engine-wide configuration (chunking, embedding, search, ignore rules).
    config: EngineConfig,
    /// On-disk persistence for documents, chunks, embeddings, and metadata.
    storage: Arc<DiskStorage>,
    /// Splits document content into indexable chunks.
    chunker: Arc<DefaultChunker>,
    /// Produces embedding vectors for chunk text; pluggable via `with_embedder`.
    embedder: Arc<dyn Embedder>,
    /// In-memory state of loaded workspaces, keyed by id; `DashMap` permits
    /// concurrent access from multiple callers.
    workspaces: DashMap<WorkspaceId, WorkspaceState>,
}
34
/// In-memory state for one loaded workspace: its documents, chunks, and the
/// two search indexes built over them.
struct WorkspaceState {
    // Persisted per-workspace bookkeeping. Not read after load in this file;
    // presumably retained for future incremental re-indexing — TODO confirm.
    #[allow(dead_code)]
    metadata: WorkspaceMetadata,
    /// All indexed documents in this workspace.
    documents: Vec<Document>,
    /// All chunks extracted from `documents`.
    chunks: Vec<Chunk>,
    /// Full-text (lexical) index over the chunks.
    lexical_index: Arc<LexicalIndex>,
    /// Vector index over chunk embeddings, for semantic search.
    semantic_index: Arc<SemanticIndex>,
}
44
45impl Engine {
46 pub fn new(config: EngineConfig) -> Result<Self> {
48 let storage = Arc::new(DiskStorage::new(config.index_dir.clone())?);
49 let chunker = Arc::new(DefaultChunker::new(config.chunking.clone())?);
50 let embedder: Arc<dyn Embedder> = Arc::new(HashEmbedder::new(config.embedding.dimension));
51
52 Ok(Self {
53 config,
54 storage,
55 chunker,
56 embedder,
57 workspaces: DashMap::new(),
58 })
59 }
60
61 pub fn with_embedder<E: Embedder + 'static>(config: EngineConfig, embedder: E) -> Result<Self> {
63 let storage = Arc::new(DiskStorage::new(config.index_dir.clone())?);
64 let chunker = Arc::new(DefaultChunker::new(config.chunking.clone())?);
65
66 Ok(Self {
67 config,
68 storage,
69 chunker,
70 embedder: Arc::new(embedder),
71 workspaces: DashMap::new(),
72 })
73 }
74
75 pub fn config(&self) -> &EngineConfig {
77 &self.config
78 }
79
80 pub async fn index_workspace(
82 &self,
83 workspace_config: WorkspaceConfig,
84 progress: Option<ProgressCallback>,
85 ) -> Result<WorkspaceId> {
86 self.index_workspace_with_id(workspace_config, None, progress).await
87 }
88
89 pub async fn index_workspace_with_id(
93 &self,
94 workspace_config: WorkspaceConfig,
95 workspace_id: Option<WorkspaceId>,
96 progress: Option<ProgressCallback>,
97 ) -> Result<WorkspaceId> {
98 let workspace_id = workspace_id.unwrap_or_else(WorkspaceId::new);
99 let root_path = workspace_config.root_path.clone();
100
101 if !root_path.exists() {
102 return Err(Error::DirectoryNotFound(root_path));
103 }
104
105 self.storage.init_workspace(&workspace_id, &root_path)?;
107
108 if let Some(ref cb) = progress {
110 cb(IndexProgress {
111 phase: IndexPhase::Scanning,
112 processed: 0,
113 total: 0,
114 current_file: None,
115 eta_seconds: None,
116 });
117 }
118
119 let walker = FileWalker::new(root_path.clone(), self.config.ignore.clone())?;
121 let files = walker.walk()?;
122 let total_files = files.len();
123
124 if let Some(ref cb) = progress {
126 cb(IndexProgress {
127 phase: IndexPhase::Parsing,
128 processed: 0,
129 total: total_files,
130 current_file: None,
131 eta_seconds: None,
132 });
133 }
134
135 let mut documents = Vec::new();
136 let mut chunks = Vec::new();
137
138 for (i, file) in files.iter().enumerate() {
139 if let Some(ref cb) = progress {
140 cb(IndexProgress {
141 phase: IndexPhase::Parsing,
142 processed: i,
143 total: total_files,
144 current_file: Some(file.clone()),
145 eta_seconds: None,
146 });
147 }
148
149 if let Ok((doc, file_chunks)) = self.process_file(file, &root_path) {
150 documents.push(doc);
151 chunks.extend(file_chunks);
152 }
153 }
154
155 if let Some(ref cb) = progress {
157 cb(IndexProgress {
158 phase: IndexPhase::Embedding,
159 processed: 0,
160 total: chunks.len(),
161 current_file: None,
162 eta_seconds: None,
163 });
164 }
165
166 let mut embeddings = Vec::new();
167 let batch_size = self.config.embedding.batch_size;
168
169 for (batch_idx, batch) in chunks.chunks(batch_size).enumerate() {
170 if let Some(ref cb) = progress {
171 cb(IndexProgress {
172 phase: IndexPhase::Embedding,
173 processed: batch_idx * batch_size,
174 total: chunks.len(),
175 current_file: None,
176 eta_seconds: None,
177 });
178 }
179
180 let texts: Vec<String> = batch.iter().map(create_embedding_text).collect();
181 let text_refs: Vec<&str> = texts.iter().map(|s| s.as_str()).collect();
182
183 if let Ok(batch_embeddings) = self.embedder.embed_batch(&text_refs) {
184 for (chunk, embedding) in batch.iter().zip(batch_embeddings) {
185 embeddings.push((chunk.id.0.to_string(), embedding));
186 }
187 }
188 }
189
190 if let Some(ref cb) = progress {
192 cb(IndexProgress {
193 phase: IndexPhase::Indexing,
194 processed: 0,
195 total: 1,
196 current_file: None,
197 eta_seconds: None,
198 });
199 }
200
201 let lexical_path = self.storage.tantivy_index_path(&workspace_id);
203 let lexical_index = LexicalIndex::create(&lexical_path, self.config.search.clone())?;
204
205 let doc_map: HashMap<String, &Document> =
206 documents.iter().map(|d| (d.id.0.to_string(), d)).collect();
207 lexical_index.index_chunks(&chunks, &doc_map)?;
208
209 let mut semantic_index =
211 SemanticIndex::new(self.embedder.dimension(), self.config.search.clone())?;
212 semantic_index.add_embeddings(&embeddings)?;
213
214 if let Some(ref cb) = progress {
216 cb(IndexProgress {
217 phase: IndexPhase::Persisting,
218 processed: 0,
219 total: 1,
220 current_file: None,
221 eta_seconds: None,
222 });
223 }
224
225 self.storage.save_documents(&workspace_id, &documents)?;
226 self.storage.save_chunks(&workspace_id, &chunks)?;
227 self.storage.save_embeddings(&workspace_id, &embeddings)?;
228
229 let vector_path = self.storage.vector_index_path(&workspace_id);
230 semantic_index.save(&vector_path)?;
231
232 let mut metadata = self.storage.load_workspace_metadata(&workspace_id)?;
234 for doc in &documents {
235 let chunk_count = chunks.iter().filter(|c| c.document_id == doc.id).count();
236 metadata.record_document(
237 doc.relative_path.clone(),
238 doc.id.clone(),
239 doc.content_hash.clone(),
240 doc.size_bytes,
241 doc.language,
242 chunk_count,
243 );
244 }
245 self.storage
246 .save_workspace_metadata(&workspace_id, &metadata)?;
247
248 self.workspaces.insert(
250 workspace_id.clone(),
251 WorkspaceState {
252 metadata,
253 documents,
254 chunks,
255 lexical_index: Arc::new(lexical_index),
256 semantic_index: Arc::new(semantic_index),
257 },
258 );
259
260 if let Some(ref cb) = progress {
261 cb(IndexProgress {
262 phase: IndexPhase::Complete,
263 processed: total_files,
264 total: total_files,
265 current_file: None,
266 eta_seconds: Some(0.0),
267 });
268 }
269
270 Ok(workspace_id)
271 }
272
273 pub fn load_workspace(&self, workspace_id: &WorkspaceId) -> Result<()> {
275 if !self.storage.workspace_exists(workspace_id) {
276 return Err(Error::WorkspaceNotFound(workspace_id.clone()));
277 }
278
279 let metadata = self.storage.load_workspace_metadata(workspace_id)?;
280 let documents = self.storage.load_documents(workspace_id)?;
281 let chunks = self.storage.load_chunks(workspace_id)?;
282
283 let lexical_path = self.storage.tantivy_index_path(workspace_id);
284 let lexical_index = LexicalIndex::open(&lexical_path, self.config.search.clone())?;
285
286 let vector_path = self.storage.vector_index_path(workspace_id);
287 let semantic_index = SemanticIndex::load(
288 &vector_path,
289 self.embedder.dimension(),
290 self.config.search.clone(),
291 )?;
292
293 self.workspaces.insert(
294 workspace_id.clone(),
295 WorkspaceState {
296 metadata,
297 documents,
298 chunks,
299 lexical_index: Arc::new(lexical_index),
300 semantic_index: Arc::new(semantic_index),
301 },
302 );
303
304 Ok(())
305 }
306
307 pub fn search(
309 &self,
310 workspace_id: &WorkspaceId,
311 query: SearchQuery,
312 ) -> Result<Vec<SearchResult>> {
313 let state = self
314 .workspaces
315 .get(workspace_id)
316 .ok_or_else(|| Error::WorkspaceNotFound(workspace_id.clone()))?;
317
318 let chunk_map: HashMap<String, &Chunk> = state
320 .chunks
321 .iter()
322 .map(|c| (c.id.0.to_string(), c))
323 .collect();
324
325 let doc_map: HashMap<String, &Document> = state
326 .documents
327 .iter()
328 .map(|d| (d.id.0.to_string(), d))
329 .collect();
330
331 match query.mode {
332 crate::config::SearchMode::Lexical => {
333 state.lexical_index.search_index(&query, &chunk_map, &doc_map)
334 }
335 crate::config::SearchMode::Semantic => {
336 state.semantic_index.search_with_embedder(
337 &query,
338 self.embedder.as_ref(),
339 &chunk_map,
340 &doc_map,
341 )
342 }
343 crate::config::SearchMode::Hybrid => {
344 let lexical_results =
345 state.lexical_index.search_index(&query, &chunk_map, &doc_map)?;
346 let semantic_results = state.semantic_index.search_with_embedder(
347 &query,
348 self.embedder.as_ref(),
349 &chunk_map,
350 &doc_map,
351 )?;
352
353 let weights = vec![
355 self.config.search.lexical_weight,
356 self.config.search.semantic_weight,
357 ];
358 Ok(crate::search::merge_results(
359 vec![lexical_results, semantic_results],
360 &weights,
361 query.limit,
362 ))
363 }
364 }
365 }
366
367 pub fn search_text(
369 &self,
370 workspace_id: &WorkspaceId,
371 text: &str,
372 ) -> Result<Vec<SearchResult>> {
373 let query = SearchQuery::new(text)
374 .limit(self.config.search.default_limit)
375 .min_score(self.config.search.min_score);
376
377 self.search(workspace_id, query)
378 }
379
380 pub fn delete_workspace(&self, workspace_id: &WorkspaceId) -> Result<()> {
382 self.workspaces.remove(workspace_id);
383 self.storage.delete_workspace(workspace_id)
384 }
385
386 pub fn list_workspaces(&self) -> Result<Vec<WorkspaceId>> {
388 self.storage.list_workspaces()
389 }
390
391 pub fn get_workspace_stats(&self, workspace_id: &WorkspaceId) -> Result<WorkspaceStats> {
393 self.storage.get_workspace_stats(workspace_id)
394 }
395
396 pub fn find_workspace_by_path(&self, root_path: &Path) -> Result<Option<WorkspaceId>> {
399 let workspaces = self.list_workspaces()?;
400 let normalized_path = root_path.to_string_lossy().to_lowercase().replace('\\', "/");
401
402 for ws_id in workspaces {
403 if let Ok(stats) = self.get_workspace_stats(&ws_id) {
404 let indexed_path = stats.root_path.to_string_lossy().to_lowercase().replace('\\', "/");
405 if indexed_path == normalized_path {
406 return Ok(Some(ws_id));
407 }
408 }
409 }
410 Ok(None)
411 }
412
413 pub async fn index_or_reindex_workspace(
416 &self,
417 workspace_config: WorkspaceConfig,
418 progress: Option<ProgressCallback>,
419 ) -> Result<WorkspaceId> {
420 let root_path = workspace_config.root_path.clone();
421
422 let existing_id = self.find_workspace_by_path(&root_path)?;
424
425 if let Some(ws_id) = existing_id {
426 self.delete_workspace(&ws_id)?;
428 self.index_workspace_with_id(workspace_config, Some(ws_id), progress).await
430 } else {
431 self.index_workspace(workspace_config, progress).await
433 }
434 }
435
436 fn process_file(&self, path: &Path, root: &Path) -> Result<(Document, Vec<Chunk>)> {
437 let content = std::fs::read_to_string(path)?;
438 let metadata = std::fs::metadata(path)?;
439 let hash = blake3::hash(content.as_bytes()).to_hex().to_string();
440
441 let extension = path.extension().and_then(|e| e.to_str()).unwrap_or("");
442 let language = Language::from_extension(extension);
443 let relative_path = path.strip_prefix(root).unwrap_or(path).to_path_buf();
444
445 let document = Document {
446 id: DocumentId::new(),
447 relative_path,
448 absolute_path: path.to_path_buf(),
449 language,
450 content_hash: hash,
451 size_bytes: metadata.len(),
452 modified_at: metadata
453 .modified()
454 .ok()
455 .map(chrono::DateTime::from)
456 .unwrap_or_else(chrono::Utc::now),
457 };
458
459 let chunks = self.chunker.chunk(&document, &content)?;
460 Ok((document, chunks))
461 }
462}
463
// SAFETY: NOTE(review) — these manual impls assert that every field of
// `Engine` is safe to share/send across threads. That only holds if
// `dyn Embedder` (and `DiskStorage`, `DefaultChunker`, `EngineConfig`)
// are themselves `Send + Sync`, which cannot be verified from this file.
// If they are, these impls are redundant (the compiler derives them); if
// they are not, these impls are unsound. Prefer bounding the trait object
// (`Arc<dyn Embedder + Send + Sync>`) and deleting both impls — TODO confirm.
unsafe impl Send for Engine {}
unsafe impl Sync for Engine {}