1use std::{
2 collections::{BTreeMap, HashSet},
3 path::{Path, PathBuf},
4 time::Instant,
5};
6
7use rusqlite::{Connection, OptionalExtension, params, params_from_iter, types::Value};
8use serde::Serialize;
9use sha2::{Digest, Sha256};
10
11use crate::{index::now_ms, language::Language};
12
/// Model id reported by `HashEmbedder::model_id`.
pub const HASH_MODEL_ID: &str = "embedding-hash";
/// Model id reported by the FastEmbed-backed embedder.
pub const FASTEMBED_MODEL_ID: &str = "fastembed-all-minilm-l6-v2";
/// Display name for the FastEmbed model (presumably the upstream model name — confirm).
pub const FASTEMBED_DISPLAY_MODEL: &str = "sentence-transformers/all-MiniLM-L6-v2";
/// Vector dimension used by the hash embedder.
pub const HASH_EMBEDDING_DIM: usize = 384;
/// Vector dimension reported by the FastEmbed embedder.
pub const FASTEMBED_EMBEDDING_DIM: usize = 384;
/// Model id reported by the Model2Vec-backed embedder.
pub const MODEL2VEC_MODEL_ID: &str = "model2vec-potion-retrieval-32m";
/// Display name for the Model2Vec model.
pub const MODEL2VEC_DISPLAY_MODEL: &str = "minishlab/potion-retrieval-32M";
/// Repository identifier handed to `StaticModel::from_pretrained` when loading Model2Vec.
pub const MODEL2VEC_HF_REPO: &str = "minishlab/potion-retrieval-32M";
/// Vector dimension reported by the Model2Vec embedder.
pub const MODEL2VEC_EMBEDDING_DIM: usize = 512;
/// User-facing message for binaries built without Model2Vec support.
pub const MODEL2VEC_MISSING_FEATURE_MESSAGE: &str = "Model2Vec backend requested, but this binary was built without Model2Vec support.\nRebuild with default features enabled:\n cargo install rag-rat";
/// User-facing message for binaries built without FastEmbed support.
pub const FASTEMBED_MISSING_FEATURE_MESSAGE: &str = "FastEmbed backend requested, but this binary was built without default FastEmbed support.\nRebuild with default features enabled:\n cargo install rag-rat";
/// `index_meta` key holding the id of the active embedding model.
const ACTIVE_EMBEDDING_MODEL_META: &str = "active_embedding_model";
/// Meta key for the active model's version (its readers/writers are outside this excerpt — confirm usage).
const ACTIVE_EMBEDDING_MODEL_VERSION_META: &str = "embedding_active_model_version";
/// Meta key recording when the most recent reconcile started (milliseconds).
const LAST_EMBEDDING_RECONCILE_STARTED_META: &str = "last_embedding_reconcile_started_at_ms";
/// Meta key recording when the most recent reconcile finished (milliseconds).
const LAST_EMBEDDING_RECONCILE_FINISHED_META: &str = "last_embedding_reconcile_finished_at_ms";
/// Batch size used by reconcile when the caller gives none (or gives zero).
const DEFAULT_BATCH_SIZE: usize = 64;
/// Default per-chunk character budget for embedding input.
pub const DEFAULT_MAX_EMBEDDING_CHARS: usize = 4_000;
/// Lower bound applied to a caller-supplied character budget during reconcile.
const MIN_EMBEDDING_CHARS: usize = 80;
/// Version tag of the embedding-input format; stored artifacts with a different tag are not
/// considered current.
pub const EMBEDDING_TEXT_VERSION: &str = "embedding-text-v2";
/// Model ids whose rows are purged by `remove_legacy_models`.
const LEGACY_MODEL_IDS: &[&str] = &["embedding-small"];
/// Directory name, inside the FastEmbed cache dir, that `fastembed_cache_ready` inspects.
#[cfg(feature = "fastembed")]
const FASTEMBED_HF_CACHE_REPO_DIR: &str = "models--Qdrant--all-MiniLM-L6-v2-onnx";
38
/// Common interface implemented by every embedding backend in this module.
pub trait Embedder {
    /// Identifier of the model behind this embedder.
    fn model_id(&self) -> &str;
    /// Dimension this embedder reports for its vectors.
    fn dim(&self) -> usize;
    /// Embeds a batch of texts.
    ///
    /// The reconcile loop in this file expects one vector per input text and records the whole
    /// batch as failed when the counts differ or an error is returned.
    fn embed_batch(&self, texts: &[String]) -> anyhow::Result<Vec<Vec<f32>>>;
}
44
45pub struct HashEmbedder;
46
47impl Embedder for HashEmbedder {
48 fn model_id(&self) -> &str {
49 HASH_MODEL_ID
50 }
51
52 fn dim(&self) -> usize {
53 HASH_EMBEDDING_DIM
54 }
55
56 fn embed_batch(&self, texts: &[String]) -> anyhow::Result<Vec<Vec<f32>>> {
57 Ok(texts.iter().map(|text| hash_embed_text(text, HASH_EMBEDDING_DIM)).collect())
58 }
59}
60
/// Test-only embedder with a configurable model id and dimension.
#[cfg(test)]
pub struct MockEmbedder {
    model_id: String,
    dim: usize,
}

#[cfg(test)]
impl MockEmbedder {
    /// Creates a mock that reports `model_id` and `dim`.
    pub fn new(model_id: impl Into<String>, dim: usize) -> Self {
        let model_id = model_id.into();
        MockEmbedder { model_id, dim }
    }
}

#[cfg(test)]
impl Embedder for MockEmbedder {
    fn model_id(&self) -> &str {
        self.model_id.as_str()
    }

    fn dim(&self) -> usize {
        self.dim
    }

    /// Embeds each text with `hash_embed_text` at the configured dimension.
    fn embed_batch(&self, texts: &[String]) -> anyhow::Result<Vec<Vec<f32>>> {
        let mut vectors = Vec::with_capacity(texts.len());
        for text in texts {
            vectors.push(hash_embed_text(text, self.dim));
        }
        Ok(vectors)
    }
}
88
/// Embedder backed by a `fastembed::TextEmbedding` model guarded by a mutex.
#[cfg(feature = "fastembed")]
pub struct FastEmbedEmbedder {
    model: std::sync::Mutex<fastembed::TextEmbedding>,
}

#[cfg(feature = "fastembed")]
impl FastEmbedEmbedder {
    /// Initializes the AllMiniLML6V2 model using `fastembed_cache_dir()` as cache directory and
    /// with download progress enabled.
    ///
    /// `intra_threads` is forwarded only when it is `Some` and greater than zero.
    ///
    /// # Errors
    /// Propagates any error from `TextEmbedding::try_new`.
    pub fn new(intra_threads: Option<usize>) -> anyhow::Result<Self> {
        use fastembed::{EmbeddingModel, InitOptions, TextEmbedding};
        let base = InitOptions::new(EmbeddingModel::AllMiniLML6V2)
            .with_cache_dir(fastembed_cache_dir())
            .with_show_download_progress(true);
        let options = match intra_threads {
            Some(threads) if threads > 0 => base.with_intra_threads(threads),
            _ => base,
        };
        let model = TextEmbedding::try_new(options)?;
        Ok(Self { model: std::sync::Mutex::new(model) })
    }
}

#[cfg(feature = "fastembed")]
impl Embedder for FastEmbedEmbedder {
    fn model_id(&self) -> &str {
        FASTEMBED_MODEL_ID
    }

    fn dim(&self) -> usize {
        FASTEMBED_EMBEDDING_DIM
    }

    /// Embeds `texts` with the wrapped model.
    ///
    /// # Errors
    /// Fails when the mutex is poisoned or when the model's `embed` call fails.
    fn embed_batch(&self, texts: &[String]) -> anyhow::Result<Vec<Vec<f32>>> {
        let documents: Vec<&str> = texts.iter().map(String::as_str).collect();
        let Ok(mut model) = self.model.lock() else {
            anyhow::bail!("fastembed model lock poisoned");
        };
        model.embed(documents, None)
    }
}
129
/// Embedder backed by a `model2vec_rs` static model.
#[cfg(feature = "model2vec")]
pub struct Model2VecEmbedder {
    model: model2vec_rs::model::StaticModel,
}

#[cfg(feature = "model2vec")]
impl Model2VecEmbedder {
    /// Loads the model identified by [`MODEL2VEC_HF_REPO`].
    ///
    /// # Errors
    /// Wraps any load failure as `failed to load Model2Vec model: …`.
    pub fn new() -> anyhow::Result<Self> {
        use model2vec_rs::model::StaticModel;
        // NOTE(review): the `Some(true)` argument presumably enables normalization — confirm
        // against the model2vec-rs `from_pretrained` signature.
        match StaticModel::from_pretrained(MODEL2VEC_HF_REPO, None, Some(true), None) {
            Ok(model) => Ok(Self { model }),
            Err(err) => Err(anyhow::anyhow!("failed to load Model2Vec model: {err}")),
        }
    }
}

#[cfg(feature = "model2vec")]
impl Embedder for Model2VecEmbedder {
    fn model_id(&self) -> &str {
        MODEL2VEC_MODEL_ID
    }

    fn dim(&self) -> usize {
        MODEL2VEC_EMBEDDING_DIM
    }

