1mod helpers;
2mod policy;
3mod reconcile;
4mod status;
5mod store;
6use std::collections::{BTreeMap, HashSet};
7use std::path::{Path, PathBuf};
8use std::time::Instant;
9
10pub(crate) use helpers::*;
11pub(crate) use policy::*;
12pub(crate) use reconcile::*;
13use rusqlite::types::Value;
14use rusqlite::{Connection, OptionalExtension, params, params_from_iter};
15use serde::Serialize;
16use sha2::{Digest, Sha256};
17pub(crate) use status::*;
18pub(crate) use store::*;
19
20use crate::index::now_ms;
21use crate::language::Language;
22
/// Model id reported by [`HashEmbedder`].
pub const HASH_MODEL_ID: &str = "embedding-hash";
/// Vector width produced by [`HashEmbedder`].
pub const HASH_EMBEDDING_DIM: usize = 384;

/// Model id reported by the FastEmbed backend.
pub const FASTEMBED_MODEL_ID: &str = "fastembed-all-minilm-l6-v2";
/// Upstream model name for the FastEmbed backend (presumably shown in status output).
pub const FASTEMBED_DISPLAY_MODEL: &str = "sentence-transformers/all-MiniLM-L6-v2";
/// Vector width produced by the FastEmbed backend.
pub const FASTEMBED_EMBEDDING_DIM: usize = 384;

/// Model id reported by the Model2Vec backend.
pub const MODEL2VEC_MODEL_ID: &str = "model2vec-potion-retrieval-32m";
/// Hugging Face repository the Model2Vec backend loads its model from.
pub const MODEL2VEC_HF_REPO: &str = "minishlab/potion-retrieval-32M";
/// Upstream model name for the Model2Vec backend; identical to its Hugging Face repo id.
pub const MODEL2VEC_DISPLAY_MODEL: &str = MODEL2VEC_HF_REPO;
/// Vector width produced by the Model2Vec backend.
pub const MODEL2VEC_EMBEDDING_DIM: usize = 512;
/// User-facing error text for a request to use the Model2Vec backend when this binary has no
/// Model2Vec support. NOTE(review): presumably emitted when the `model2vec` cargo feature is
/// disabled; the use site is not visible here.
pub const MODEL2VEC_MISSING_FEATURE_MESSAGE: &str =
    "Model2Vec backend requested, but this binary was built without Model2Vec support.\nRebuild \
     with default features enabled:\n cargo install rag-rat";
/// User-facing error text for a request to use the FastEmbed backend when this binary has no
/// FastEmbed support. NOTE(review): presumably emitted when the `fastembed` cargo feature is
/// disabled; the use site is not visible here.
pub const FASTEMBED_MISSING_FEATURE_MESSAGE: &str =
    "FastEmbed backend requested, but this binary was built without default FastEmbed \
     support.\nRebuild with default features enabled:\n cargo install rag-rat";
/// Version tag for how chunk text is turned into embedder input; stored rows carrying a
/// different tag are treated as `ModelChanged` by `CurrentChunk::reason`.
pub const EMBEDDING_TEXT_VERSION: &str = "embedding-text-v2";
/// Model ids from earlier releases. NOTE(review): presumably used to clean up or ignore old rows.
const LEGACY_MODEL_IDS: &[&str] = &["embedding-small"];

// Input sizing and batching defaults.
const MIN_EMBEDDING_CHARS: usize = 80;
pub const DEFAULT_MAX_EMBEDDING_CHARS: usize = 4_000;
const DEFAULT_BATCH_SIZE: usize = 64;

// Metadata keys. NOTE(review): presumably rows in the index's key/value meta table.
const ACTIVE_EMBEDDING_MODEL_META: &str = "active_embedding_model";
const ACTIVE_EMBEDDING_MODEL_VERSION_META: &str = "embedding_active_model_version";
const LAST_EMBEDDING_RECONCILE_STARTED_META: &str = "last_embedding_reconcile_started_at_ms";
const LAST_EMBEDDING_RECONCILE_FINISHED_META: &str = "last_embedding_reconcile_finished_at_ms";