    /// Encodes `texts` with the static model; never returns `Err`.
    fn embed_batch(&self, texts: &[String]) -> anyhow::Result<Vec<Vec<f32>>> {
        let vectors = self.model.encode(texts);
        Ok(vectors)
    }
}
165
166#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
167pub enum ArtifactStatus {
168 Current,
169 Missing,
170 Stale,
171 Failed,
172 Blocked,
173 Disabled,
174}
175
176impl ArtifactStatus {
177 pub fn as_str(self) -> &'static str {
178 match self {
179 Self::Current => "Current",
180 Self::Missing => "Missing",
181 Self::Stale => "Stale",
182 Self::Failed => "Failed",
183 Self::Blocked => "Blocked",
184 Self::Disabled => "Disabled",
185 }
186 }
187}
188
/// Snapshot returned by [`status`].
#[derive(Debug, Clone, Serialize)]
pub struct LocalAiStatus {
    /// Status of the "embedding" capability for the active embedding model.
    pub embedding: CapabilityStatus,
    /// Chunk/artifact counters derived from `embedding` and `fastembed`.
    pub artifacts: ArtifactCounts,
    /// Operational details produced by `fastembed_operational_status`.
    pub fastembed: FastEmbedOperationalStatus,
    /// Most recent row of `reconcile_attempts`, if any exists.
    pub last_reconcile: Option<LastReconcileStatus>,
}
196
/// Status of one capability/model pair.
///
/// [`status`] obtains one for the `"embedding"` capability from `capability_status`; how the
/// individual fields are computed is not visible in this part of the file.
#[derive(Debug, Clone, Serialize)]
pub struct CapabilityStatus {
    pub capability: String,
    pub model_id: String,
    pub state: String,
    pub installed: bool,
    pub disabled: bool,
    // The four counters below are what `status` copies into `ArtifactCounts`.
    pub current_artifacts: u64,
    pub stale_artifacts: u64,
    pub failed_artifacts: u64,
    pub blocked_artifacts: u64,
    pub message: Option<String>,
}
210
/// Operational report for the FastEmbed backend.
///
/// Built by `fastembed_operational_status` (defined outside this excerpt), so field semantics
/// beyond their names cannot be confirmed here.
#[derive(Debug, Clone, Serialize)]
pub struct FastEmbedOperationalStatus {
    pub backend: String,
    pub build_feature_enabled: bool,
    pub model_id: String,
    pub model: String,
    pub dim: usize,
    pub cache: String,
    pub installed: bool,
    pub active: bool,
    pub status: String,
    pub current_embeddings: u64,
    pub eligible_embeddings: u64,
    /// Read by `status` as the number of skipped chunks.
    pub skipped_embeddings: u64,
    pub stale_embeddings: u64,
    pub missing_embeddings: u64,
    pub failed_embeddings: u64,
    pub failed_retryable_embeddings: u64,
    pub failed_waiting_embeddings: u64,
    pub message: Option<String>,
    pub next: Option<String>,
}
233
/// Chunk and artifact counters reported by [`status`].
#[derive(Debug, Clone, Serialize)]
pub struct ArtifactCounts {
    /// Value returned by `chunk_count`.
    pub total_chunks: u64,
    /// `total_chunks` minus `skipped_chunks` (saturating).
    pub eligible_chunks: u64,
    /// Taken from `FastEmbedOperationalStatus::skipped_embeddings`.
    pub skipped_chunks: u64,
    pub current: u64,
    /// `total_chunks` minus (`current + stale + failed + blocked`), saturating.
    // NOTE(review): computed from `total_chunks`, not `eligible_chunks`, so skipped chunks
    // appear to be included here — confirm that is intended.
    pub missing: u64,
    pub stale: u64,
    pub failed: u64,
    pub blocked: u64,
    /// `status` always sets this to 0.
    pub disabled: u64,
}
246
/// The latest `reconcile_attempts` row, as loaded by `last_reconcile_status`.
#[derive(Debug, Clone, Serialize)]
pub struct LastReconcileStatus {
    pub started_at_ms: i64,
    /// `None` when the row has no `finished_at_ms` value.
    pub finished_at_ms: Option<i64>,
    pub batch_size: u64,
    pub processed_chunks: u64,
    pub embeddings_written: u64,
    pub blocked_chunks: u64,
    pub elapsed_ms: u64,
    pub input_chars: u64,
    /// `embeddings_written` divided by elapsed seconds (elapsed is floored at 0.001 s).
    pub chunks_per_sec: f64,
    /// `input_chars` divided by elapsed seconds (elapsed is floored at 0.001 s).
    pub chars_per_sec: f64,
    pub status: String,
    pub message: Option<String>,
}
262
/// One row of the `ai_models` table; field names mirror the selected columns.
#[derive(Debug, Clone, Serialize)]
pub struct ModelInfo {
    pub model_id: String,
    pub capability: String,
    pub embedding_dim: Option<i64>,
    pub runtime: String,
    pub installed: bool,
    pub disabled: bool,
    /// Values written in this file include `"Ready"` and `"MissingModel"`.
    pub status: String,
    pub installed_at_ms: Option<i64>,
    pub last_error: Option<String>,
}
275
/// Result of one reconcile run, filled in by `reconcile_with_options_progress`.
#[derive(Debug, Clone, Serialize)]
pub struct ReconcileReport {
    /// `embeddings_written + failed_chunks + blocked_chunks` (saturating).
    pub processed_chunks: u64,
    /// Counts both reused vectors and freshly embedded ones.
    pub embeddings_written: u64,
    /// Sum of the `skipped_by_policy` counters.
    pub skipped_chunks: u64,
    /// Jobs in batches where the embedder errored or returned the wrong number of vectors.
    pub failed_chunks: u64,
    // NOTE(review): never incremented by the reconcile code visible in this file.
    pub blocked_chunks: u64,
    pub model_id: String,
    pub model_version: String,
    pub embedding_dim: usize,
    /// Effective batch size after the default fallback.
    pub batch_size: usize,
    /// Effective character budget after clamping to the minimum.
    pub max_embedding_chars: usize,
    // The next four fields echo the corresponding `ReconcileOptions` values.
    pub forced: bool,
    pub changed_first: bool,
    pub until_clean: bool,
    pub max_seconds: Option<u64>,
    /// Number of selected jobs per `ReconcileReason` label.
    pub work_reasons: BTreeMap<String, u64>,
    /// Number of ineligible chunks per policy name.
    pub skipped_by_policy: BTreeMap<String, u64>,
    /// Total input characters of all selected jobs.
    pub input_chars: u64,
    /// Number of selected jobs whose input was truncated.
    pub truncated_inputs: u64,
    pub elapsed_ms: u64,
    pub chunks_per_sec: f64,
    pub chars_per_sec: f64,
    /// `input_chars / embeddings_written`, or 0.0 when nothing was written.
    pub avg_chars_per_chunk: f64,
    /// Starts as `"Current"`; the reconcile run may set `"Blocked"`, `"Partial"` or `"Failed"`.
    pub status: String,
    pub message: Option<String>,
}
303
/// Dry-run overview returned by [`reconcile_plan`].
#[derive(Debug, Clone, Serialize)]
pub struct ReconcilePlan {
    /// Planned embedding work for the active model.
    pub embeddings: EmbeddingReconcilePlan,
    /// Summary planning; `reconcile_plan` currently always reports it as disabled.
    pub summaries: SummaryReconcilePlan,
}
309
/// Embedding part of a [`ReconcilePlan`], computed by `embedding_reconcile_plan`.
///
/// Only policy-eligible jobs contribute to the counters below.
#[derive(Debug, Clone, Serialize)]
pub struct EmbeddingReconcilePlan {
    pub model_id: String,
    pub model_version: String,
    pub dim: usize,
    /// Whether the active model passed `validate_ready_model`.
    pub available: bool,
    pub current: u64,
    /// Jobs whose reason is `Missing` or `Forced`.
    pub missing: u64,
    /// Jobs whose reason is `SourceChanged` or `InputChanged`.
    pub stale: u64,
    pub model_changed: u64,
    pub dim_changed: u64,
    pub failed_retryable: u64,
    /// Non-current jobs with status `"Failed"` whose next retry time is still in the future.
    pub failed_waiting: u64,
    /// Non-current jobs with status `"Blocked"`.
    pub blocked: u64,
    /// 1 when the model row is flagged disabled, otherwise 0.
    pub disabled: u64,
    /// Sum of the `skipped_by_policy` counters.
    pub skipped_total: u64,
    pub skipped_by_policy: BTreeMap<String, u64>,
    /// Non-current jobs (of any reason) keyed by their priority label.
    pub missing_by_priority: BTreeMap<String, u64>,
    /// Reason the model is not ready; `None` when `available` is true.
    pub message: Option<String>,
}
330
/// Summary part of a [`ReconcilePlan`].
#[derive(Debug, Clone, Serialize)]
pub struct SummaryReconcilePlan {
    /// `reconcile_plan` always sets this to `false`.
    pub enabled: bool,
    /// Human-readable explanation of the summary state.
    pub message: String,
}
336
/// Why a chunk was selected for (re-)embedding.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
enum ReconcileReason {
    Missing,
    SourceChanged,
    InputChanged,
    ModelChanged,
    DimChanged,
    RetryAfterFailure,
    Forced,
}