/// Directory name of the FastEmbed model inside the cache (Hugging Face hub
/// `models--{org}--{repo}` layout); presumably used to detect an installed model.
#[cfg(feature = "fastembed")]
const FASTEMBED_HF_CACHE_REPO_DIR: &str = "models--Qdrant--all-MiniLM-L6-v2-onnx";
52
53pub trait Embedder {
54 fn model_id(&self) -> &str;
55 fn dim(&self) -> usize;
56 fn embed_batch(&self, texts: &[String]) -> anyhow::Result<Vec<Vec<f32>>>;
57}
58
59pub struct HashEmbedder;
60
61impl Embedder for HashEmbedder {
62 fn model_id(&self) -> &str {
63 HASH_MODEL_ID
64 }
65
66 fn dim(&self) -> usize {
67 HASH_EMBEDDING_DIM
68 }
69
70 fn embed_batch(&self, texts: &[String]) -> anyhow::Result<Vec<Vec<f32>>> {
71 Ok(texts.iter().map(|text| hash_embed_text(text, HASH_EMBEDDING_DIM)).collect())
72 }
73}
74
/// Test-only embedder whose model id and vector width are chosen by the test.
#[cfg(test)]
pub struct MockEmbedder {
    model_id: String,
    dim: usize,
}

#[cfg(test)]
impl MockEmbedder {
    /// Creates a mock that reports `model_id` and produces `dim`-wide vectors.
    pub fn new(model_id: impl Into<String>, dim: usize) -> Self {
        let model_id = model_id.into();
        Self { model_id, dim }
    }
}

#[cfg(test)]
impl Embedder for MockEmbedder {
    fn model_id(&self) -> &str {
        self.model_id.as_str()
    }

    fn dim(&self) -> usize {
        self.dim
    }

    /// Hashes each input with `hash_embed_text` at the configured width; always returns `Ok`.
    fn embed_batch(&self, texts: &[String]) -> anyhow::Result<Vec<Vec<f32>>> {
        let width = self.dim;
        let vectors: Vec<Vec<f32>> = texts.iter().map(|text| hash_embed_text(text, width)).collect();
        Ok(vectors)
    }
}
102
/// Embedder backed by fastembed's all-MiniLM-L6-v2 model (only built with the `fastembed`
/// feature).
#[cfg(feature = "fastembed")]
pub struct FastEmbedEmbedder {
    // `embed_batch` takes a mutable guard on this model while the `Embedder` trait only hands
    // out `&self`, hence the mutex (presumably `TextEmbedding::embed` needs `&mut self`).
    model: std::sync::Mutex<fastembed::TextEmbedding>,
}

#[cfg(feature = "fastembed")]
impl FastEmbedEmbedder {
    /// Initializes the model using `fastembed_cache_dir()` as cache, with download progress on.
    ///
    /// `intra_threads` is forwarded only when it is `Some(n)` with `n > 0`; `None` or
    /// `Some(0)` leaves the option unset.
    ///
    /// # Errors
    /// Propagates any error from `TextEmbedding::try_new`.
    pub fn new(intra_threads: Option<usize>) -> anyhow::Result<Self> {
        use fastembed::{EmbeddingModel, InitOptions, TextEmbedding};

        let base = InitOptions::new(EmbeddingModel::AllMiniLML6V2)
            .with_cache_dir(fastembed_cache_dir())
            .with_show_download_progress(true);
        let options = match intra_threads {
            Some(threads) if threads > 0 => base.with_intra_threads(threads),
            _ => base,
        };
        let model = TextEmbedding::try_new(options)?;
        Ok(Self { model: std::sync::Mutex::new(model) })
    }
}

#[cfg(feature = "fastembed")]
impl Embedder for FastEmbedEmbedder {
    fn model_id(&self) -> &str {
        FASTEMBED_MODEL_ID
    }

    fn dim(&self) -> usize {
        FASTEMBED_EMBEDDING_DIM
    }

    /// Embeds the batch with fastembed's default batch size.
    ///
    /// # Errors
    /// Fails if the model mutex is poisoned or fastembed's `embed` returns an error.
    fn embed_batch(&self, texts: &[String]) -> anyhow::Result<Vec<Vec<f32>>> {
        let batch: Vec<&str> = texts.iter().map(|text| text.as_str()).collect();
        let mut guard = self
            .model
            .lock()
            .map_err(|_| anyhow::anyhow!("fastembed model lock poisoned"))?;
        guard.embed(batch, None)
    }
}
143
/// Embedder backed by a Model2Vec static model loaded from [`MODEL2VEC_HF_REPO`] (only built
/// with the `model2vec` feature).
#[cfg(feature = "model2vec")]
pub struct Model2VecEmbedder {
    model: model2vec_rs::model::StaticModel,
}

#[cfg(feature = "model2vec")]
impl Model2VecEmbedder {
    /// Loads the static model from [`MODEL2VEC_HF_REPO`].
    ///
    /// # Errors
    /// Returns `failed to load Model2Vec model: …` when loading fails.
    pub fn new() -> anyhow::Result<Self> {
        use model2vec_rs::model::StaticModel;

        // NOTE(review): per model2vec-rs docs the trailing positional arguments are
        // token / normalize / subfolder; confirm against the pinned crate version.
        match StaticModel::from_pretrained(MODEL2VEC_HF_REPO, None, Some(true), None) {
            Ok(model) => Ok(Self { model }),
            Err(err) => Err(anyhow::anyhow!("failed to load Model2Vec model: {err}")),
        }
    }
}

#[cfg(feature = "model2vec")]
impl Embedder for Model2VecEmbedder {
    fn model_id(&self) -> &str {
        MODEL2VEC_MODEL_ID
    }

    fn dim(&self) -> usize {
        MODEL2VEC_EMBEDDING_DIM
    }