impl ReconcileReason {
    /// Returns the variant's name; used as the key in per-reason counters.
    fn as_str(self) -> &'static str {
        use ReconcileReason::{
            DimChanged, Forced, InputChanged, Missing, ModelChanged, RetryAfterFailure,
            SourceChanged,
        };
        match self {
            Missing => "Missing",
            SourceChanged => "SourceChanged",
            InputChanged => "InputChanged",
            ModelChanged => "ModelChanged",
            DimChanged => "DimChanged",
            RetryAfterFailure => "RetryAfterFailure",
            Forced => "Forced",
        }
    }
}
361
362#[derive(Debug, Clone)]
363pub struct ReconcileOptions {
364 pub limit: Option<u32>,
365 pub batch_size: Option<u32>,
366 pub force: bool,
367 pub until_clean: bool,
368 pub changed_first: bool,
369 pub max_seconds: Option<u64>,
370 pub max_embedding_chars: usize,
371 pub intra_threads: Option<usize>,
373}
374
375impl Default for ReconcileOptions {
376 fn default() -> Self {
377 Self {
378 limit: None,
379 batch_size: None,
380 force: false,
381 until_clean: false,
382 changed_first: false,
383 max_seconds: None,
384 max_embedding_chars: DEFAULT_MAX_EMBEDDING_CHARS,
385 intra_threads: None,
386 }
387 }
388}
389
/// Outcome of an embedding policy check for one chunk (produced by `policy_for_job`, presumably —
/// that function is outside this excerpt).
#[derive(Debug, Clone, Serialize)]
pub struct EmbeddingPolicyDecision {
    /// Policy name; used as the key when counting skipped chunks.
    pub policy: String,
    /// Priority value; mapped to a label via `priority_label` when planning.
    pub priority: i64,
    /// Chunks with `eligible == false` are skipped by reconcile planning.
    pub eligible: bool,
}
396
/// Events passed to the progress callback of `reconcile_with_options_progress`.
#[derive(Debug, Clone, Serialize)]
pub enum ReconcileProgress {
    /// Emitted once per run, before any batch is processed (with `total_chunks: 0` when the
    /// embedder could not be created).
    Started { model_id: String, total_chunks: u64, batch_size: usize },
    /// Emitted after each batch that contained at least one job.
    Batch { processed_chunks: u64, total_chunks: u64, embeddings_written: u64, blocked_chunks: u64 },
    /// Emitted once at the end of the run.
    Finished { processed_chunks: u64, embeddings_written: u64, blocked_chunks: u64 },
}
403
404pub fn ensure_model_manifest(conn: &Connection) -> anyhow::Result<()> {
405 remove_legacy_models(conn)?;
406 upsert_model(conn, HASH_MODEL_ID, "embedding", Some(HASH_EMBEDDING_DIM), "hash", false)?;
407 upsert_model(
408 conn,
409 FASTEMBED_MODEL_ID,
410 "embedding",
411 Some(FASTEMBED_EMBEDDING_DIM),
412 "fastembed",
413 false,
414 )?;
415 upsert_model(
416 conn,
417 MODEL2VEC_MODEL_ID,
418 "embedding",
419 Some(MODEL2VEC_EMBEDDING_DIM),
420 "model2vec",
421 false,
422 )?;
423 normalize_embedding_model_versions(conn)?;
424 Ok(())
425}
426
427fn remove_legacy_models(conn: &Connection) -> anyhow::Result<()> {
428 for model_id in LEGACY_MODEL_IDS {
429 conn.execute("DELETE FROM chunk_embeddings WHERE model_id = ?1", params![model_id])?;
430 conn.execute("DELETE FROM ai_models WHERE model_id = ?1", params![model_id])?;
431 conn.execute(
432 "DELETE FROM index_meta WHERE key = ?1 AND value = ?2",
433 params![ACTIVE_EMBEDDING_MODEL_META, model_id],
434 )?;
435 }
436 Ok(())
437}
438
439fn normalize_embedding_model_versions(conn: &Connection) -> anyhow::Result<()> {
440 conn.execute(
441 "
442 UPDATE chunk_embeddings
443 SET model_version = CASE model_id
444 WHEN 'embedding-hash' THEN 'hash-v1'
445 WHEN 'fastembed-all-minilm-l6-v2' THEN 'fastembed-all-minilm-l6-v2-v1'
446 ELSE model_version
447 END
448 WHERE model_version = 'v1'
449 AND model_id IN ('embedding-hash', 'fastembed-all-minilm-l6-v2')
450 ",
451 [],
452 )?;
453 Ok(())
454}
455
456pub(super) fn recover_cached_fastembed_model(conn: &Connection) -> anyhow::Result<()> {
457 recover_cached_fastembed_model_from(conn, &fastembed_cache_dir())
458}
459
460pub(super) fn recover_cached_fastembed_model_from(
461 conn: &Connection,
462 cache_dir: &Path,
463) -> anyhow::Result<()> {
464 #[cfg(feature = "fastembed")]
465 {
466 recover_cached_fastembed_model_at(conn, cache_dir)?;
467 }
468 #[cfg(not(feature = "fastembed"))]
469 {
470 let _ = (conn, cache_dir);
471 }
472 Ok(())
473}
474
/// Marks the FastEmbed model as installed when its files are already in `cache_dir`.
///
/// Does nothing unless `fastembed_cache_ready` accepts the directory. Otherwise the model row is
/// switched to `Ready` (if it is not already installed and ready), and FastEmbed becomes the
/// active embedding model when no usable active model is configured.
#[cfg(feature = "fastembed")]
pub(super) fn recover_cached_fastembed_model_at(
    conn: &Connection,
    cache_dir: &Path,
) -> anyhow::Result<()> {
    if fastembed_cache_ready(cache_dir) {
        let row = model(conn, FASTEMBED_MODEL_ID)?;
        let already_ready = row.installed && row.status == "Ready";
        if !already_ready {
            let dim = i64::try_from(FASTEMBED_EMBEDDING_DIM).unwrap_or(i64::MAX);
            conn.execute(
                "UPDATE ai_models
                 SET installed = 1, disabled = 0, status = 'Ready', installed_at_ms = ?2,
                     embedding_dim = ?3, runtime = 'fastembed', last_error = NULL
                 WHERE model_id = ?1",
                params![FASTEMBED_MODEL_ID, now_ms(), dim],
            )?;
        }
        if active_embedding_model_is_missing(conn)? {
            set_meta(conn, ACTIVE_EMBEDDING_MODEL_META, FASTEMBED_MODEL_ID)?;
        }
    }
    Ok(())
}
502
/// Reports whether there is no usable active embedding model.
///
/// Returns `true` when the meta entry is absent, when it names a model without an `ai_models`
/// row, or when that row fails `validate_ready_model`.
#[cfg(feature = "fastembed")]
fn active_embedding_model_is_missing(conn: &Connection) -> anyhow::Result<bool> {
    let active_model_id = match meta(conn, ACTIVE_EMBEDDING_MODEL_META)? {
        Some(id) => id,
        None => return Ok(true),
    };
    let row = conn
        .query_row(
            "
            SELECT model_id, capability, embedding_dim, runtime, installed, disabled, status, installed_at_ms, last_error
            FROM ai_models WHERE model_id = ?1
            ",
            [active_model_id],
            model_row,
        )
        .optional()?;
    let missing = row.map_or(true, |active| validate_ready_model(&active).is_err());
    Ok(missing)
}
523
/// Checks whether `cache_dir` holds a usable FastEmbed model snapshot.
///
/// Reads `<repo>/refs/main`, trims it, and requires `<repo>/snapshots/<revision>` to be a
/// directory. An unreadable ref file or an empty revision yields `false`.
#[cfg(feature = "fastembed")]
fn fastembed_cache_ready(cache_dir: &Path) -> bool {
    let repo = cache_dir.join(FASTEMBED_HF_CACHE_REPO_DIR);
    let main_ref = repo.join("refs").join("main");
    match std::fs::read_to_string(main_ref) {
        Ok(contents) => {
            let revision = contents.trim();
            if revision.is_empty() {
                false
            } else {
                repo.join("snapshots").join(revision).is_dir()
            }
        },
        Err(_) => false,
    }
}
533
534pub fn install_model(conn: &Connection, model_id: &str) -> anyhow::Result<ModelInfo> {
535 ensure_model_manifest(conn)?;
536 match model_id {
537 HASH_MODEL_ID => {
538 conn.execute(
539 "UPDATE ai_models
540 SET installed = 1, disabled = 0, status = 'Ready', installed_at_ms = ?2,
541 embedding_dim = ?3, runtime = 'hash', last_error = NULL
542 WHERE model_id = ?1",
543 params![model_id, now_ms(), i64::try_from(HASH_EMBEDDING_DIM).unwrap_or(i64::MAX)],
544 )?;
545 set_meta(conn, ACTIVE_EMBEDDING_MODEL_META, model_id)?;
546 },
547 FASTEMBED_MODEL_ID => {
548 install_fastembed_model(conn, model_id)?;
549 set_meta(conn, ACTIVE_EMBEDDING_MODEL_META, model_id)?;
550 },
551 MODEL2VEC_MODEL_ID => {
552 install_model2vec_model(conn, model_id)?;
553 set_meta(conn, ACTIVE_EMBEDDING_MODEL_META, model_id)?;
554 },
555 other => anyhow::bail!("unknown local AI model `{other}`"),
556 }
557 model(conn, model_id)
558}
559
560pub fn models(conn: &Connection) -> anyhow::Result<Vec<ModelInfo>> {
561 ensure_model_manifest(conn)?;
562 let mut stmt = conn.prepare(
563 "
564 SELECT model_id, capability, embedding_dim, runtime, installed, disabled, status, installed_at_ms, last_error
565 FROM ai_models
566 ORDER BY capability, model_id
567 ",
568 )?;
569 let rows = stmt.query_map([], model_row)?;
570 collect_rows(rows)
571}
572
573pub fn status(conn: &Connection) -> anyhow::Result<LocalAiStatus> {
574 ensure_model_manifest(conn)?;
575 let total_chunks = chunk_count(conn)?;
576 let active_model_id = active_embedding_model_id(conn)?;
577 let embedding = capability_status(conn, "embedding", &active_model_id, total_chunks)?;
578 let fastembed = fastembed_operational_status(conn, &active_model_id)?;
579 let current = embedding.current_artifacts;
580 let stale = embedding.stale_artifacts;
581 let failed = embedding.failed_artifacts;
582 let blocked = embedding.blocked_artifacts;
583 let missing = total_chunks.saturating_sub(current + stale + failed + blocked);
584 let skipped_chunks = fastembed.skipped_embeddings;
585 let eligible_chunks = total_chunks.saturating_sub(skipped_chunks);
586 Ok(LocalAiStatus {
587 embedding,
588 artifacts: ArtifactCounts {
589 total_chunks,
590 eligible_chunks,
591 skipped_chunks,
592 current,
593 missing,
594 stale,
595 failed,
596 blocked,
597 disabled: 0,
598 },
599 fastembed,
600 last_reconcile: last_reconcile_status(conn)?,
601 })
602}
603
604pub fn reconcile_plan(conn: &Connection) -> anyhow::Result<ReconcilePlan> {
605 ensure_model_manifest(conn)?;
606 let model_id = active_embedding_model_id(conn)?;
607 let model = model(conn, &model_id)?;
608 let model_version = active_embedding_model_version(conn, &model_id)?;
609 let dim = usize::try_from(model.embedding_dim.unwrap_or_default()).unwrap_or(0);
610 let available = validate_ready_model(&model).is_ok();
611 let message = (!available).then(|| model_not_ready_reason(&model));
612 Ok(ReconcilePlan {
613 embeddings: embedding_reconcile_plan(
614 conn,
615 &model,
616 &model_version,
617 dim,
618 available,
619 message,
620 )?,
621 summaries: SummaryReconcilePlan {
622 enabled: false,
623 message: "summaries are not implemented yet".to_string(),
624 },
625 })
626}
627
628fn embedding_reconcile_plan(
629 conn: &Connection,
630 model: &ModelInfo,
631 model_version: &str,
632 dim: usize,
633 available: bool,
634 message: Option<String>,
635) -> anyhow::Result<EmbeddingReconcilePlan> {
636 let jobs = embedding_job_candidates(conn, &model.model_id, model_version, dim, None, false)?;
637 let skipped_by_policy = embedding_policy_skip_summary(conn, DEFAULT_MAX_EMBEDDING_CHARS)?;
638 let mut missing_by_priority = BTreeMap::new();
639 let mut current = 0_u64;
640 let mut missing = 0_u64;
641 let mut stale = 0_u64;
642 let mut model_changed = 0_u64;
643 let mut dim_changed = 0_u64;
644 let mut failed_retryable = 0_u64;
645 let mut failed_waiting = 0_u64;
646 let mut blocked = 0_u64;
647 for job in jobs {
648 let policy = policy_for_job(&job, DEFAULT_MAX_EMBEDDING_CHARS);
649 if !policy.eligible {
650 continue;
651 }
652 let current_artifact = job.embedding_status.as_deref() == Some("Current")
653 && job.source_text_hash.as_deref() == Some(job.text_hash.as_str())
654 && job.model_version.as_deref() == Some(model_version)
655 && job.embedding_dim == Some(i64::try_from(dim).unwrap_or(i64::MAX))
656 && job.embedding_text_version.as_deref() == Some(EMBEDDING_TEXT_VERSION)
657 && job.input_hash.as_deref().is_some_and(|input_hash| {
658 let input = build_embedding_input(&job, DEFAULT_MAX_EMBEDDING_CHARS);
659 input_hash == embedding_input_hash(&model.model_id, model_version, &input.text)
660 });
661 if current_artifact {
662 current += 1;
663 continue;
664 }
665 let reason = job.reason(model_version, dim, now_ms(), DEFAULT_MAX_EMBEDDING_CHARS);
666 match reason {
667 ReconcileReason::Missing => missing += 1,
668 ReconcileReason::SourceChanged => stale += 1,
669 ReconcileReason::InputChanged => stale += 1,
670 ReconcileReason::ModelChanged => model_changed += 1,
671 ReconcileReason::DimChanged => dim_changed += 1,
672 ReconcileReason::RetryAfterFailure => failed_retryable += 1,
673 ReconcileReason::Forced => missing += 1,
674 }
675 *missing_by_priority.entry(priority_label(policy.priority).to_string()).or_default() += 1;
676 if job.embedding_status.as_deref() == Some("Failed")
677 && job.next_retry_after_ms.unwrap_or(0) > now_ms()
678 {
679 failed_waiting += 1;
680 }
681 if job.embedding_status.as_deref() == Some("Blocked") {
682 blocked += 1;
683 }
684 }
685 Ok(EmbeddingReconcilePlan {
686 model_id: model.model_id.clone(),
687 model_version: model_version.to_string(),
688 dim,
689 available,
690 current,
691 missing,
692 stale,
693 model_changed,
694 dim_changed,
695 failed_retryable,
696 failed_waiting,
697 blocked,
698 disabled: u64::from(model.disabled),
699 skipped_total: skipped_by_policy.values().sum(),
700 skipped_by_policy,
701 missing_by_priority,
702 message,
703 })
704}
705
706pub fn reconcile(
707 conn: &Connection,
708 limit: Option<u32>,
709 batch_size: Option<u32>,
710) -> anyhow::Result<ReconcileReport> {
711 reconcile_with_options_progress(
712 conn,
713 ReconcileOptions { limit, batch_size, ..ReconcileOptions::default() },
714 |_| {},
715 )
716}
717
718pub fn reconcile_with_progress(
719 conn: &Connection,
720 limit: Option<u32>,
721 batch_size: Option<u32>,
722 force: bool,
723 progress: impl FnMut(ReconcileProgress),
724) -> anyhow::Result<ReconcileReport> {
725 reconcile_with_options_progress(
726 conn,
727 ReconcileOptions { limit, batch_size, force, ..ReconcileOptions::default() },
728 progress,
729 )
730}
731
/// Brings chunk embeddings for the active model up to date and reports what was done.
///
/// The run records a `reconcile_attempts` row, then walks the candidate chunk ids in batches.
/// For each batch it reuses an existing vector when one is found for the job's input hash and
/// embeds the rest; results (or failures) are written through the batch helpers. `progress`
/// receives a `Started` event, one `Batch` event per non-empty batch, and a `Finished` event.
///
/// The run stops when the candidates are exhausted, when `options.limit` jobs were handled, or
/// when `options.max_seconds` has elapsed (checked before each batch; status `"Partial"`).
///
/// # Errors
/// Returns an error when a database or helper call fails. A failure to create the embedder is
/// NOT an error: the report comes back with status `"Blocked"`. Embedder failures inside a batch
/// are recorded on the chunks and surface as status `"Failed"`.
///
// NOTE(review): any `?` exit between the INSERT below and `finish_reconcile_attempt` returns
// without updating the attempt row, which was inserted with status 'Running'.
pub fn reconcile_with_options_progress(
    conn: &Connection,
    options: ReconcileOptions,
    mut progress: impl FnMut(ReconcileProgress),
) -> anyhow::Result<ReconcileReport> {
    ensure_model_manifest(conn)?;
    let active_model_id = active_embedding_model_id(conn)?;
    let model = model(conn, &active_model_id)?;
    let model_version = active_embedding_model_version(conn, &active_model_id)?;
    // A missing or negative stored dimension becomes 0.
    let embedding_dim = usize::try_from(model.embedding_dim.unwrap_or_default()).unwrap_or(0);
    // `None` or 0 falls back to the default batch size.
    let batch_size = options
        .batch_size
        .map(usize::try_from)
        .transpose()?
        .filter(|value| *value > 0)
        .unwrap_or(DEFAULT_BATCH_SIZE);
    // Never go below the minimum character budget.
    let max_embedding_chars = options.max_embedding_chars.max(MIN_EMBEDDING_CHARS);
    let started = now_ms();
    set_reconcile_meta(conn, LAST_EMBEDDING_RECONCILE_STARTED_META, &started.to_string())?;
    conn.execute(
        "INSERT INTO reconcile_attempts(started_at_ms, limit_count, status, batch_size) VALUES (?1, ?2, 'Running', ?3)",
        params![
            started,
            options.limit.map(i64::from),
            i64::try_from(batch_size).unwrap_or(i64::MAX)
        ],
    )?;
    let attempt_id = conn.last_insert_rowid();
    let timer = Instant::now();

    // Kept as a `Result` for now so the report exists before the Blocked path needs it.
    let embedder = active_embedder(conn, options.intra_threads);
    let skipped_by_policy = embedding_policy_skip_summary(conn, max_embedding_chars)?;
    let skipped_chunks = skipped_by_policy.values().sum();
    let mut report = ReconcileReport {
        processed_chunks: 0,
        embeddings_written: 0,
        skipped_chunks,
        failed_chunks: 0,
        blocked_chunks: 0,
        model_id: active_model_id.clone(),
        model_version: model_version.clone(),
        embedding_dim,
        batch_size,
        max_embedding_chars,
        forced: options.force,
        changed_first: options.changed_first,
        until_clean: options.until_clean,
        max_seconds: options.max_seconds,
        work_reasons: BTreeMap::new(),
        skipped_by_policy,
        input_chars: 0,
        truncated_inputs: 0,
        elapsed_ms: 0,
        chunks_per_sec: 0.0,
        chars_per_sec: 0.0,
        avg_chars_per_chunk: 0.0,
        status: "Current".to_string(),
        message: None,
    };

    let embedder = match embedder {
        Ok(embedder) => embedder,
        // NOTE(review): the underlying error is discarded; only the generic hint is reported.
        Err(_) => {
            report.status = "Blocked".to_string();
            report.message = Some(format!(
                "{} model is not ready; run `rag-rat models install {}`",
                active_model_id, active_model_id
            ));
            // Throughput is not finalized on this path, so the recorded elapsed time stays 0.
            finish_reconcile_attempt(conn, attempt_id, &report)?;
            progress(ReconcileProgress::Started {
                model_id: active_model_id,
                total_chunks: 0,
                batch_size,
            });
            progress(ReconcileProgress::Finished {
                processed_chunks: 0,
                embeddings_written: 0,
                blocked_chunks: 0,
            });
            return Ok(report);
        },
    };

    let scan = EmbeddingScan {
        model_id: &active_model_id,
        model_version: &model_version,
        dim: embedding_dim,
        max_embedding_chars,
    };
    // Initial estimate for progress reporting; raised below if more chunks end up processed.
    let mut progress_total_chunks = estimated_reconcile_jobs(conn, &scan, &options)?;
    progress(ReconcileProgress::Started {
        model_id: active_model_id.clone(),
        total_chunks: progress_total_chunks,
        batch_size,
    });

    // A forced run passes an empty model id to the candidate query.
    let candidate_ids = embedding_candidate_ids(
        conn,
        if options.force { "" } else { scan.model_id },
        options.changed_first,
    )?;
    // `cursor` only moves forward through `candidate_ids`.
    let mut cursor = 0usize;
    let mut processed_ids: HashSet<i64> = HashSet::new();
    // Jobs still allowed under `options.limit`; `None` means unlimited.
    let mut remaining = options.limit.map(u64::from);
    loop {
        if remaining == Some(0) {
            break;
        }
        // Time budget is checked between batches only.
        if options.max_seconds.is_some_and(|seconds| timer.elapsed().as_secs() >= seconds) {
            report.status = "Partial".to_string();
            report.message = Some(format!(
                "max_seconds={} reached; rerun reconcile to continue",
                options.max_seconds.unwrap_or_default()
            ));
            break;
        }
        // Batch size, capped by what is left of the limit.
        let batch_limit = remaining
            .map(|value| value.min(u64::try_from(batch_size).unwrap_or(u64::MAX)))
            .and_then(|value| usize::try_from(value).ok())
            .unwrap_or(batch_size);
        let mut batch_ids = Vec::with_capacity(batch_limit);
        while cursor < candidate_ids.len() && batch_ids.len() < batch_limit {
            let id = candidate_ids[cursor];
            cursor += 1;
            if !processed_ids.contains(&id) {
                batch_ids.push(id);
            }
        }
        // No ids left to look at: the candidate list is exhausted.
        if batch_ids.is_empty() {
            break;
        }
        let selected = select_reconcile_batch(conn, &scan, &batch_ids, &options)?;
        // Nothing in this slice needs work; the cursor already advanced, so move on.
        if selected.jobs.is_empty() {
            continue;
        }
        // Per-job bookkeeping happens before embedding, so failed jobs are included too.
        for job in &selected.jobs {
            processed_ids.insert(job.id);
            *report.work_reasons.entry(job.reason.as_str().to_string()).or_default() += 1;
            report.input_chars = report
                .input_chars
                .saturating_add(u64::try_from(job.input_chars).unwrap_or(u64::MAX));
            if job.input_truncated {
                report.truncated_inputs += 1;
            }
        }
        let jobs_len = selected.jobs.len();
        // Split the batch: jobs with a reusable stored vector vs. jobs that must be embedded.
        let mut reused_jobs = Vec::new();
        let mut to_embed_jobs = Vec::new();
        for job in selected.jobs {
            match find_existing_embedding(conn, &active_model_id, &job.input_hash, embedding_dim)? {
                Some(vector) => reused_jobs.push((job, vector)),
                None => to_embed_jobs.push(job),
            }
        }

        if !reused_jobs.is_empty() {
            let (reused_jobs_slice, reused_vectors_slice): (Vec<_>, Vec<_>) =
                reused_jobs.into_iter().unzip();
            write_current_embedding_batch(
                conn,
                embedder.as_ref(),
                &model_version,
                &reused_jobs_slice,
                &reused_vectors_slice,
            )?;
            report.embeddings_written += u64::try_from(reused_jobs_slice.len()).unwrap_or(u64::MAX);
        }

        if !to_embed_jobs.is_empty() {
            let texts =
                to_embed_jobs.iter().map(|chunk| chunk.input_text.clone()).collect::<Vec<_>>();
            match embedder.embed_batch(&texts) {
                Ok(vectors) if vectors.len() == to_embed_jobs.len() => {
                    write_current_embedding_batch(
                        conn,
                        embedder.as_ref(),
                        &model_version,
                        &to_embed_jobs,
                        &vectors,
                    )?;
                    report.embeddings_written +=
                        u64::try_from(to_embed_jobs.len()).unwrap_or(u64::MAX);
                },
                // Wrong number of vectors: the whole batch is recorded as failed.
                Ok(vectors) => {
                    let error = format!(
                        "embedder {} returned {} vectors for {} texts",
                        embedder.model_id(),
                        vectors.len(),
                        to_embed_jobs.len()
                    );
                    write_failed_embedding_batch(
                        conn,
                        embedder.as_ref(),
                        &model_version,
                        &to_embed_jobs,
                        &error,
                    )?;
                    report.failed_chunks += u64::try_from(to_embed_jobs.len()).unwrap_or(u64::MAX);
                },
                // Embedder error: record the failure on every job and keep going.
                Err(err) => {
                    write_failed_embedding_batch(
                        conn,
                        embedder.as_ref(),
                        &model_version,
                        &to_embed_jobs,
                        &err.to_string(),
                    )?;
                    report.failed_chunks += u64::try_from(to_embed_jobs.len()).unwrap_or(u64::MAX);
                },
            }
        }
        report.processed_chunks = report
            .embeddings_written
            .saturating_add(report.failed_chunks)
            .saturating_add(report.blocked_chunks);
        // The limit is consumed by selected jobs, whether they succeeded or failed.
        if let Some(value) = remaining.as_mut() {
            *value = value.saturating_sub(u64::try_from(jobs_len).unwrap_or(0));
        }
        // Keep the reported total at least as large as the processed count.
        progress_total_chunks = progress_total_chunks.max(report.processed_chunks);
        progress(ReconcileProgress::Batch {
            processed_chunks: report.embeddings_written
                + report.failed_chunks
                + report.blocked_chunks,
            total_chunks: progress_total_chunks,
            embeddings_written: report.embeddings_written,
            blocked_chunks: report.blocked_chunks,
        });
    }
    // Any failed chunk makes the run "Failed", replacing an earlier "Partial" status.
    if report.failed_chunks > 0 {
        report.status = "Failed".to_string();
        report.message =
            Some(format!("{} chunks failed; retry after backoff", report.failed_chunks));
    }
    finalize_reconcile_throughput(&mut report, timer.elapsed().as_millis());

    finish_reconcile_attempt(conn, attempt_id, &report)?;
    progress(ReconcileProgress::Finished {
        processed_chunks: report.processed_chunks,
        embeddings_written: report.embeddings_written,
        blocked_chunks: report.blocked_chunks,
    });
    Ok(report)
}
982
983fn embedding_policy_skip_summary(
984 conn: &Connection,
985 max_embedding_chars: usize,
986) -> anyhow::Result<BTreeMap<String, u64>> {
987 let mut skipped_by_policy = BTreeMap::new();
988 for chunk in current_chunks(conn, None)? {
989 let policy = policy_for_job(&chunk, max_embedding_chars);
990 if !policy.eligible {
991 *skipped_by_policy.entry(policy.policy).or_default() += 1;
992 }
993 }
994 Ok(skipped_by_policy)
995}
996
997fn finish_reconcile_attempt(
998 conn: &Connection,
999 attempt_id: i64,
1000 report: &ReconcileReport,
1001) -> anyhow::Result<()> {
1002 let finished = now_ms();
1003 conn.execute(
1004 "
1005 UPDATE reconcile_attempts
1006 SET finished_at_ms = ?2,
1007 processed_chunks = ?3,
1008 embeddings_written = ?4,
1009 blocked_chunks = ?5,
1010 status = ?6,
1011 message = ?7,
1012 elapsed_ms = ?8,
1013 input_chars = ?9,
1014 batch_size = ?10
1015 WHERE id = ?1
1016 ",
1017 params![
1018 attempt_id,
1019 finished,
1020 i64::try_from(report.processed_chunks).unwrap_or(i64::MAX),
1021 i64::try_from(report.embeddings_written).unwrap_or(i64::MAX),
1022 i64::try_from(report.blocked_chunks).unwrap_or(i64::MAX),
1023 report.status,
1024 report.message,
1025 i64::try_from(report.elapsed_ms).unwrap_or(i64::MAX),
1026 i64::try_from(report.input_chars).unwrap_or(i64::MAX),
1027 i64::try_from(report.batch_size).unwrap_or(i64::MAX),
1028 ],
1029 )?;
1030 set_reconcile_meta(conn, LAST_EMBEDDING_RECONCILE_FINISHED_META, &finished.to_string())?;
1031 Ok(())
1032}
1033
1034fn last_reconcile_status(conn: &Connection) -> anyhow::Result<Option<LastReconcileStatus>> {
1035 conn.query_row(
1036 "
1037 SELECT started_at_ms,
1038 finished_at_ms,
1039 batch_size,
1040 processed_chunks,
1041 embeddings_written,
1042 blocked_chunks,
1043 elapsed_ms,
1044 input_chars,
1045 status,
1046 message
1047 FROM reconcile_attempts
1048 ORDER BY started_at_ms DESC, id DESC
1049 LIMIT 1
1050 ",
1051 [],
1052 |row| {
1053 let elapsed_ms = u64::try_from(row.get::<_, i64>(6)?).unwrap_or(0);
1054 let input_chars = u64::try_from(row.get::<_, i64>(7)?).unwrap_or(0);
1055 let embeddings_written = u64::try_from(row.get::<_, i64>(4)?).unwrap_or(0);
1056 let elapsed_secs = (elapsed_ms as f64 / 1000.0).max(0.001);
1057 Ok(LastReconcileStatus {
1058 started_at_ms: row.get(0)?,
1059 finished_at_ms: row.get(1)?,
1060 batch_size: u64::try_from(row.get::<_, i64>(2)?).unwrap_or(0),
1061 processed_chunks: u64::try_from(row.get::<_, i64>(3)?).unwrap_or(0),
1062 embeddings_written,
1063 blocked_chunks: u64::try_from(row.get::<_, i64>(5)?).unwrap_or(0),
1064 elapsed_ms,
1065 input_chars,
1066 chunks_per_sec: embeddings_written as f64 / elapsed_secs,
1067 chars_per_sec: input_chars as f64 / elapsed_secs,
1068 status: row.get(8)?,
1069 message: row.get(9)?,
1070 })
1071 },
1072 )
1073 .optional()
1074 .map_err(Into::into)
1075}
1076
1077fn finalize_reconcile_throughput(report: &mut ReconcileReport, elapsed_ms: u128) {
1078 report.elapsed_ms = u64::try_from(elapsed_ms).unwrap_or(u64::MAX);
1079 let elapsed_secs = (report.elapsed_ms as f64 / 1000.0).max(0.001);
1080 report.chunks_per_sec = report.embeddings_written as f64 / elapsed_secs;
1081 report.chars_per_sec = report.input_chars as f64 / elapsed_secs;
1082 report.avg_chars_per_chunk = if report.embeddings_written > 0 {
1083 report.input_chars as f64 / report.embeddings_written as f64
1084 } else {
1085 0.0
1086 };
1087}
1088
1089fn upsert_model(
1090 conn: &Connection,
1091 model_id: &str,
1092 capability: &str,
1093 embedding_dim: Option<usize>,
1094 runtime: &str,
1095 installed_by_default: bool,
1096) -> anyhow::Result<()> {
1097 conn.execute(
1098 "
1099 INSERT INTO ai_models(model_id, capability, embedding_dim, runtime, installed, disabled, status, installed_at_ms)
1100 VALUES (?1, ?2, ?3, ?4, ?5, 0, ?6, ?7)
1101 ON CONFLICT(model_id) DO NOTHING
1102 ",
1103 params![
1104 model_id,
1105 capability,
1106 embedding_dim.map(|dim| i64::try_from(dim).unwrap_or(i64::MAX)),
1107 runtime,
1108 installed_by_default,
1109 if installed_by_default { "Ready" } else { "MissingModel" },
1110 installed_by_default.then(now_ms),
1111 ],
1112 )?;
1113 Ok(())
1114}
1115
1116fn install_model2vec_model(conn: &Connection, model_id: &str) -> anyhow::Result<()> {
1117 #[cfg(feature = "model2vec")]
1118 {
1119 let embedder = Model2VecEmbedder::new()?;
1120 conn.execute(
1121 "UPDATE ai_models
1122 SET installed = 1, disabled = 0, status = 'Ready', installed_at_ms = ?2,
1123 embedding_dim = ?3, runtime = 'model2vec', last_error = NULL
1124 WHERE model_id = ?1",
1125 params![model_id, now_ms(), i64::try_from(embedder.dim()).unwrap_or(i64::MAX)],
1126 )?;
1127 Ok(())
1128 }
1129 #[cfg(not(feature = "model2vec"))]
1130 {
1131 conn.execute(
1132 "UPDATE ai_models
1133 SET installed = 0, disabled = 0, status = 'MissingRuntime', last_error = ?2
1134 WHERE model_id = ?1",
1135 params![model_id, MODEL2VEC_MISSING_FEATURE_MESSAGE],
1136 )?;
1137 anyhow::bail!("{}", MODEL2VEC_MISSING_FEATURE_MESSAGE)
1138 }
1139}
1140
1141fn install_fastembed_model(conn: &Connection, model_id: &str) -> anyhow::Result<()> {
1142 #[cfg(feature = "fastembed")]
1143 {
1144 let embedder = FastEmbedEmbedder::new(None)
1145 .map_err(|err| anyhow::anyhow!("failed to initialize fastembed model: {err}"))?;
1146 conn.execute(
1147 "UPDATE ai_models
1148 SET installed = 1, disabled = 0, status = 'Ready', installed_at_ms = ?2,
1149 embedding_dim = ?3, runtime = 'fastembed', last_error = NULL
1150 WHERE model_id = ?1",
1151 params![model_id, now_ms(), i64::try_from(embedder.dim()).unwrap_or(i64::MAX)],
1152 )?;
1153 Ok(())
1154 }
1155 #[cfg(not(feature = "fastembed"))]
1156 {
1157 conn.execute(
1158 "UPDATE ai_models
1159 SET installed = 0, disabled = 0, status = 'MissingRuntime', last_error = ?2
1160 WHERE model_id = ?1",
1161 params![model_id, FASTEMBED_MISSING_FEATURE_MESSAGE],
1162 )?;
1163 anyhow::bail!("{}", FASTEMBED_MISSING_FEATURE_MESSAGE)
1164 }
1165}
1166
1167fn fastembed_operational_status(
1168 conn: &Connection,
1169 active_model_id: &str,
1170) -> anyhow::Result<FastEmbedOperationalStatus> {
1171 let model = model(conn, FASTEMBED_MODEL_ID)?;
1172 let model_version = active_embedding_model_version(conn, FASTEMBED_MODEL_ID)?;
1173 let plan = embedding_reconcile_plan(
1174 conn,
1175 &model,
1176 &model_version,
1177 FASTEMBED_EMBEDDING_DIM,
1178 validate_ready_model(&model).is_ok(),
1179 model.last_error.clone(),
1180 )?;
1181 let failed = plan.failed_retryable.saturating_add(plan.failed_waiting);
1182 Ok(FastEmbedOperationalStatus {
1183 backend: "fastembed".to_string(),
1184 build_feature_enabled: fastembed_build_feature_enabled(),
1185 model_id: FASTEMBED_MODEL_ID.to_string(),
1186 model: FASTEMBED_DISPLAY_MODEL.to_string(),
1187 dim: FASTEMBED_EMBEDDING_DIM,
1188 cache: fastembed_cache_dir().display().to_string(),
1189 installed: model.installed,
1190 active: active_model_id == FASTEMBED_MODEL_ID,
1191 status: model.status,
1192 current_embeddings: plan.current,
1193 eligible_embeddings: plan
1194 .current
1195 .saturating_add(plan.missing)
1196 .saturating_add(plan.stale)
1197 .saturating_add(plan.model_changed)
1198 .saturating_add(plan.dim_changed)
1199 .saturating_add(plan.failed_retryable)
1200 .saturating_add(plan.failed_waiting)
1201 .saturating_add(plan.blocked),
1202 skipped_embeddings: plan.skipped_total,
1203 stale_embeddings: plan
1204 .stale
1205 .saturating_add(plan.model_changed)
1206 .saturating_add(plan.dim_changed),
1207 missing_embeddings: plan.missing,
1208 failed_embeddings: failed,
1209 failed_retryable_embeddings: plan.failed_retryable,
1210 failed_waiting_embeddings: plan.failed_waiting,
1211 message: model.last_error,
1212 next: fastembed_next_command(&plan),
1213 })
1214}
1215
1216fn fastembed_next_command(plan: &EmbeddingReconcilePlan) -> Option<String> {
1217 if !fastembed_build_feature_enabled() {
1218 return Some("cargo install rag-rat".to_string());
1219 }
1220 if !plan.available {
1221 return Some(format!("rag-rat models install {}", FASTEMBED_MODEL_ID));
1222 }
1223 if plan.missing > 0
1224 || plan.stale > 0
1225 || plan.model_changed > 0
1226 || plan.dim_changed > 0
1227 || plan.failed_retryable > 0
1228 {
1229 return Some("rag-rat reconcile --limit 500".to_string());
1230 }
1231 if plan.failed_waiting > 0 {
1232 return Some("rag-rat reconcile --plan".to_string());
1233 }
1234 None
1235}
1236
/// Reports whether this binary was compiled with the `fastembed` Cargo feature.
///
/// The answer is fixed at compile time via `cfg!`.
fn fastembed_build_feature_enabled() -> bool {
    cfg!(feature = "fastembed")
}
1240
1241fn capability_status(
1242 conn: &Connection,
1243 capability: &str,
1244 model_id: &str,
1245 total_chunks: u64,
1246) -> anyhow::Result<CapabilityStatus> {
1247 let model = model(conn, model_id)?;
1248 let current = current_artifact_count(conn, capability, model_id)?;
1249 let stale = stale_artifact_count(conn, capability, model_id)?;
1250 let failed = status_artifact_count(conn, capability, model_id, ArtifactStatus::Failed)?;
1251 let blocked = status_artifact_count(conn, capability, model_id, ArtifactStatus::Blocked)?;
1252 let state = if model.disabled {
1253 "Disabled"
1254 } else if total_chunks == 0 {
1255 "IndexEmpty"
1256 } else if !model.installed {
1257 "MissingModel"
1258 } else if failed > 0 {
1259 "Failed"
1260 } else {
1261 "Ready"
1262 };
1263 Ok(CapabilityStatus {
1264 capability: capability.to_string(),
1265 model_id: model_id.to_string(),
1266 state: state.to_string(),
1267 installed: model.installed,
1268 disabled: model.disabled,
1269 current_artifacts: current,
1270 stale_artifacts: stale,
1271 failed_artifacts: failed,
1272 blocked_artifacts: blocked,
1273 message: model.last_error,
1274 })
1275}
1276
1277fn model(conn: &Connection, model_id: &str) -> anyhow::Result<ModelInfo> {
1278 Ok(conn.query_row(
1279 "
1280 SELECT model_id, capability, embedding_dim, runtime, installed, disabled, status, installed_at_ms, last_error
1281 FROM ai_models WHERE model_id = ?1
1282 ",
1283 [model_id],
1284 model_row,
1285 )?)
1286}
1287
1288fn model_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<ModelInfo> {
1289 Ok(ModelInfo {
1290 model_id: row.get(0)?,
1291 capability: row.get(1)?,
1292 embedding_dim: row.get(2)?,
1293 runtime: row.get(3)?,
1294 installed: row.get::<_, bool>(4)?,
1295 disabled: row.get::<_, bool>(5)?,
1296 status: row.get(6)?,
1297 installed_at_ms: row.get(7)?,
1298 last_error: row.get(8)?,
1299 })
1300}
1301
/// A chunk joined with its file and, when present, its stored embedding row
/// for one model.
///
/// The `Option` fields after `text_hash` come from a `LEFT JOIN` on
/// `chunk_embeddings`, so they are `None` when no embedding row exists.
#[derive(Debug, Clone)]
struct CurrentChunk {
    /// `chunks.id`.
    id: i64,
    /// `files.path` of the owning file.
    path: String,
    /// `files.language` of the owning file.
    language: String,
    /// `files.kind` of the owning file.
    file_kind: String,
    /// `chunks.chunk_kind`.
    chunk_kind: String,
    /// `chunks.symbol_path`, if the chunk has one.
    symbol_path: Option<String>,
    /// `chunks.text`.
    text: String,
    /// `chunks.text_hash`.
    text_hash: String,
    /// `chunk_embeddings.status` of the stored embedding row.
    embedding_status: Option<String>,
    /// `chunk_embeddings.source_text_hash`; compared against `text_hash` to detect source changes.
    source_text_hash: Option<String>,
    /// `chunk_embeddings.model_version` of the stored embedding row.
    model_version: Option<String>,
    /// `chunk_embeddings.embedding_dim` of the stored embedding row.
    embedding_dim: Option<i64>,
    /// `chunk_embeddings.input_hash` of the stored embedding row.
    input_hash: Option<String>,
    /// `chunk_embeddings.embedding_text_version` of the stored embedding row.
    embedding_text_version: Option<String>,
    /// `chunk_embeddings.next_retry_after_ms`; used to decide whether a failed row may be retried.
    next_retry_after_ms: Option<i64>,
    /// Provisional reason set by the loader: `Forced` straight from the row
    /// mapper, overwritten with `Missing` when a model id was supplied.
    reason: ReconcileReason,
}
1321
/// A chunk that has been selected for embedding, with its input already built.
#[derive(Debug)]
struct PreparedEmbeddingJob {
    /// Id of the chunk being embedded.
    id: i64,
    /// Hash of the chunk's source text; stored as `source_text_hash`.
    text_hash: String,
    /// The full text handed to the embedder (metadata header plus body).
    input_text: String,
    /// Hash over model id, model version, text version and `input_text`.
    input_hash: String,
    /// Number of characters in `input_text`.
    input_chars: usize,
    /// Whether the chunk body was cut to fit the character budget.
    input_truncated: bool,
    /// Name of the embedding policy that admitted this chunk.
    policy: String,
    /// Priority assigned by the embedding policy.
    priority: i64,
    /// Why this chunk is being (re-)embedded.
    reason: ReconcileReason,
}
1334
/// The embedding jobs selected from one batch of candidate chunk ids.
struct SelectedBatch {
    /// Jobs that passed the policy and staleness filters, in candidate order.
    jobs: Vec<PreparedEmbeddingJob>,
}
1338
1339fn current_chunks(conn: &Connection, limit: Option<u32>) -> anyhow::Result<Vec<CurrentChunk>> {
1340 embedding_job_candidates(conn, "", "", 0, limit, false)
1341}
1342
1343fn current_chunk_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<CurrentChunk> {
1344 Ok(CurrentChunk {
1345 id: row.get(0)?,
1346 path: row.get(1)?,
1347 language: row.get(2)?,
1348 file_kind: row.get(3)?,
1349 chunk_kind: row.get(4)?,
1350 symbol_path: row.get(5)?,
1351 text: row.get(6)?,
1352 text_hash: row.get(7)?,
1353 embedding_status: row.get(8)?,
1354 source_text_hash: row.get(9)?,
1355 model_version: row.get(10)?,
1356 embedding_dim: row.get(11)?,
1357 input_hash: row.get(12)?,
1358 embedding_text_version: row.get(13)?,
1359 next_retry_after_ms: row.get(14)?,
1360 reason: ReconcileReason::Forced,
1361 })
1362}
1363
1364fn embedding_job_candidates(
1365 conn: &Connection,
1366 model_id: &str,
1367 model_version: &str,
1368 dim: usize,
1369 limit: Option<u32>,
1370 changed_first: bool,
1371) -> anyhow::Result<Vec<CurrentChunk>> {
1372 let changed_order = if changed_first {
1373 "chunks.source_revision DESC,"
1374 } else {
1375 "chunks.embedding_priority ASC,"
1376 };
1377 let sql = format!(
1378 "
1379 SELECT chunks.id,
1380 files.path,
1381 files.language,
1382 files.kind,
1383 chunks.chunk_kind,
1384 chunks.symbol_path,
1385 chunks.text,
1386 chunks.text_hash,
1387 chunk_embeddings.status,
1388 chunk_embeddings.source_text_hash,
1389 chunk_embeddings.model_version,
1390 chunk_embeddings.embedding_dim,
1391 chunk_embeddings.input_hash,
1392 chunk_embeddings.embedding_text_version,
1393 chunk_embeddings.next_retry_after_ms
1394 FROM chunks
1395 JOIN files ON files.id = chunks.file_id
1396 LEFT JOIN chunk_embeddings
1397 ON chunk_embeddings.chunk_id = chunks.id
1398 AND chunk_embeddings.model_id = ?1
1399 WHERE chunks.text IS NOT NULL
1400 ORDER BY
1401 CASE
1402 WHEN chunk_embeddings.chunk_id IS NULL THEN 0
1403 WHEN chunk_embeddings.source_text_hash != chunks.text_hash THEN 1
1404 WHEN chunk_embeddings.status = 'Failed' THEN 2
1405 ELSE 3
1406 END,
1407 {changed_order}
1408 chunks.id
1409 LIMIT ?5
1410 "
1411 );
1412 let mut stmt = conn.prepare(&sql)?;
1413 let rows = stmt.query_map(
1414 params![
1415 model_id,
1416 model_version,
1417 i64::try_from(dim).unwrap_or(i64::MAX),
1418 now_ms(),
1419 limit.map(i64::from).unwrap_or(i64::MAX)
1420 ],
1421 current_chunk_row,
1422 )?;
1423 let mut chunks = collect_rows(rows)?;
1424 if !model_id.is_empty() {
1425 for chunk in &mut chunks {
1426 chunk.reason = ReconcileReason::Missing;
1427 }
1428 }
1429 Ok(chunks)
1430}
1431
1432fn embedding_candidate_ids(
1438 conn: &Connection,
1439 model_id: &str,
1440 changed_first: bool,
1441) -> anyhow::Result<Vec<i64>> {
1442 let changed_order = if changed_first {
1443 "chunks.source_revision DESC,"
1444 } else {
1445 "chunks.embedding_priority ASC,"
1446 };
1447 let sql = format!(
1448 "
1449 SELECT chunks.id
1450 FROM chunks
1451 JOIN files ON files.id = chunks.file_id
1452 LEFT JOIN chunk_embeddings
1453 ON chunk_embeddings.chunk_id = chunks.id
1454 AND chunk_embeddings.model_id = ?1
1455 WHERE chunks.text IS NOT NULL
1456 ORDER BY
1457 CASE
1458 WHEN chunk_embeddings.chunk_id IS NULL THEN 0
1459 WHEN chunk_embeddings.source_text_hash != chunks.text_hash THEN 1
1460 WHEN chunk_embeddings.status = 'Failed' THEN 2
1461 ELSE 3
1462 END,
1463 {changed_order}
1464 chunks.id
1465 "
1466 );
1467 let mut stmt = conn.prepare(&sql)?;
1468 let ids = stmt
1469 .query_map(params![model_id], |row| row.get::<_, i64>(0))?
1470 .collect::<Result<Vec<_>, _>>()?;
1471 Ok(ids)
1472}
1473
1474fn current_chunks_by_ids(
1477 conn: &Connection,
1478 model_id: &str,
1479 ids: &[i64],
1480) -> anyhow::Result<Vec<CurrentChunk>> {
1481 if ids.is_empty() {
1482 return Ok(Vec::new());
1483 }
1484 let placeholders = vec!["?"; ids.len()].join(",");
1485 let sql = format!(
1486 "
1487 SELECT chunks.id, files.path, files.language, files.kind, chunks.chunk_kind,
1488 chunks.symbol_path, chunks.text, chunks.text_hash,
1489 chunk_embeddings.status, chunk_embeddings.source_text_hash,
1490 chunk_embeddings.model_version, chunk_embeddings.embedding_dim,
1491 chunk_embeddings.input_hash, chunk_embeddings.embedding_text_version,
1492 chunk_embeddings.next_retry_after_ms
1493 FROM chunks
1494 JOIN files ON files.id = chunks.file_id
1495 LEFT JOIN chunk_embeddings
1496 ON chunk_embeddings.chunk_id = chunks.id
1497 AND chunk_embeddings.model_id = ?1
1498 WHERE chunks.id IN ({placeholders})
1499 "
1500 );
1501 let mut bind: Vec<Value> = Vec::with_capacity(ids.len() + 1);
1502 bind.push(Value::Text(model_id.to_string()));
1503 bind.extend(ids.iter().map(|id| Value::Integer(*id)));
1504 let mut stmt = conn.prepare(&sql)?;
1505 let rows = stmt.query_map(params_from_iter(bind), current_chunk_row)?;
1506 let mut by_id: std::collections::HashMap<i64, CurrentChunk> =
1507 collect_rows(rows)?.into_iter().map(|chunk| (chunk.id, chunk)).collect();
1508 if !model_id.is_empty() {
1511 for chunk in by_id.values_mut() {
1512 chunk.reason = ReconcileReason::Missing;
1513 }
1514 }
1515 Ok(ids.iter().filter_map(|id| by_id.remove(id)).collect())
1516}
1517
1518impl CurrentChunk {
1519 fn reason(
1520 &self,
1521 model_version: &str,
1522 dim: usize,
1523 now_ms: i64,
1524 _max_embedding_chars: usize,
1525 ) -> ReconcileReason {
1526 if self.reason == ReconcileReason::Forced {
1527 return ReconcileReason::Forced;
1528 }
1529 if self.embedding_status.is_none() {
1530 return ReconcileReason::Missing;
1531 }
1532 if self.source_text_hash.as_deref() != Some(self.text_hash.as_str()) {
1533 return ReconcileReason::SourceChanged;
1534 }
1535 if self.input_hash.as_deref().is_none_or(str::is_empty) {
1536 return ReconcileReason::InputChanged;
1537 }
1538 if self.model_version.as_deref() != Some(model_version)
1539 || self.embedding_text_version.as_deref() != Some(EMBEDDING_TEXT_VERSION)
1540 {
1541 return ReconcileReason::ModelChanged;
1542 }
1543 if self.embedding_dim != Some(i64::try_from(dim).unwrap_or(i64::MAX)) {
1544 return ReconcileReason::DimChanged;
1545 }
1546 if self.embedding_status.as_deref() == Some("Failed")
1547 && self.next_retry_after_ms.unwrap_or(0) <= now_ms
1548 {
1549 return ReconcileReason::RetryAfterFailure;
1550 }
1551 ReconcileReason::Missing
1552 }
1553}
1554
/// Parameters shared by the reconcile scan helpers for one embedding model.
struct EmbeddingScan<'a> {
    /// Id of the model whose embeddings are being reconciled.
    model_id: &'a str,
    /// Model version that stored embeddings are compared against.
    model_version: &'a str,
    /// Vector dimension that stored embeddings are compared against.
    dim: usize,
    /// Character budget used when building embedding input and applying policy.
    max_embedding_chars: usize,
}
1564
1565fn estimated_reconcile_jobs(
1566 conn: &Connection,
1567 scan: &EmbeddingScan<'_>,
1568 options: &ReconcileOptions,
1569) -> anyhow::Result<u64> {
1570 let candidates = if options.force {
1571 current_chunks(conn, options.limit)?
1572 } else {
1573 embedding_job_candidates(
1574 conn,
1575 scan.model_id,
1576 scan.model_version,
1577 scan.dim,
1578 options.limit,
1579 options.changed_first,
1580 )?
1581 };
1582 let count = candidates
1583 .iter()
1584 .filter(|candidate| {
1585 policy_for_job(candidate, scan.max_embedding_chars).eligible
1586 && (options.force
1587 || needs_embedding(
1588 candidate,
1589 scan.model_id,
1590 scan.model_version,
1591 scan.dim,
1592 scan.max_embedding_chars,
1593 ))
1594 })
1595 .count();
1596 Ok(u64::try_from(count).unwrap_or(u64::MAX))
1597}
1598
1599fn select_reconcile_batch(
1603 conn: &Connection,
1604 scan: &EmbeddingScan<'_>,
1605 ids: &[i64],
1606 options: &ReconcileOptions,
1607) -> anyhow::Result<SelectedBatch> {
1608 let model_id = if options.force { "" } else { scan.model_id };
1611 let candidates = current_chunks_by_ids(conn, model_id, ids)?;
1612 let mut jobs = Vec::new();
1613 for candidate in candidates {
1614 let policy = policy_for_job(&candidate, scan.max_embedding_chars);
1615 if !policy.eligible {
1616 continue;
1617 }
1618 if !options.force
1619 && !needs_embedding(
1620 &candidate,
1621 scan.model_id,
1622 scan.model_version,
1623 scan.dim,
1624 scan.max_embedding_chars,
1625 )
1626 {
1627 continue;
1628 }
1629 let input = build_embedding_input(&candidate, scan.max_embedding_chars);
1630 let reason = if options.force {
1631 ReconcileReason::Forced
1632 } else {
1633 candidate.reason(scan.model_version, scan.dim, now_ms(), scan.max_embedding_chars)
1634 };
1635 jobs.push(PreparedEmbeddingJob {
1636 id: candidate.id,
1637 text_hash: candidate.text_hash,
1638 input_hash: embedding_input_hash(scan.model_id, scan.model_version, &input.text),
1639 input_chars: input.chars,
1640 input_truncated: input.truncated,
1641 input_text: input.text,
1642 policy: policy.policy,
1643 priority: policy.priority,
1644 reason,
1645 });
1646 }
1647 Ok(SelectedBatch { jobs })
1648}
1649
1650fn needs_embedding(
1651 chunk: &CurrentChunk,
1652 model_id: &str,
1653 model_version: &str,
1654 dim: usize,
1655 max_embedding_chars: usize,
1656) -> bool {
1657 let input = build_embedding_input(chunk, max_embedding_chars);
1658 let expected_input_hash = embedding_input_hash(model_id, model_version, &input.text);
1659 chunk.embedding_status.as_deref() != Some("Current")
1660 || chunk.source_text_hash.as_deref() != Some(chunk.text_hash.as_str())
1661 || chunk.model_version.as_deref() != Some(model_version)
1662 || chunk.embedding_dim != Some(i64::try_from(dim).unwrap_or(i64::MAX))
1663 || chunk.input_hash.as_deref() != Some(expected_input_hash.as_str())
1664 || chunk.embedding_text_version.as_deref() != Some(EMBEDDING_TEXT_VERSION)
1665}
1666
1667pub fn embedding_policy_for_chunk(
1668 path: &Path,
1669 language: &str,
1670 file_kind: &str,
1671 chunk_kind: &str,
1672 symbol_path: Option<&str>,
1673 text: &str,
1674 max_embedding_chars: usize,
1675) -> EmbeddingPolicyDecision {
1676 let path_text = path.to_string_lossy();
1677 let trimmed = text.trim();
1678 if trimmed.chars().count() > max_embedding_chars.saturating_mul(4)
1679 && (file_kind == "generated" || chunk_kind == "generated" || symbol_path.is_none())
1680 {
1681 return policy("SkipTooLarge", 9, false);
1682 }
1683 if file_kind == "generated" || chunk_kind == "generated" || looks_generated_path(&path_text) {
1684 return policy("SkipGenerated", 9, false);
1685 }
1686 if is_test_fixture_path(&path_text) {
1687 return policy("SkipTestFixture", 9, false);
1688 }
1689 let Ok(language_kind) = language.parse::<Language>() else {
1690 return policy("SkipLanguageUnsupported", 9, false);
1691 };
1692 if !language_kind.supports_embeddings() {
1693 return policy("SkipLanguageUnsupported", 9, false);
1694 }
1695 if trimmed.chars().count() < MIN_EMBEDDING_CHARS {
1696 return policy("SkipTooSmall", 9, false);
1697 }
1698 if is_low_signal_chunk(language, chunk_kind, symbol_path, trimmed) {
1699 return policy("SkipLowSignal", 9, false);
1700 }
1701 policy("Embed", embedding_priority(&path_text, language, chunk_kind, symbol_path), true)
1702}
1703
1704fn policy(name: &str, priority: i64, eligible: bool) -> EmbeddingPolicyDecision {
1705 EmbeddingPolicyDecision { policy: name.to_string(), priority, eligible }
1706}
1707
1708fn policy_for_job(chunk: &CurrentChunk, max_embedding_chars: usize) -> EmbeddingPolicyDecision {
1709 embedding_policy_for_chunk(
1710 Path::new(&chunk.path),
1711 &chunk.language,
1712 &chunk.file_kind,
1713 &chunk.chunk_kind,
1714 chunk.symbol_path.as_deref(),
1715 &chunk.text,
1716 max_embedding_chars,
1717 )
1718}
1719
1720fn embedding_priority(
1721 path: &str,
1722 language: &str,
1723 chunk_kind: &str,
1724 symbol_path: Option<&str>,
1725) -> i64 {
1726 if symbol_path.is_some()
1727 && matches!(chunk_kind, "code")
1728 && !is_test_path(path)
1729 && language != "markdown"
1730 {
1731 return 0;
1732 }
1733 if language == "markdown" {
1734 return 1;
1735 }
1736 if is_test_path(path) {
1737 return 2;
1738 }
1739 1
1740}
1741
/// Maps a numeric embedding priority to its display label.
///
/// Priorities without a dedicated label map to `"other"`.
fn priority_label(priority: i64) -> &'static str {
    const LABELS: [(i64, &str); 5] = [
        (0, "source_symbols"),
        (1, "source_or_docs"),
        (2, "tests"),
        (3, "low_signal"),
        (9, "skipped"),
    ];
    LABELS
        .iter()
        .find(|(level, _)| *level == priority)
        .map_or("other", |(_, label)| *label)
}
1752
/// Reports whether `path` looks like generated or build output.
///
/// A path matches when any of its directory components is `generated` or
/// `target`, or when it ends with a known lock-file name (`Cargo.lock`,
/// `package-lock.json`, `pnpm-lock.yaml`).
///
/// Directory components are compared by name, so root-relative paths such as
/// `target/debug/x.rs` match as well as `crates/a/target/x.rs`. The previous
/// substring test required a leading `/` and missed the former.
fn looks_generated_path(path: &str) -> bool {
    // True when `dir` names one of the directories leading to the final segment.
    let in_dir = |dir: &str| {
        path.rsplit_once('/')
            .is_some_and(|(dirs, _)| dirs.split('/').any(|part| part == dir))
    };
    in_dir("generated")
        || in_dir("target")
        || path.ends_with("Cargo.lock")
        || path.ends_with("package-lock.json")
        || path.ends_with("pnpm-lock.yaml")
}
1761
/// Reports whether `path` looks like test code.
///
/// A path matches when one of its directory components is `tests` or `test`,
/// when it contains `__tests__` anywhere, or when it ends with a known test
/// suffix (`_test.rs`, `.test.ts`, `.spec.ts`, `.test.tsx`, `.spec.tsx`).
///
/// Directory components are compared by name, so a root-relative path such as
/// `tests/integration.rs` matches as well as `crates/a/tests/x.rs`. The
/// previous substring test required a leading `/` and missed the former.
fn is_test_path(path: &str) -> bool {
    // True when `dir` names one of the directories leading to the final segment.
    let in_dir = |dir: &str| {
        path.rsplit_once('/')
            .is_some_and(|(dirs, _)| dirs.split('/').any(|part| part == dir))
    };
    in_dir("tests")
        || in_dir("test")
        || path.contains("__tests__")
        || path.ends_with("_test.rs")
        || path.ends_with(".test.ts")
        || path.ends_with(".spec.ts")
        || path.ends_with(".test.tsx")
        || path.ends_with(".spec.tsx")
}
1772
/// Reports whether `path` looks like a test fixture or snapshot.
///
/// A path matches when one of its directory components is `fixtures`,
/// `__fixtures__`, `testdata` or `snapshots`, or when it ends with `.snap`.
///
/// Directory components are compared by name, so a root-relative path such as
/// `fixtures/sample.json` matches as well as `pkg/fixtures/sample.json`. The
/// previous substring test required a leading `/` and missed the former.
fn is_test_fixture_path(path: &str) -> bool {
    // True when `dir` names one of the directories leading to the final segment.
    let in_dir = |dir: &str| {
        path.rsplit_once('/')
            .is_some_and(|(dirs, _)| dirs.split('/').any(|part| part == dir))
    };
    in_dir("fixtures")
        || in_dir("__fixtures__")
        || in_dir("testdata")
        || in_dir("snapshots")
        || path.ends_with(".snap")
}
1780
/// Reports whether a chunk carries too little content to be worth embedding.
///
/// Markdown is never low-signal. For other languages, lines are trimmed and
/// blank lines and lines starting with `//` or `/*` are ignored. The chunk is
/// low-signal when:
/// - no lines remain; or
/// - it has no symbol path, is of kind `code`, and at most three lines remain; or
/// - every remaining line is an import/re-export/module declaration
///   (`use `, `pub use `, `import `, `export `, `mod `, `pub mod `) or a lone
///   brace.
fn is_low_signal_chunk(
    language: &str,
    chunk_kind: &str,
    symbol_path: Option<&str>,
    text: &str,
) -> bool {
    const BOILERPLATE_PREFIXES: [&str; 6] =
        ["use ", "pub use ", "import ", "export ", "mod ", "pub mod "];

    if language == "markdown" {
        return false;
    }

    let mut meaningful_lines = 0usize;
    let mut only_boilerplate = true;
    for line in text.lines().map(str::trim) {
        if line.is_empty() || line.starts_with("//") || line.starts_with("/*") {
            continue;
        }
        meaningful_lines += 1;
        let is_boilerplate = line == "}"
            || line == "{"
            || BOILERPLATE_PREFIXES.iter().any(|prefix| line.starts_with(*prefix));
        only_boilerplate &= is_boilerplate;
    }

    if meaningful_lines == 0 {
        return true;
    }
    if symbol_path.is_none() && chunk_kind == "code" && meaningful_lines <= 3 {
        return true;
    }
    only_boilerplate
}
1812
/// The text handed to an embedder for one chunk, plus size bookkeeping.
struct EmbeddingInput {
    /// Metadata header followed by the (possibly truncated) chunk body.
    text: String,
    /// Number of characters in `text`.
    chars: usize,
    /// Whether the chunk body was cut to fit the character budget.
    truncated: bool,
}
1818
1819fn build_embedding_input(chunk: &CurrentChunk, max_chars: usize) -> EmbeddingInput {
1820 let mut input = String::new();
1821 input.push_str("path: ");
1822 input.push_str(&chunk.path);
1823 input.push('\n');
1824 input.push_str("language: ");
1825 input.push_str(&chunk.language);
1826 input.push('\n');
1827 input.push_str("kind: ");
1828 input.push_str(&chunk.chunk_kind);
1829 input.push('\n');
1830 if let Some(symbol_path) = &chunk.symbol_path {
1831 input.push_str("symbol: ");
1832 input.push_str(symbol_path);
1833 input.push('\n');
1834 }
1835 input.push_str("body:\n");
1836 let prefix_chars = input.chars().count();
1837 let budget = max_chars.saturating_sub(prefix_chars).max(MIN_EMBEDDING_CHARS);
1838 let (body, truncated) = truncate_chars(&chunk.text, budget);
1839 input.push_str(&body);
1840 let chars = input.chars().count();
1841 EmbeddingInput { text: input, chars, truncated }
1842}
1843
/// Returns at most the first `max_chars` characters of `text`.
///
/// The boolean is `true` only when characters were actually dropped. Counting
/// is done in `char`s, so the cut always falls on a UTF-8 boundary.
fn truncate_chars(text: &str, max_chars: usize) -> (String, bool) {
    // The byte offset of the first character past the budget, if there is one.
    match text.char_indices().nth(max_chars) {
        Some((cut, _)) => (text[..cut].to_string(), true),
        None => (text.to_string(), false),
    }
}
1850
1851fn embedding_input_hash(model_id: &str, model_version: &str, input: &str) -> String {
1852 let mut hasher = Sha256::new();
1853 hasher.update(model_id.as_bytes());
1854 hasher.update(b"\0");
1855 hasher.update(model_version.as_bytes());
1856 hasher.update(b"\0");
1857 hasher.update(EMBEDDING_TEXT_VERSION.as_bytes());
1858 hasher.update(b"\0");
1859 hasher.update(input.as_bytes());
1860 let hash = hasher.finalize();
1861 let mut out = String::with_capacity(hash.len() * 2);
1862 for byte in hash {
1863 use std::fmt::Write as _;
1864 let _ = write!(out, "{byte:02x}");
1865 }
1866 out
1867}
1868
1869fn write_current_embedding_batch(
1870 conn: &Connection,
1871 embedder: &dyn Embedder,
1872 model_version: &str,
1873 batch: &[PreparedEmbeddingJob],
1874 vectors: &[Vec<f32>],
1875) -> anyhow::Result<()> {
1876 conn.execute_batch("BEGIN IMMEDIATE")?;
1877 let write_result = (|| {
1878 for (chunk, vector) in batch.iter().zip(vectors) {
1879 store_embedding(conn, embedder, model_version, chunk, vector)?;
1880 }
1881 Ok(())
1882 })();
1883 finish_batch_transaction(conn, write_result)
1884}
1885
1886fn write_failed_embedding_batch(
1887 conn: &Connection,
1888 embedder: &dyn Embedder,
1889 model_version: &str,
1890 batch: &[PreparedEmbeddingJob],
1891 error: &str,
1892) -> anyhow::Result<()> {
1893 conn.execute_batch("BEGIN IMMEDIATE")?;
1894 let write_result = (|| {
1895 for chunk in batch {
1896 store_failed_embedding(conn, embedder, model_version, chunk, error)?;
1897 }
1898 Ok(())
1899 })();
1900 finish_batch_transaction(conn, write_result)
1901}
1902
1903fn finish_batch_transaction(conn: &Connection, result: anyhow::Result<()>) -> anyhow::Result<()> {
1904 match result {
1905 Ok(()) => {
1906 conn.execute_batch("COMMIT")?;
1907 Ok(())
1908 },
1909 Err(err) => {
1910 let _ = conn.execute_batch("ROLLBACK");
1911 Err(err)
1912 },
1913 }
1914}
1915
1916fn store_embedding(
1917 conn: &Connection,
1918 embedder: &dyn Embedder,
1919 model_version: &str,
1920 chunk: &PreparedEmbeddingJob,
1921 vector: &[f32],
1922) -> anyhow::Result<()> {
1923 if vector.len() != embedder.dim() {
1924 anyhow::bail!(
1925 "embedding dimension mismatch for {}: got {}, expected {}",
1926 embedder.model_id(),
1927 vector.len(),
1928 embedder.dim()
1929 );
1930 }
1931 conn.execute(
1932 "
1933 INSERT INTO chunk_embeddings(
1934 chunk_id, model_id, model_version, source_text_hash, input_hash,
1935 embedding_text_version, embedding_policy, embedding_priority, input_chars,
1936 input_truncated, embedding_dim, vector_blob,
1937 status, attempt_count, last_error_class, next_retry_after_ms, computed_at_ms,
1938 created_at_ms, last_error
1939 )
1940 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, 'Current', 1, NULL, NULL, ?13, ?13, NULL)
1941 ON CONFLICT(chunk_id, model_id) DO UPDATE SET
1942 model_version = excluded.model_version,
1943 source_text_hash = excluded.source_text_hash,
1944 input_hash = excluded.input_hash,
1945 embedding_text_version = excluded.embedding_text_version,
1946 embedding_policy = excluded.embedding_policy,
1947 embedding_priority = excluded.embedding_priority,
1948 input_chars = excluded.input_chars,
1949 input_truncated = excluded.input_truncated,
1950 embedding_dim = excluded.embedding_dim,
1951 vector_blob = excluded.vector_blob,
1952 status = excluded.status,
1953 attempt_count = chunk_embeddings.attempt_count + 1,
1954 last_error_class = NULL,
1955 next_retry_after_ms = NULL,
1956 computed_at_ms = excluded.computed_at_ms,
1957 created_at_ms = excluded.created_at_ms,
1958 last_error = NULL
1959 ",
1960 params![
1961 chunk.id,
1962 embedder.model_id(),
1963 model_version,
1964 chunk.text_hash,
1965 chunk.input_hash,
1966 EMBEDDING_TEXT_VERSION,
1967 chunk.policy,
1968 chunk.priority,
1969 i64::try_from(chunk.input_chars).unwrap_or(i64::MAX),
1970 chunk.input_truncated,
1971 i64::try_from(embedder.dim()).unwrap_or(i64::MAX),
1972 encode_vector(vector),
1973 now_ms()
1974 ],
1975 )?;
1976 Ok(())
1977}
1978
1979fn store_failed_embedding(
1980 conn: &Connection,
1981 embedder: &dyn Embedder,
1982 model_version: &str,
1983 chunk: &PreparedEmbeddingJob,
1984 error: &str,
1985) -> anyhow::Result<()> {
1986 let retry_at = now_ms().saturating_add(60_000);
1987 conn.execute(
1988 "
1989 INSERT INTO chunk_embeddings(
1990 chunk_id, model_id, model_version, source_text_hash, input_hash,
1991 embedding_text_version, embedding_policy, embedding_priority, input_chars,
1992 input_truncated, embedding_dim, vector_blob,
1993 status, attempt_count, last_error_class, next_retry_after_ms, computed_at_ms,
1994 created_at_ms, last_error
1995 )
1996 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, x'', 'Failed', 1, ?12, ?13, NULL, ?14, ?15)
1997 ON CONFLICT(chunk_id, model_id) DO UPDATE SET
1998 model_version = excluded.model_version,
1999 source_text_hash = excluded.source_text_hash,
2000 input_hash = excluded.input_hash,
2001 embedding_text_version = excluded.embedding_text_version,
2002 embedding_policy = excluded.embedding_policy,
2003 embedding_priority = excluded.embedding_priority,
2004 input_chars = excluded.input_chars,
2005 input_truncated = excluded.input_truncated,
2006 embedding_dim = excluded.embedding_dim,
2007 vector_blob = x'',
2008 status = 'Failed',
2009 attempt_count = chunk_embeddings.attempt_count + 1,
2010 last_error_class = excluded.last_error_class,
2011 next_retry_after_ms = excluded.next_retry_after_ms,
2012 computed_at_ms = NULL,
2013 created_at_ms = excluded.created_at_ms,
2014 last_error = excluded.last_error
2015 ",
2016 params![
2017 chunk.id,
2018 embedder.model_id(),
2019 model_version,
2020 chunk.text_hash,
2021 chunk.input_hash,
2022 EMBEDDING_TEXT_VERSION,
2023 chunk.policy,
2024 chunk.priority,
2025 i64::try_from(chunk.input_chars).unwrap_or(i64::MAX),
2026 chunk.input_truncated,
2027 i64::try_from(embedder.dim()).unwrap_or(i64::MAX),
2028 "Transient",
2029 retry_at,
2030 now_ms(),
2031 error
2032 ],
2033 )?;
2034 Ok(())
2035}
2036
/// The embedding of a search query, tagged with the model that produced it.
pub struct QueryEmbedding {
    /// Id of the embedder model that produced `vector`.
    pub model_id: String,
    /// The embedder's reported dimension; `vector.len()` is checked against it on construction.
    pub dim: usize,
    /// The query vector.
    pub vector: Vec<f32>,
}
2042
2043pub fn embed_query(conn: &Connection, query: &str) -> anyhow::Result<Option<QueryEmbedding>> {
2044 ensure_model_manifest(conn)?;
2045 let Ok(embedder) = active_embedder(conn, None) else {
2046 return Ok(None);
2047 };
2048 embed_query_with(&*embedder, query).map(Some)
2049}
2050
2051pub fn hash_query_embedding(query: &str) -> anyhow::Result<QueryEmbedding> {
2052 embed_query_with(&HashEmbedder, query)
2053}
2054
2055fn embed_query_with(embedder: &dyn Embedder, query: &str) -> anyhow::Result<QueryEmbedding> {
2056 let texts = vec![query.to_string()];
2057 let mut vectors = embedder.embed_batch(&texts)?;
2058 let Some(vector) = vectors.pop() else {
2059 anyhow::bail!("embedder {} returned no query vector", embedder.model_id());
2060 };
2061 if vector.len() != embedder.dim() {
2062 anyhow::bail!(
2063 "embedder {} returned query dimension {}, expected {}",
2064 embedder.model_id(),
2065 vector.len(),
2066 embedder.dim()
2067 );
2068 }
2069 Ok(QueryEmbedding { model_id: embedder.model_id().to_string(), dim: embedder.dim(), vector })
2070}
2071
2072pub fn active_embedding_model_id(conn: &Connection) -> anyhow::Result<String> {
2073 ensure_model_manifest(conn)?;
2074 if let Some(model_id) = meta(conn, ACTIVE_EMBEDDING_MODEL_META)? {
2075 return Ok(model_id);
2076 }
2077 Ok(HASH_MODEL_ID.to_string())
2078}
2079
2080pub fn active_embedding_model_version(conn: &Connection, model_id: &str) -> anyhow::Result<String> {
2081 if let Some(version) = reconcile_meta(conn, ACTIVE_EMBEDDING_MODEL_VERSION_META)? {
2082 return Ok(version);
2083 }
2084 Ok(default_model_version(model_id).to_string())
2085}
2086
2087fn default_model_version(model_id: &str) -> &'static str {
2088 match model_id {
2089 HASH_MODEL_ID => "hash-v1",
2090 FASTEMBED_MODEL_ID => "fastembed-all-minilm-l6-v2-v1",
2091 MODEL2VEC_MODEL_ID => "model2vec-potion-retrieval-32m-v1",
2092 _ => "v1",
2093 }
2094}
2095
2096pub fn current_embedding_count(conn: &Connection, model_id: &str) -> anyhow::Result<u64> {
2097 ensure_model_manifest(conn)?;
2098 let model_version = active_embedding_model_version(conn, model_id)?;
2099 let count: i64 = conn.query_row(
2100 "
2101 SELECT COUNT(*)
2102 FROM chunk_embeddings
2103 JOIN chunks ON chunks.id = chunk_embeddings.chunk_id
2104 JOIN ai_models ON ai_models.model_id = chunk_embeddings.model_id
2105 WHERE chunk_embeddings.model_id = ?1
2106 AND ai_models.installed = 1
2107 AND ai_models.disabled = 0
2108 AND ai_models.status = 'Ready'
2109 AND chunk_embeddings.embedding_dim = ai_models.embedding_dim
2110 AND chunk_embeddings.status = 'Current'
2111 AND chunk_embeddings.source_text_hash = chunks.text_hash
2112 AND chunk_embeddings.model_version = ?2
2113 AND chunk_embeddings.embedding_text_version = ?3
2114 AND chunk_embeddings.input_hash != ''
2115 ",
2116 params![model_id, model_version, EMBEDDING_TEXT_VERSION],
2117 |row| row.get(0),
2118 )?;
2119 Ok(u64::try_from(count).unwrap_or(0))
2120}
2121
2122fn active_embedder(
2123 conn: &Connection,
2124 intra_threads: Option<usize>,
2125) -> anyhow::Result<Box<dyn Embedder>> {
2126 let model_id = active_embedding_model_id(conn)?;
2127 let model = model(conn, &model_id)?;
2128 validate_ready_model(&model)?;
2129 match model.model_id.as_str() {
2130 HASH_MODEL_ID => Ok(Box::new(HashEmbedder)),
2131 FASTEMBED_MODEL_ID => fastembed_embedder(intra_threads),
2132 MODEL2VEC_MODEL_ID => model2vec_embedder(),
2133 other => anyhow::bail!("unknown active embedding model `{other}`"),
2134 }
2135}
2136
2137fn model2vec_embedder() -> anyhow::Result<Box<dyn Embedder>> {
2138 #[cfg(feature = "model2vec")]
2139 {
2140 Ok(Box::new(Model2VecEmbedder::new()?))
2141 }
2142 #[cfg(not(feature = "model2vec"))]
2143 {
2144 anyhow::bail!("{}", MODEL2VEC_MISSING_FEATURE_MESSAGE)
2145 }
2146}
2147
2148fn validate_ready_model(model: &ModelInfo) -> anyhow::Result<()> {
2149 if model.disabled {
2150 anyhow::bail!("model {} is disabled", model.model_id);
2151 }
2152 if !model.installed || model.status != "Ready" {
2153 anyhow::bail!("{}", model_not_ready_reason(model));
2154 }
2155 let expected_dim = expected_dim(&model.model_id)
2156 .ok_or_else(|| anyhow::anyhow!("unknown embedding model `{}`", model.model_id))?;
2157 if model.embedding_dim != Some(i64::try_from(expected_dim).unwrap_or(i64::MAX)) {
2158 anyhow::bail!(
2159 "model {} dimension mismatch: manifest has {:?}, expected {}",
2160 model.model_id,
2161 model.embedding_dim,
2162 expected_dim
2163 );
2164 }
2165 Ok(())
2166}
2167
2168fn model_not_ready_reason(model: &ModelInfo) -> String {
2169 if model.disabled {
2170 "Disabled".to_string()
2171 } else if let Some(last_error) = &model.last_error {
2172 last_error.clone()
2173 } else if !model.installed {
2174 "MissingModel".to_string()
2175 } else {
2176 model.status.clone()
2177 }
2178}
2179
2180fn expected_dim(model_id: &str) -> Option<usize> {
2181 match model_id {
2182 HASH_MODEL_ID => Some(HASH_EMBEDDING_DIM),
2183 FASTEMBED_MODEL_ID => Some(FASTEMBED_EMBEDDING_DIM),
2184 MODEL2VEC_MODEL_ID => Some(MODEL2VEC_EMBEDDING_DIM),
2185 _ => None,
2186 }
2187}
2188
2189fn fastembed_embedder(intra_threads: Option<usize>) -> anyhow::Result<Box<dyn Embedder>> {
2190 #[cfg(feature = "fastembed")]
2191 {
2192 Ok(Box::new(FastEmbedEmbedder::new(intra_threads)?))
2193 }
2194 #[cfg(not(feature = "fastembed"))]
2195 {
2196 let _ = intra_threads;
2197 anyhow::bail!("{}", FASTEMBED_MISSING_FEATURE_MESSAGE)
2198 }
2199}
2200
/// Resolves the directory used to cache downloaded models.
///
/// Candidates are tried in order:
/// 1. `RAG_RAT_MODEL_CACHE`, used verbatim;
/// 2. `XDG_CACHE_HOME`, with `rag-rat/models` appended;
/// 3. `HOME`, with `.cache/rag-rat/models` appended;
/// 4. the relative path `.rag-rat/models`.
///
/// A variable that is set but empty is treated as unset, so an empty value can no longer
/// produce an empty or accidentally relative cache path. Values are read as OS strings, so
/// paths that are not valid UTF-8 are honoured instead of being skipped.
pub fn fastembed_cache_dir() -> PathBuf {
    let non_empty = |name: &str| std::env::var_os(name).filter(|value| !value.is_empty());

    if let Some(cache) = non_empty("RAG_RAT_MODEL_CACHE") {
        return PathBuf::from(cache);
    }
    if let Some(cache) = non_empty("XDG_CACHE_HOME") {
        return PathBuf::from(cache).join("rag-rat").join("models");
    }
    if let Some(home) = non_empty("HOME") {
        return PathBuf::from(home).join(".cache").join("rag-rat").join("models");
    }
    PathBuf::from(".rag-rat").join("models")
}
2213
/// Decodes a blob of little-endian `f32` values into a vector of exactly `dim` elements.
///
/// Returns `None` when the blob length is not `dim * 4` bytes, including when that product
/// overflows `usize`.
pub fn decode_vector(blob: &[u8], dim: usize) -> Option<Vec<f32>> {
    let expected_len = dim.checked_mul(4)?;
    if blob.len() != expected_len {
        return None;
    }

    let values = blob
        .chunks_exact(4)
        .map(|word| f32::from_le_bytes([word[0], word[1], word[2], word[3]]))
        .collect();
    Some(values)
}
2224
/// Serializes `vector` as consecutive little-endian `f32` values, four bytes per element.
fn encode_vector(vector: &[f32]) -> Vec<u8> {
    let mut bytes = Vec::with_capacity(vector.len() * 4);
    bytes.extend(vector.iter().flat_map(|component| component.to_le_bytes()));
    bytes
}
2232
2233fn hash_embed_text(text: &str, dim: usize) -> Vec<f32> {
2234 let mut vector = vec![0.0_f32; dim];
2235 let tokens = tokens(text);
2236 for token in &tokens {
2237 add_feature(&mut vector, token, 1.0);
2238 }
2239 for pair in tokens.windows(2) {
2240 add_feature(&mut vector, &format!("{}::{}", pair[0], pair[1]), 0.6);
2241 }
2242 normalize(&mut vector);
2243 vector
2244}
2245
2246fn tokens(text: &str) -> Vec<String> {
2247 text.split(|ch: char| !ch.is_alphanumeric() && ch != '_')
2248 .filter(|part| !part.is_empty())
2249 .flat_map(split_identifier)
2250 .filter(|part| part.len() > 1)
2251 .collect()
2252}
2253
/// Splits an identifier into ASCII-lowercased parts.
///
/// Parts end at `_` or `-` (which are dropped) and at a lower-to-upper case transition,
/// where "lower" means a lowercase character or an ASCII digit. Only ASCII letters are
/// lowercased; other characters are kept as they are.
fn split_identifier(value: &str) -> Vec<String> {
    /// Moves the pending part, if any, into `parts` in ASCII-lowercased form.
    fn flush(parts: &mut Vec<String>, pending: &mut String) {
        if !pending.is_empty() {
            let mut word = std::mem::take(pending);
            word.make_ascii_lowercase();
            parts.push(word);
        }
    }

    let mut parts = Vec::new();
    let mut pending = String::new();
    let mut after_lower = false;

    for ch in value.chars() {
        match ch {
            '_' | '-' => {
                flush(&mut parts, &mut pending);
                after_lower = false;
            }
            _ => {
                if after_lower && ch.is_uppercase() {
                    flush(&mut parts, &mut pending);
                }
                after_lower = ch.is_lowercase() || ch.is_ascii_digit();
                pending.push(ch);
            }
        }
    }

    flush(&mut parts, &mut pending);
    parts
}
2279
2280fn add_feature(vector: &mut [f32], feature: &str, weight: f32) {
2281 let digest = Sha256::digest(feature.as_bytes());
2282 let index = u16::from_le_bytes([digest[0], digest[1]]) as usize % vector.len();
2283 let sign = if digest[2] & 1 == 0 { 1.0 } else { -1.0 };
2284 vector[index] += sign * weight;
2285}
2286
/// Scales `vector` in place to unit Euclidean length.
///
/// The vector is left untouched when its norm is not strictly positive (for example an
/// all-zero or empty vector, or a norm that is NaN).
fn normalize(vector: &mut [f32]) {
    let length = vector.iter().map(|component| component * component).sum::<f32>().sqrt();
    if length > 0.0 {
        vector.iter_mut().for_each(|component| *component /= length);
    }
}
2295
2296fn chunk_count(conn: &Connection) -> anyhow::Result<u64> {
2297 let count = conn.query_row(
2301 "SELECT COUNT(*) FROM chunks JOIN files ON files.id = chunks.file_id",
2302 [],
2303 |row| row.get::<_, i64>(0),
2304 )?;
2305 Ok(u64::try_from(count).unwrap_or(0))
2306}
2307
2308fn current_artifact_count(
2309 conn: &Connection,
2310 capability: &str,
2311 model_id: &str,
2312) -> anyhow::Result<u64> {
2313 let model_version = active_embedding_model_version(conn, model_id)?;
2314 let sql = artifact_table_sql(
2315 capability,
2316 "
2317 SELECT COUNT(*)
2318 FROM {table}
2319 JOIN chunks ON chunks.id = {table}.chunk_id
2320 JOIN files ON files.id = chunks.file_id
2321 JOIN ai_models ON ai_models.model_id = {table}.model_id
2322 WHERE {table}.model_id = ?1
2323 AND {table}.status = 'Current'
2324 AND {table}.source_text_hash = chunks.text_hash
2325 AND {table}.model_version = ?2
2326 AND {table}.embedding_text_version = ?3
2327 AND {table}.input_hash != ''
2328 AND {table}.embedding_dim = ai_models.embedding_dim
2329 ",
2330 );
2331 count_query3(conn, &sql, model_id, &model_version, EMBEDDING_TEXT_VERSION)
2332}
2333
2334fn stale_artifact_count(
2335 conn: &Connection,
2336 capability: &str,
2337 model_id: &str,
2338) -> anyhow::Result<u64> {
2339 let model_version = active_embedding_model_version(conn, model_id)?;
2340 let sql = artifact_table_sql(
2341 capability,
2342 "
2343 SELECT COUNT(*)
2344 FROM {table}
2345 JOIN chunks ON chunks.id = {table}.chunk_id
2346 JOIN files ON files.id = chunks.file_id
2347 JOIN ai_models ON ai_models.model_id = {table}.model_id
2348 WHERE {table}.model_id = ?1
2349 AND (
2350 {table}.source_text_hash != chunks.text_hash
2351 OR {table}.model_version != ?2
2352 OR {table}.embedding_text_version != ?3
2353 OR {table}.input_hash = ''
2354 OR {table}.embedding_dim != ai_models.embedding_dim
2355 OR {table}.status = 'Stale'
2356 )
2357 ",
2358 );
2359 count_query3(conn, &sql, model_id, &model_version, EMBEDDING_TEXT_VERSION)
2360}
2361
2362fn status_artifact_count(
2363 conn: &Connection,
2364 capability: &str,
2365 model_id: &str,
2366 status: ArtifactStatus,
2367) -> anyhow::Result<u64> {
2368 let sql = artifact_table_sql(
2369 capability,
2370 "
2371 SELECT COUNT(*)
2372 FROM {table}
2373 JOIN chunks ON chunks.id = {table}.chunk_id
2374 JOIN files ON files.id = chunks.file_id
2375 WHERE {table}.model_id = ?1 AND {table}.status = ?2
2376 ",
2377 );
2378 let count =
2379 conn.query_row(&sql, params![model_id, status.as_str()], |row| row.get::<_, i64>(0))?;
2380 Ok(u64::try_from(count).unwrap_or(0))
2381}
2382
2383fn count_query3(
2384 conn: &Connection,
2385 sql: &str,
2386 model_id: &str,
2387 left: &str,
2388 right: &str,
2389) -> anyhow::Result<u64> {
2390 let count = conn.query_row(sql, params![model_id, left, right], |row| row.get::<_, i64>(0))?;
2391 Ok(u64::try_from(count).unwrap_or(0))
2392}
2393
/// Substitutes every `{table}` placeholder in `template` with `chunk_embeddings`.
///
/// `_capability` is currently ignored: the same table name is used for every value.
fn artifact_table_sql(_capability: &str, template: &str) -> String {
    const TABLE: &str = "chunk_embeddings";
    template.replace("{table}", TABLE)
}
2398
2399fn set_meta(conn: &Connection, key: &str, value: &str) -> anyhow::Result<()> {
2400 conn.execute(
2401 "INSERT INTO index_meta(key, value) VALUES (?1, ?2)
2402 ON CONFLICT(key) DO UPDATE SET value = excluded.value",
2403 params![key, value],
2404 )?;
2405 Ok(())
2406}
2407
2408fn meta(conn: &Connection, key: &str) -> anyhow::Result<Option<String>> {
2409 Ok(conn
2410 .query_row("SELECT value FROM index_meta WHERE key = ?1", [key], |row| row.get(0))
2411 .optional()?)
2412}
2413
2414fn set_reconcile_meta(conn: &Connection, key: &str, value: &str) -> anyhow::Result<()> {
2415 conn.execute(
2416 "INSERT INTO reconcile_meta(key, value) VALUES (?1, ?2)
2417 ON CONFLICT(key) DO UPDATE SET value = excluded.value",
2418 params![key, value],
2419 )?;
2420 Ok(())
2421}
2422
2423fn reconcile_meta(conn: &Connection, key: &str) -> anyhow::Result<Option<String>> {
2424 Ok(conn
2425 .query_row("SELECT value FROM reconcile_meta WHERE key = ?1", [key], |row| row.get(0))
2426 .optional()?)
2427}
2428
2429fn collect_rows<T>(
2430 rows: rusqlite::MappedRows<'_, impl FnMut(&rusqlite::Row<'_>) -> rusqlite::Result<T>>,
2431) -> anyhow::Result<Vec<T>> {
2432 let mut out = Vec::new();
2433 for row in rows {
2434 out.push(row?);
2435 }
2436 Ok(out)
2437}
2438
2439fn find_existing_embedding(
2440 conn: &Connection,
2441 model_id: &str,
2442 input_hash: &str,
2443 dim: usize,
2444) -> anyhow::Result<Option<Vec<f32>>> {
2445 let vector: Option<Vec<u8>> = conn
2446 .query_row(
2447 "SELECT vector_blob FROM chunk_embeddings
2448 WHERE model_id = ?1 AND input_hash = ?2 AND status = 'Current' AND embedding_dim = ?3
2449 LIMIT 1",
2450 params![model_id, input_hash, i64::try_from(dim).unwrap_or(i64::MAX)],
2451 |row| row.get(0),
2452 )
2453 .optional()?;
2454 if let Some(blob) = vector { Ok(decode_vector(&blob, dim)) } else { Ok(None) }
2455}