    /// Encodes the whole batch in one `StaticModel::encode` call; always returns `Ok`.
    fn embed_batch(&self, texts: &[String]) -> anyhow::Result<Vec<Vec<f32>>> {
        let vectors = self.model.encode(texts);
        Ok(vectors)
    }
}
179
180#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
181pub enum ArtifactStatus {
182 Current,
183 Missing,
184 Stale,
185 Failed,
186 Blocked,
187 Disabled,
188}
189
190impl ArtifactStatus {
191 pub fn as_str(self) -> &'static str {
192 match self {
193 Self::Current => "Current",
194 Self::Missing => "Missing",
195 Self::Stale => "Stale",
196 Self::Failed => "Failed",
197 Self::Blocked => "Blocked",
198 Self::Disabled => "Disabled",
199 }
200 }
201}
202
/// Serializable snapshot of local-AI state: the embedding capability, artifact tallies, the
/// FastEmbed backend's operational status, and the last reconcile run, if any.
#[derive(Debug, Clone, Serialize)]
pub struct LocalAiStatus {
    pub embedding: CapabilityStatus,
    pub artifacts: ArtifactCounts,
    pub fastembed: FastEmbedOperationalStatus,
    /// `None` presumably when no reconcile run has been recorded yet — confirm in `status`.
    pub last_reconcile: Option<LastReconcileStatus>,
}
210
/// Serializable status of one capability (such as the embedding capability exposed via
/// `LocalAiStatus::embedding`) for a given model.
#[derive(Debug, Clone, Serialize)]
pub struct CapabilityStatus {
    pub capability: String,
    pub model_id: String,
    /// Free-form state label. NOTE(review): presumably an `ArtifactStatus::as_str`-style
    /// value — confirm where this is populated.
    pub state: String,
    pub installed: bool,
    pub disabled: bool,
    // Artifact tallies per status for this capability.
    pub current_artifacts: u64,
    pub stale_artifacts: u64,
    pub failed_artifacts: u64,
    pub blocked_artifacts: u64,
    /// Optional human-readable explanation.
    pub message: Option<String>,
}
224
/// Serializable operational report for the FastEmbed backend: build and installation state,
/// embedding tallies, and optional guidance text.
#[derive(Debug, Clone, Serialize)]
pub struct FastEmbedOperationalStatus {
    pub backend: String,
    /// Whether this binary includes FastEmbed support. NOTE(review): presumably
    /// `cfg!(feature = "fastembed")` — confirm in `status`.
    pub build_feature_enabled: bool,
    pub model_id: String,
    pub model: String,
    pub dim: usize,
    /// Cache location rendered as text (presumably from `fastembed_cache_dir()`).
    pub cache: String,
    pub installed: bool,
    pub active: bool,
    pub status: String,
    // Embedding tallies (presumably scoped to the FastEmbed model id).
    pub current_embeddings: u64,
    pub eligible_embeddings: u64,
    pub skipped_embeddings: u64,
    pub stale_embeddings: u64,
    pub missing_embeddings: u64,
    pub failed_embeddings: u64,
    // Split of failures by whether their retry is currently allowed (inferred from names).
    pub failed_retryable_embeddings: u64,
    pub failed_waiting_embeddings: u64,
    pub message: Option<String>,
    /// Optional suggested next step for the user (presumably a command hint).
    pub next: Option<String>,
}
247
/// Serializable chunk and artifact tallies. The status fields (`current` … `disabled`) are
/// named after the [`ArtifactStatus`] variants.
#[derive(Debug, Clone, Serialize)]
pub struct ArtifactCounts {
    pub total_chunks: u64,
    pub eligible_chunks: u64,
    pub skipped_chunks: u64,
    pub current: u64,
    pub missing: u64,
    pub stale: u64,
    pub failed: u64,
    pub blocked: u64,
    pub disabled: u64,
}
260
/// Serializable summary of the most recent reconcile run, exposed through
/// `LocalAiStatus::last_reconcile`.
#[derive(Debug, Clone, Serialize)]
pub struct LastReconcileStatus {
    /// Start time in milliseconds. NOTE(review): presumably epoch ms from `now_ms()`,
    /// persisted under `LAST_EMBEDDING_RECONCILE_STARTED_META` — confirm.
    pub started_at_ms: i64,
    /// End time in milliseconds; `None` presumably when the run did not finish.
    pub finished_at_ms: Option<i64>,
    pub batch_size: u64,
    pub processed_chunks: u64,
    pub embeddings_written: u64,
    pub blocked_chunks: u64,
    pub elapsed_ms: u64,
    pub input_chars: u64,
    // Throughput figures (presumably derived from the counters and `elapsed_ms`).
    pub chunks_per_sec: f64,
    pub chars_per_sec: f64,
    pub status: String,
    pub message: Option<String>,
}
276
/// Serializable description of a known model and its installation state.
#[derive(Debug, Clone, Serialize)]
pub struct ModelInfo {
    pub model_id: String,
    pub capability: String,
    /// Vector width, if known (`None` presumably for non-embedding or unrecorded models).
    pub embedding_dim: Option<i64>,
    pub runtime: String,
    pub installed: bool,
    pub disabled: bool,
    pub status: String,
    /// Installation time in milliseconds, if recorded (presumably epoch ms).
    pub installed_at_ms: Option<i64>,
    /// Most recent error text, if any.
    pub last_error: Option<String>,
}
289
/// Serializable outcome of a reconcile run: work counters, the model and settings the run
/// used, per-reason and per-policy breakdowns, and throughput figures.
#[derive(Debug, Clone, Serialize)]
pub struct ReconcileReport {
    // Work counters.
    pub processed_chunks: u64,
    pub embeddings_written: u64,
    pub skipped_chunks: u64,
    pub failed_chunks: u64,
    pub blocked_chunks: u64,
    // Model identity.
    pub model_id: String,
    pub model_version: String,
    pub embedding_dim: usize,
    // Settings; names mirror `ReconcileOptions` fields (`forced` corresponds to `force`).
    pub batch_size: usize,
    pub max_embedding_chars: usize,
    pub forced: bool,
    pub changed_first: bool,
    pub until_clean: bool,
    pub max_seconds: Option<u64>,
    /// Counts keyed by reason label. NOTE(review): presumably `ReconcileReason::as_str` values.
    pub work_reasons: BTreeMap<String, u64>,
    /// Counts keyed by policy name. NOTE(review): presumably `EmbeddingPolicyDecision::policy`.
    pub skipped_by_policy: BTreeMap<String, u64>,
    // Input-size and throughput metrics.
    pub input_chars: u64,
    pub truncated_inputs: u64,
    pub elapsed_ms: u64,
    pub chunks_per_sec: f64,
    pub chars_per_sec: f64,
    pub avg_chars_per_chunk: f64,
    pub status: String,
    pub message: Option<String>,
}
317
/// Serializable reconcile plan (presumably a dry-run preview) with separate embedding and
/// summary sections.
#[derive(Debug, Clone, Serialize)]
pub struct ReconcilePlan {
    pub embeddings: EmbeddingReconcilePlan,
    pub summaries: SummaryReconcilePlan,
}
323
/// Serializable embedding section of a reconcile plan: the target model plus how many chunks
/// fall into each state or work category.
#[derive(Debug, Clone, Serialize)]
pub struct EmbeddingReconcilePlan {
    pub model_id: String,
    pub model_version: String,
    pub dim: usize,
    /// Whether the embedding backend is usable. NOTE(review): inferred from the name — confirm.
    pub available: bool,
    // Per-category counts; several are named after `ArtifactStatus` / `ReconcileReason` variants.
    pub current: u64,
    pub missing: u64,
    pub stale: u64,
    pub model_changed: u64,
    pub dim_changed: u64,
    pub failed_retryable: u64,
    pub failed_waiting: u64,
    pub blocked: u64,
    pub disabled: u64,
    pub skipped_total: u64,
    /// Skipped counts keyed by policy name (presumably `EmbeddingPolicyDecision::policy`).
    pub skipped_by_policy: BTreeMap<String, u64>,
    /// Missing counts keyed by priority rendered as text (presumably
    /// `EmbeddingPolicyDecision::priority`).
    pub missing_by_priority: BTreeMap<String, u64>,
    pub message: Option<String>,
}
344
/// Serializable summary section of a reconcile plan: an enabled flag plus an explanatory
/// message.
#[derive(Debug, Clone, Serialize)]
pub struct SummaryReconcilePlan {
    pub enabled: bool,
    pub message: String,
}
350
/// Why a chunk is being (re-)embedded.
///
/// Declaration order matters: the derived `PartialOrd`/`Ord` compare variants in the order
/// they are listed here.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
enum ReconcileReason {
    /// No stored embedding; also the fall-through value of `CurrentChunk::reason`.
    Missing,
    SourceChanged,
    InputChanged,
    ModelChanged,
    DimChanged,
    RetryAfterFailure,
    Forced,
}

impl ReconcileReason {
    /// Stable label for this reason; identical to the variant name.
    fn as_str(self) -> &'static str {
        use ReconcileReason as R;
        match self {
            R::Forced => "Forced",
            R::RetryAfterFailure => "RetryAfterFailure",
            R::DimChanged => "DimChanged",
            R::ModelChanged => "ModelChanged",
            R::InputChanged => "InputChanged",
            R::SourceChanged => "SourceChanged",
            R::Missing => "Missing",
        }
    }
}
375
376#[derive(Debug, Clone)]
377pub struct ReconcileOptions {
378 pub limit: Option<u32>,
379 pub batch_size: Option<u32>,
380 pub force: bool,
381 pub until_clean: bool,
382 pub changed_first: bool,
383 pub max_seconds: Option<u64>,
384 pub max_embedding_chars: usize,
385 pub intra_threads: Option<usize>,
387}
388
389impl Default for ReconcileOptions {
390 fn default() -> Self {
391 Self {
392 limit: None,
393 batch_size: None,
394 force: false,
395 until_clean: false,
396 changed_first: false,
397 max_seconds: None,
398 max_embedding_chars: DEFAULT_MAX_EMBEDDING_CHARS,
399 intra_threads: None,
400 }
401 }
402}
403
/// Serializable result of evaluating the embedding policy for one chunk.
#[derive(Debug, Clone, Serialize)]
pub struct EmbeddingPolicyDecision {
    /// Name of the policy that matched.
    pub policy: String,
    /// Scheduling priority. NOTE(review): whether higher or lower runs first is not visible here.
    pub priority: i64,
    /// Whether the chunk should be embedded at all.
    pub eligible: bool,
}
410
/// Serializable progress events for a reconcile run.
///
/// NOTE(review): presumably emitted as one `Started`, a `Batch` after each batch, then one
/// `Finished`; the emitting code is not visible here.
#[derive(Debug, Clone, Serialize)]
pub enum ReconcileProgress {
    Started { model_id: String, total_chunks: u64, batch_size: usize },
    Batch { processed_chunks: u64, total_chunks: u64, embeddings_written: u64, blocked_chunks: u64 },
    Finished { processed_chunks: u64, embeddings_written: u64, blocked_chunks: u64 },
}
417
/// A chunk as loaded for reconciliation: its source metadata and text, plus any embedding
/// metadata already stored for it (`None` fields when nothing is stored).
#[derive(Debug, Clone)]
pub(crate) struct CurrentChunk {
    id: i64,
    path: String,
    language: String,
    file_kind: String,
    chunk_kind: String,
    symbol_path: Option<String>,
    text: String,
    /// Presumably a hash of `text`; compared against `source_text_hash` to detect edits.
    text_hash: String,
    // Stored embedding metadata; `reason()` treats a `None` status as "never embedded".
    embedding_status: Option<String>,
    source_text_hash: Option<String>,
    model_version: Option<String>,
    embedding_dim: Option<i64>,
    input_hash: Option<String>,
    embedding_text_version: Option<String>,
    next_retry_after_ms: Option<i64>,
    /// Pre-assigned reason; `Forced` here short-circuits `reason()`.
    reason: ReconcileReason,
}
437
/// A chunk converted into concrete embedder input and ready to embed.
#[derive(Debug)]
pub(crate) struct PreparedEmbeddingJob {
    id: i64,
    text_hash: String,
    /// Text sent to the embedder (possibly shortened; see `input_truncated`).
    input_text: String,
    input_hash: String,
    input_chars: usize,
    input_truncated: bool,
    // Policy outcome for this chunk (cf. `EmbeddingPolicyDecision`).
    policy: String,
    priority: i64,
    reason: ReconcileReason,
}
450
/// Jobs selected together (presumably for a single `Embedder::embed_batch` call).
pub(crate) struct SelectedBatch {
    jobs: Vec<PreparedEmbeddingJob>,
}
454
455impl CurrentChunk {
456 fn reason(
457 &self,
458 model_version: &str,
459 dim: usize,
460 now_ms: i64,
461 _max_embedding_chars: usize,
462 ) -> ReconcileReason {
463 if self.reason == ReconcileReason::Forced {
464 return ReconcileReason::Forced;
465 }
466 if self.embedding_status.is_none() {
467 return ReconcileReason::Missing;
468 }
469 if self.source_text_hash.as_deref() != Some(self.text_hash.as_str()) {
470 return ReconcileReason::SourceChanged;
471 }
472 if self.input_hash.as_deref().is_none_or(str::is_empty) {
473 return ReconcileReason::InputChanged;
474 }
475 if self.model_version.as_deref() != Some(model_version)
476 || self.embedding_text_version.as_deref() != Some(EMBEDDING_TEXT_VERSION)
477 {
478 return ReconcileReason::ModelChanged;
479 }
480 if self.embedding_dim != Some(i64::try_from(dim).unwrap_or(i64::MAX)) {
481 return ReconcileReason::DimChanged;
482 }
483 if self.embedding_status.as_deref() == Some("Failed")
484 && self.next_retry_after_ms.unwrap_or(0) <= now_ms
485 {
486 return ReconcileReason::RetryAfterFailure;
487 }
488 ReconcileReason::Missing
489 }
490}
491
/// Borrowed description of the embedding target used while scanning chunks: model identity,
/// vector width, and the input character cap.
pub(crate) struct EmbeddingScan<'a> {
    model_id: &'a str,
    model_version: &'a str,
    dim: usize,
    max_embedding_chars: usize,
}
501
/// Text prepared for the embedder, with its length and whether it was shortened.
pub(crate) struct EmbeddingInput {
    text: String,
    /// Length of `text`. NOTE(review): presumably counted in chars, not bytes.
    chars: usize,
    truncated: bool,
}
507
/// A query's embedding vector, tagged with the model id and width that produced it.
pub struct QueryEmbedding {
    pub model_id: String,
    pub dim: usize,
    pub vector: Vec<f32>,
}