Skip to main content

rag_rat_core/index/
ai.rs

1use std::{
2    collections::{BTreeMap, HashSet},
3    path::{Path, PathBuf},
4    time::Instant,
5};
6
7use rusqlite::{Connection, OptionalExtension, params, params_from_iter, types::Value};
8use serde::Serialize;
9use sha2::{Digest, Sha256};
10
11use crate::{index::now_ms, language::Language};
12
/// Model id of the built-in hashing embedder ([`HashEmbedder`]).
pub const HASH_MODEL_ID: &str = "embedding-hash";
/// Model id of the FastEmbed backend as stored in `ai_models` / `chunk_embeddings`.
pub const FASTEMBED_MODEL_ID: &str = "fastembed-all-minilm-l6-v2";
/// Human-readable name of the FastEmbed model.
pub const FASTEMBED_DISPLAY_MODEL: &str = "sentence-transformers/all-MiniLM-L6-v2";
/// Vector length produced by [`HashEmbedder`].
pub const HASH_EMBEDDING_DIM: usize = 384;
/// Vector length reported by the FastEmbed embedder.
pub const FASTEMBED_EMBEDDING_DIM: usize = 384;
/// Model2Vec static-embedding backend: a token→vector lookup + mean-pool (no transformer forward
/// pass), ~100-500× faster than FastEmbed on CPU at some retrieval-quality cost. The right choice
/// for very large repos where the FastEmbed backfill is infeasible. See `EmbeddingBackend`.
pub const MODEL2VEC_MODEL_ID: &str = "model2vec-potion-retrieval-32m";
/// Human-readable name of the Model2Vec model.
pub const MODEL2VEC_DISPLAY_MODEL: &str = "minishlab/potion-retrieval-32M";
/// Hugging Face repository the Model2Vec static model is loaded from.
pub const MODEL2VEC_HF_REPO: &str = "minishlab/potion-retrieval-32M";
/// Vector length reported by the Model2Vec embedder.
pub const MODEL2VEC_EMBEDDING_DIM: usize = 512;
/// User-facing message for a Model2Vec request on a binary built without that feature.
pub const MODEL2VEC_MISSING_FEATURE_MESSAGE: &str = "Model2Vec backend requested, but this binary was built without Model2Vec support.\nRebuild with default features enabled:\n  cargo install rag-rat";
/// User-facing message for a FastEmbed request on a binary built without that feature.
pub const FASTEMBED_MISSING_FEATURE_MESSAGE: &str = "FastEmbed backend requested, but this binary was built without default FastEmbed support.\nRebuild with default features enabled:\n  cargo install rag-rat";
/// `index_meta` key holding the id of the currently active embedding model.
const ACTIVE_EMBEDDING_MODEL_META: &str = "active_embedding_model";
/// Meta key for the active model's version.
/// NOTE(review): the stored key uses a different word order than the constant name; it is
/// persisted data, so do not "fix" the value without a migration.
const ACTIVE_EMBEDDING_MODEL_VERSION_META: &str = "embedding_active_model_version";
/// Meta key recording when the last embedding reconcile started (milliseconds).
const LAST_EMBEDDING_RECONCILE_STARTED_META: &str = "last_embedding_reconcile_started_at_ms";
/// Meta key recording when the last embedding reconcile finished (milliseconds).
const LAST_EMBEDDING_RECONCILE_FINISHED_META: &str = "last_embedding_reconcile_finished_at_ms";
/// Batch size used by reconcile when the caller passes none (or zero).
const DEFAULT_BATCH_SIZE: usize = 64;
/// Default cap on the number of characters of embedding input per chunk.
pub const DEFAULT_MAX_EMBEDDING_CHARS: usize = 4_000;
/// Lower bound applied to a caller-supplied `max_embedding_chars` during reconcile.
const MIN_EMBEDDING_CHARS: usize = 80;
/// Version tag of the embedding-input text format; a stored embedding whose
/// `embedding_text_version` differs is not counted as current by the reconcile plan.
pub const EMBEDDING_TEXT_VERSION: &str = "embedding-text-v2";
/// Model ids that are no longer supported; `remove_legacy_models` deletes their rows.
const LEGACY_MODEL_IDS: &[&str] = &["embedding-small"];
/// Directory name of the FastEmbed model repository inside the cache directory, as probed by
/// `fastembed_cache_ready`.
#[cfg(feature = "fastembed")]
const FASTEMBED_HF_CACHE_REPO_DIR: &str = "models--Qdrant--all-MiniLM-L6-v2-onnx";
38
/// A backend that turns text into embedding vectors.
pub trait Embedder {
    /// Identifier of the model behind this embedder (e.g. [`HASH_MODEL_ID`]).
    fn model_id(&self) -> &str;
    /// Vector length this embedder reports for its output.
    fn dim(&self) -> usize;
    /// Embeds every entry of `texts`.
    ///
    /// The reconcile loop expects exactly one vector per input text and treats any other
    /// count as an embedder error.
    fn embed_batch(&self, texts: &[String]) -> anyhow::Result<Vec<Vec<f32>>>;
}
44
45pub struct HashEmbedder;
46
47impl Embedder for HashEmbedder {
48    fn model_id(&self) -> &str {
49        HASH_MODEL_ID
50    }
51
52    fn dim(&self) -> usize {
53        HASH_EMBEDDING_DIM
54    }
55
56    fn embed_batch(&self, texts: &[String]) -> anyhow::Result<Vec<Vec<f32>>> {
57        Ok(texts.iter().map(|text| hash_embed_text(text, HASH_EMBEDDING_DIM)).collect())
58    }
59}
60
/// Test-only embedder with a configurable model id and dimension.
#[cfg(test)]
pub struct MockEmbedder {
    model_id: String,
    dim: usize,
}

#[cfg(test)]
impl MockEmbedder {
    /// Creates a mock that reports `model_id` and emits vectors of length `dim`.
    pub fn new(model_id: impl Into<String>, dim: usize) -> Self {
        let model_id = model_id.into();
        MockEmbedder { model_id, dim }
    }
}

#[cfg(test)]
impl Embedder for MockEmbedder {
    fn model_id(&self) -> &str {
        self.model_id.as_str()
    }

    fn dim(&self) -> usize {
        self.dim
    }

    /// Hash-embeds each text at the configured dimension, in input order.
    fn embed_batch(&self, texts: &[String]) -> anyhow::Result<Vec<Vec<f32>>> {
        let mut vectors = Vec::with_capacity(texts.len());
        for text in texts {
            vectors.push(hash_embed_text(text, self.dim));
        }
        Ok(vectors)
    }
}
88
/// Embedder backed by a `fastembed::TextEmbedding` model.
///
/// The model sits behind a mutex because embedding needs `&mut` access while the
/// [`Embedder`] trait only hands out `&self`.
#[cfg(feature = "fastembed")]
pub struct FastEmbedEmbedder {
    model: std::sync::Mutex<fastembed::TextEmbedding>,
}

#[cfg(feature = "fastembed")]
impl FastEmbedEmbedder {
    /// Initializes the AllMiniLML6V2 model using the directory from `fastembed_cache_dir()`,
    /// with download progress display enabled.
    ///
    /// `intra_threads` is applied only when it is `Some(n)` with `n > 0`.
    ///
    /// # Errors
    /// Propagates any error from `TextEmbedding::try_new`.
    pub fn new(intra_threads: Option<usize>) -> anyhow::Result<Self> {
        use fastembed::{EmbeddingModel, InitOptions, TextEmbedding};

        let base = InitOptions::new(EmbeddingModel::AllMiniLML6V2)
            .with_cache_dir(fastembed_cache_dir())
            .with_show_download_progress(true);
        // `ort_threads` caps the ONNX Runtime intra-op thread pool. Microsoft's prebuilt ORT
        // binaries (what fastembed downloads) are OpenMP-based, where this has no effect and
        // OMP_NUM_THREADS (set from `omp_threads`) is the lever instead — see docs/config.md.
        // The cap is still applied so that non-OpenMP builds honor the configured value.
        let options = match intra_threads {
            Some(threads) if threads > 0 => base.with_intra_threads(threads),
            _ => base,
        };
        let model = TextEmbedding::try_new(options)?;
        Ok(Self { model: std::sync::Mutex::new(model) })
    }
}

#[cfg(feature = "fastembed")]
impl Embedder for FastEmbedEmbedder {
    fn model_id(&self) -> &str {
        FASTEMBED_MODEL_ID
    }

    fn dim(&self) -> usize {
        FASTEMBED_EMBEDDING_DIM
    }

    /// Embeds `texts` with the wrapped model.
    ///
    /// # Errors
    /// Fails if the model mutex is poisoned or the model itself reports an error.
    fn embed_batch(&self, texts: &[String]) -> anyhow::Result<Vec<Vec<f32>>> {
        let documents: Vec<&str> = texts.iter().map(String::as_str).collect();
        let mut guard = match self.model.lock() {
            Ok(guard) => guard,
            Err(_) => return Err(anyhow::anyhow!("fastembed model lock poisoned")),
        };
        guard.embed(documents, None)
    }
}
129
/// Embedder backed by a Model2Vec `StaticModel`.
#[cfg(feature = "model2vec")]
pub struct Model2VecEmbedder {
    model: model2vec_rs::model::StaticModel,
}

#[cfg(feature = "model2vec")]
impl Model2VecEmbedder {
    /// Loads the static model identified by [`MODEL2VEC_HF_REPO`].
    ///
    /// # Errors
    /// Returns `failed to load Model2Vec model: …` when loading fails.
    pub fn new() -> anyhow::Result<Self> {
        // The model is downloaded from the Hugging Face hub (and cached) on first use. L2
        // normalization is requested so cosine similarity matches the FastEmbed path's
        // expectations.
        let loaded = model2vec_rs::model::StaticModel::from_pretrained(
            MODEL2VEC_HF_REPO,
            None,
            Some(true),
            None,
        );
        match loaded {
            Ok(model) => Ok(Self { model }),
            Err(err) => Err(anyhow::anyhow!("failed to load Model2Vec model: {err}")),
        }
    }
}

#[cfg(feature = "model2vec")]
impl Embedder for Model2VecEmbedder {
    fn model_id(&self) -> &str {
        MODEL2VEC_MODEL_ID
    }

    fn dim(&self) -> usize {
        MODEL2VEC_EMBEDDING_DIM
    }

    /// Encodes `texts` with the static model; this path never returns an error.
    fn embed_batch(&self, texts: &[String]) -> anyhow::Result<Vec<Vec<f32>>> {
        let vectors = self.model.encode(texts);
        Ok(vectors)
    }
}
165
/// State of a derived artifact (such as a chunk embedding).
///
/// The string form of each variant (see `as_str`) equals the variant name.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
pub enum ArtifactStatus {
    Current,
    Missing,
    Stale,
    Failed,
    Blocked,
    Disabled,
}
175
176impl ArtifactStatus {
177    pub fn as_str(self) -> &'static str {
178        match self {
179            Self::Current => "Current",
180            Self::Missing => "Missing",
181            Self::Stale => "Stale",
182            Self::Failed => "Failed",
183            Self::Blocked => "Blocked",
184            Self::Disabled => "Disabled",
185        }
186    }
187}
188
/// Snapshot of local AI state as returned by [`status`].
#[derive(Debug, Clone, Serialize)]
pub struct LocalAiStatus {
    /// Status of the `"embedding"` capability for the active embedding model.
    pub embedding: CapabilityStatus,
    /// Chunk-level counts derived from `embedding` and `fastembed`.
    pub artifacts: ArtifactCounts,
    /// Operational details of the FastEmbed backend.
    pub fastembed: FastEmbedOperationalStatus,
    /// Information about the most recent reconcile, if any is recorded.
    pub last_reconcile: Option<LastReconcileStatus>,
}
196
/// Summary of one capability for one model.
///
/// [`status`] requests it for the `"embedding"` capability of the active model and reads the
/// `*_artifacts` counters to fill [`ArtifactCounts`].
#[derive(Debug, Clone, Serialize)]
pub struct CapabilityStatus {
    pub capability: String,
    pub model_id: String,
    // NOTE(review): the set of possible `state` values is produced outside this view.
    pub state: String,
    pub installed: bool,
    pub disabled: bool,
    pub current_artifacts: u64,
    pub stale_artifacts: u64,
    pub failed_artifacts: u64,
    pub blocked_artifacts: u64,
    pub message: Option<String>,
}
210
/// Operational report for the FastEmbed backend, included in [`LocalAiStatus`].
///
/// NOTE(review): the values are computed by `fastembed_operational_status`, which is outside
/// this view; the field notes below cover only what is visible here.
#[derive(Debug, Clone, Serialize)]
pub struct FastEmbedOperationalStatus {
    pub backend: String,
    pub build_feature_enabled: bool,
    pub model_id: String,
    pub model: String,
    pub dim: usize,
    pub cache: String,
    pub installed: bool,
    pub active: bool,
    pub status: String,
    pub current_embeddings: u64,
    pub eligible_embeddings: u64,
    /// Reused by [`status`] as `ArtifactCounts::skipped_chunks`.
    pub skipped_embeddings: u64,
    pub stale_embeddings: u64,
    pub missing_embeddings: u64,
    pub failed_embeddings: u64,
    pub failed_retryable_embeddings: u64,
    pub failed_waiting_embeddings: u64,
    pub message: Option<String>,
    /// NOTE(review): presumably a suggested follow-up action for the user — confirm.
    pub next: Option<String>,
}
233
/// Chunk-level embedding counts, assembled by [`status`].
#[derive(Debug, Clone, Serialize)]
pub struct ArtifactCounts {
    /// Number of chunks in the index.
    pub total_chunks: u64,
    /// `total_chunks` minus `skipped_chunks` (saturating).
    pub eligible_chunks: u64,
    /// Taken from the FastEmbed status' `skipped_embeddings`.
    pub skipped_chunks: u64,
    pub current: u64,
    /// `total_chunks` minus `current + stale + failed + blocked` (saturating).
    pub missing: u64,
    pub stale: u64,
    pub failed: u64,
    pub blocked: u64,
    /// Always 0 when built by [`status`].
    pub disabled: u64,
}
246
/// Summary of the most recent reconcile run, exposed through [`LocalAiStatus`].
///
/// NOTE(review): populated by `last_reconcile_status` (outside this view); presumably read
/// back from the `reconcile_attempts` table — confirm.
#[derive(Debug, Clone, Serialize)]
pub struct LastReconcileStatus {
    pub started_at_ms: i64,
    /// `None` when no finish time is recorded.
    pub finished_at_ms: Option<i64>,
    pub batch_size: u64,
    pub processed_chunks: u64,
    pub embeddings_written: u64,
    pub blocked_chunks: u64,
    pub elapsed_ms: u64,
    pub input_chars: u64,
    pub chunks_per_sec: f64,
    pub chars_per_sec: f64,
    pub status: String,
    pub message: Option<String>,
}
262
/// One row of the `ai_models` table.
///
/// The fields mirror the column list selected by [`models`]:
/// `model_id, capability, embedding_dim, runtime, installed, disabled, status,
/// installed_at_ms, last_error`.
#[derive(Debug, Clone, Serialize)]
pub struct ModelInfo {
    pub model_id: String,
    pub capability: String,
    pub embedding_dim: Option<i64>,
    /// Runtime name; the installers in this file write `'hash'` or `'fastembed'`.
    pub runtime: String,
    pub installed: bool,
    pub disabled: bool,
    /// The installers in this file set this to `'Ready'` on success.
    pub status: String,
    pub installed_at_ms: Option<i64>,
    pub last_error: Option<String>,
}
275
/// Outcome of an embedding reconcile run (see `reconcile_with_options_progress`).
#[derive(Debug, Clone, Serialize)]
pub struct ReconcileReport {
    pub processed_chunks: u64,
    /// Counts embeddings written, including vectors reused from an existing embedding.
    pub embeddings_written: u64,
    /// Sum of all values in `skipped_by_policy`.
    pub skipped_chunks: u64,
    pub failed_chunks: u64,
    pub blocked_chunks: u64,
    /// Id of the embedding model that was active for the run.
    pub model_id: String,
    pub model_version: String,
    /// The model's registered dimension; 0 when it is absent or not representable.
    pub embedding_dim: usize,
    /// Effective batch size (falls back to the default when none or zero is requested).
    pub batch_size: usize,
    /// Effective character cap after the minimum is applied.
    pub max_embedding_chars: usize,
    pub forced: bool,
    pub changed_first: bool,
    pub until_clean: bool,
    pub max_seconds: Option<u64>,
    /// Number of selected jobs per reconcile reason label.
    pub work_reasons: BTreeMap<String, u64>,
    /// Number of chunks skipped per policy name.
    pub skipped_by_policy: BTreeMap<String, u64>,
    /// Total input characters over all selected jobs (saturating sum).
    pub input_chars: u64,
    /// Number of selected jobs whose input was truncated.
    pub truncated_inputs: u64,
    pub elapsed_ms: u64,
    pub chunks_per_sec: f64,
    pub chars_per_sec: f64,
    pub avg_chars_per_chunk: f64,
    /// Starts as `"Current"`; set to `"Blocked"` when the embedder cannot be created and to
    /// `"Partial"` when `max_seconds` is reached.
    pub status: String,
    pub message: Option<String>,
}
303
/// Dry-run description of pending reconcile work, returned by [`reconcile_plan`].
#[derive(Debug, Clone, Serialize)]
pub struct ReconcilePlan {
    /// Pending embedding work for the active embedding model.
    pub embeddings: EmbeddingReconcilePlan,
    /// Summary work; currently always reported as disabled.
    pub summaries: SummaryReconcilePlan,
}
309
/// Embedding portion of a [`ReconcilePlan`], computed by `embedding_reconcile_plan`.
///
/// Only policy-eligible chunks contribute to the per-state counters.
#[derive(Debug, Clone, Serialize)]
pub struct EmbeddingReconcilePlan {
    pub model_id: String,
    pub model_version: String,
    pub dim: usize,
    /// Whether the active model passed the readiness check.
    pub available: bool,
    /// Chunks whose stored embedding matches the current source, model version, dimension,
    /// text version and input hash.
    pub current: u64,
    /// Chunks whose reconcile reason is `Missing` (or `Forced`).
    pub missing: u64,
    /// Chunks whose reconcile reason is `SourceChanged` or `InputChanged`.
    pub stale: u64,
    pub model_changed: u64,
    pub dim_changed: u64,
    /// Chunks whose reconcile reason is `RetryAfterFailure`.
    pub failed_retryable: u64,
    /// Non-current chunks in `Failed` state whose retry time is still in the future.
    pub failed_waiting: u64,
    /// Non-current chunks in `Blocked` state.
    pub blocked: u64,
    /// 1 when the model is disabled, otherwise 0.
    pub disabled: u64,
    /// Sum of all values in `skipped_by_policy`.
    pub skipped_total: u64,
    pub skipped_by_policy: BTreeMap<String, u64>,
    /// Per-priority-label count of every eligible chunk that is not current (not only those
    /// classified as missing).
    pub missing_by_priority: BTreeMap<String, u64>,
    /// Why the model is not ready; `None` when `available` is true.
    pub message: Option<String>,
}
330
/// Summary portion of a [`ReconcilePlan`].
///
/// [`reconcile_plan`] currently always reports `enabled: false` with a "not implemented"
/// message.
#[derive(Debug, Clone, Serialize)]
pub struct SummaryReconcilePlan {
    pub enabled: bool,
    pub message: String,
}
336
/// Why a chunk needs (re-)embedding.
///
/// The derived ordering follows declaration order, so variants must not be reordered.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
enum ReconcileReason {
    Missing,
    SourceChanged,
    InputChanged,
    ModelChanged,
    DimChanged,
    RetryAfterFailure,
    Forced,
}

impl ReconcileReason {
    /// Returns the reason label, which is the variant name verbatim.
    fn as_str(self) -> &'static str {
        match self {
            ReconcileReason::Missing => "Missing",
            ReconcileReason::SourceChanged => "SourceChanged",
            ReconcileReason::InputChanged => "InputChanged",
            ReconcileReason::ModelChanged => "ModelChanged",
            ReconcileReason::DimChanged => "DimChanged",
            ReconcileReason::RetryAfterFailure => "RetryAfterFailure",
            ReconcileReason::Forced => "Forced",
        }
    }
}
361
/// Tuning knobs for `reconcile_with_options_progress`.
#[derive(Debug, Clone)]
pub struct ReconcileOptions {
    /// Upper bound on how many chunks one run works through; `None` means no bound.
    pub limit: Option<u32>,
    /// Chunks per batch; `None` or `Some(0)` selects the default batch size.
    pub batch_size: Option<u32>,
    /// Run in forced mode (recorded as `forced` in the report).
    pub force: bool,
    pub until_clean: bool,
    pub changed_first: bool,
    /// Time budget in seconds; once reached, the run stops with status `"Partial"`.
    pub max_seconds: Option<u64>,
    /// Character cap for embedding input; values below the minimum are raised to it.
    pub max_embedding_chars: usize,
    /// ONNX Runtime intra-op thread cap (`ort_threads`). `None` lets the backend pick (all cores).
    pub intra_threads: Option<usize>,
}
374
375impl Default for ReconcileOptions {
376    fn default() -> Self {
377        Self {
378            limit: None,
379            batch_size: None,
380            force: false,
381            until_clean: false,
382            changed_first: false,
383            max_seconds: None,
384            max_embedding_chars: DEFAULT_MAX_EMBEDDING_CHARS,
385            intra_threads: None,
386        }
387    }
388}
389
/// Policy verdict for one chunk: whether it should be embedded and at what priority.
///
/// `embedding_reconcile_plan` ignores chunks whose decision is not `eligible` and groups the
/// remaining non-current ones by the label derived from `priority`.
#[derive(Debug, Clone, Serialize)]
pub struct EmbeddingPolicyDecision {
    // NOTE(review): presumably the name of the policy that produced this decision — confirm.
    pub policy: String,
    pub priority: i64,
    pub eligible: bool,
}
396
/// Progress events passed to the callback of `reconcile_with_options_progress`.
///
/// When the embedder cannot be created, the callback still receives `Started` (with
/// `total_chunks: 0`) followed by `Finished` with all counters at zero.
#[derive(Debug, Clone, Serialize)]
pub enum ReconcileProgress {
    /// Emitted once before any batch; `total_chunks` is an estimate of the work to do.
    Started { model_id: String, total_chunks: u64, batch_size: usize },
    Batch { processed_chunks: u64, total_chunks: u64, embeddings_written: u64, blocked_chunks: u64 },
    Finished { processed_chunks: u64, embeddings_written: u64, blocked_chunks: u64 },
}
403
404pub fn ensure_model_manifest(conn: &Connection) -> anyhow::Result<()> {
405    remove_legacy_models(conn)?;
406    upsert_model(conn, HASH_MODEL_ID, "embedding", Some(HASH_EMBEDDING_DIM), "hash", false)?;
407    upsert_model(
408        conn,
409        FASTEMBED_MODEL_ID,
410        "embedding",
411        Some(FASTEMBED_EMBEDDING_DIM),
412        "fastembed",
413        false,
414    )?;
415    upsert_model(
416        conn,
417        MODEL2VEC_MODEL_ID,
418        "embedding",
419        Some(MODEL2VEC_EMBEDDING_DIM),
420        "model2vec",
421        false,
422    )?;
423    normalize_embedding_model_versions(conn)?;
424    Ok(())
425}
426
427fn remove_legacy_models(conn: &Connection) -> anyhow::Result<()> {
428    for model_id in LEGACY_MODEL_IDS {
429        conn.execute("DELETE FROM chunk_embeddings WHERE model_id = ?1", params![model_id])?;
430        conn.execute("DELETE FROM ai_models WHERE model_id = ?1", params![model_id])?;
431        conn.execute(
432            "DELETE FROM index_meta WHERE key = ?1 AND value = ?2",
433            params![ACTIVE_EMBEDDING_MODEL_META, model_id],
434        )?;
435    }
436    Ok(())
437}
438
439fn normalize_embedding_model_versions(conn: &Connection) -> anyhow::Result<()> {
440    conn.execute(
441        "
442        UPDATE chunk_embeddings
443        SET model_version = CASE model_id
444            WHEN 'embedding-hash' THEN 'hash-v1'
445            WHEN 'fastembed-all-minilm-l6-v2' THEN 'fastembed-all-minilm-l6-v2-v1'
446            ELSE model_version
447        END
448        WHERE model_version = 'v1'
449          AND model_id IN ('embedding-hash', 'fastembed-all-minilm-l6-v2')
450        ",
451        [],
452    )?;
453    Ok(())
454}
455
456pub(super) fn recover_cached_fastembed_model(conn: &Connection) -> anyhow::Result<()> {
457    recover_cached_fastembed_model_from(conn, &fastembed_cache_dir())
458}
459
460pub(super) fn recover_cached_fastembed_model_from(
461    conn: &Connection,
462    cache_dir: &Path,
463) -> anyhow::Result<()> {
464    #[cfg(feature = "fastembed")]
465    {
466        recover_cached_fastembed_model_at(conn, cache_dir)?;
467    }
468    #[cfg(not(feature = "fastembed"))]
469    {
470        let _ = (conn, cache_dir);
471    }
472    Ok(())
473}
474
/// Marks the FastEmbed model as installed when its files are already in `cache_dir`.
///
/// Does nothing unless `fastembed_cache_ready` reports the cache as usable. Otherwise:
/// - if the model row is not both installed and `'Ready'`, it is updated to that state;
/// - if no usable active embedding model is recorded, FastEmbed becomes the active one.
#[cfg(feature = "fastembed")]
pub(super) fn recover_cached_fastembed_model_at(
    conn: &Connection,
    cache_dir: &Path,
) -> anyhow::Result<()> {
    const MARK_READY_SQL: &str = "UPDATE ai_models
             SET installed = 1, disabled = 0, status = 'Ready', installed_at_ms = ?2,
                 embedding_dim = ?3, runtime = 'fastembed', last_error = NULL
             WHERE model_id = ?1";

    if !fastembed_cache_ready(cache_dir) {
        return Ok(());
    }

    let registered = model(conn, FASTEMBED_MODEL_ID)?;
    let already_ready = registered.installed && registered.status == "Ready";
    if !already_ready {
        let dim = i64::try_from(FASTEMBED_EMBEDDING_DIM).unwrap_or(i64::MAX);
        conn.execute(MARK_READY_SQL, params![FASTEMBED_MODEL_ID, now_ms(), dim])?;
    }

    let needs_active_model = active_embedding_model_is_missing(conn)?;
    if needs_active_model {
        set_meta(conn, ACTIVE_EMBEDDING_MODEL_META, FASTEMBED_MODEL_ID)?;
    }
    Ok(())
}
502
/// Reports whether there is no usable active embedding model.
///
/// Returns `true` when the active-model meta entry is absent, when it names a model with no
/// `ai_models` row, or when that model fails `validate_ready_model`.
#[cfg(feature = "fastembed")]
fn active_embedding_model_is_missing(conn: &Connection) -> anyhow::Result<bool> {
    const ACTIVE_MODEL_SQL: &str = "
            SELECT model_id, capability, embedding_dim, runtime, installed, disabled, status, installed_at_ms, last_error
            FROM ai_models WHERE model_id = ?1
            ";

    let active_model_id = match meta(conn, ACTIVE_EMBEDDING_MODEL_META)? {
        Some(id) => id,
        None => return Ok(true),
    };
    let row = conn.query_row(ACTIVE_MODEL_SQL, [active_model_id], model_row).optional()?;
    let missing = match row {
        None => true,
        Some(info) => validate_ready_model(&info).is_err(),
    };
    Ok(missing)
}
523
/// Checks whether `cache_dir` holds a usable snapshot of the FastEmbed model.
///
/// The cache counts as ready when `<repo>/refs/main` is readable, names a non-empty
/// revision after trimming, and `<repo>/snapshots/<revision>` is a directory. Any read
/// error yields `false`.
#[cfg(feature = "fastembed")]
fn fastembed_cache_ready(cache_dir: &Path) -> bool {
    let repo = cache_dir.join(FASTEMBED_HF_CACHE_REPO_DIR);
    let head = std::fs::read_to_string(repo.join("refs").join("main"));
    match head {
        Ok(contents) => {
            let revision = contents.trim();
            if revision.is_empty() {
                false
            } else {
                repo.join("snapshots").join(revision).is_dir()
            }
        },
        Err(_) => false,
    }
}
533
534pub fn install_model(conn: &Connection, model_id: &str) -> anyhow::Result<ModelInfo> {
535    ensure_model_manifest(conn)?;
536    match model_id {
537        HASH_MODEL_ID => {
538            conn.execute(
539                "UPDATE ai_models
540                 SET installed = 1, disabled = 0, status = 'Ready', installed_at_ms = ?2,
541                     embedding_dim = ?3, runtime = 'hash', last_error = NULL
542                 WHERE model_id = ?1",
543                params![model_id, now_ms(), i64::try_from(HASH_EMBEDDING_DIM).unwrap_or(i64::MAX)],
544            )?;
545            set_meta(conn, ACTIVE_EMBEDDING_MODEL_META, model_id)?;
546        },
547        FASTEMBED_MODEL_ID => {
548            install_fastembed_model(conn, model_id)?;
549            set_meta(conn, ACTIVE_EMBEDDING_MODEL_META, model_id)?;
550        },
551        MODEL2VEC_MODEL_ID => {
552            install_model2vec_model(conn, model_id)?;
553            set_meta(conn, ACTIVE_EMBEDDING_MODEL_META, model_id)?;
554        },
555        other => anyhow::bail!("unknown local AI model `{other}`"),
556    }
557    model(conn, model_id)
558}
559
560pub fn models(conn: &Connection) -> anyhow::Result<Vec<ModelInfo>> {
561    ensure_model_manifest(conn)?;
562    let mut stmt = conn.prepare(
563        "
564        SELECT model_id, capability, embedding_dim, runtime, installed, disabled, status, installed_at_ms, last_error
565        FROM ai_models
566        ORDER BY capability, model_id
567        ",
568    )?;
569    let rows = stmt.query_map([], model_row)?;
570    collect_rows(rows)
571}
572
573pub fn status(conn: &Connection) -> anyhow::Result<LocalAiStatus> {
574    ensure_model_manifest(conn)?;
575    let total_chunks = chunk_count(conn)?;
576    let active_model_id = active_embedding_model_id(conn)?;
577    let embedding = capability_status(conn, "embedding", &active_model_id, total_chunks)?;
578    let fastembed = fastembed_operational_status(conn, &active_model_id)?;
579    let current = embedding.current_artifacts;
580    let stale = embedding.stale_artifacts;
581    let failed = embedding.failed_artifacts;
582    let blocked = embedding.blocked_artifacts;
583    let missing = total_chunks.saturating_sub(current + stale + failed + blocked);
584    let skipped_chunks = fastembed.skipped_embeddings;
585    let eligible_chunks = total_chunks.saturating_sub(skipped_chunks);
586    Ok(LocalAiStatus {
587        embedding,
588        artifacts: ArtifactCounts {
589            total_chunks,
590            eligible_chunks,
591            skipped_chunks,
592            current,
593            missing,
594            stale,
595            failed,
596            blocked,
597            disabled: 0,
598        },
599        fastembed,
600        last_reconcile: last_reconcile_status(conn)?,
601    })
602}
603
604pub fn reconcile_plan(conn: &Connection) -> anyhow::Result<ReconcilePlan> {
605    ensure_model_manifest(conn)?;
606    let model_id = active_embedding_model_id(conn)?;
607    let model = model(conn, &model_id)?;
608    let model_version = active_embedding_model_version(conn, &model_id)?;
609    let dim = usize::try_from(model.embedding_dim.unwrap_or_default()).unwrap_or(0);
610    let available = validate_ready_model(&model).is_ok();
611    let message = (!available).then(|| model_not_ready_reason(&model));
612    Ok(ReconcilePlan {
613        embeddings: embedding_reconcile_plan(
614            conn,
615            &model,
616            &model_version,
617            dim,
618            available,
619            message,
620        )?,
621        summaries: SummaryReconcilePlan {
622            enabled: false,
623            message: "summaries are not implemented yet".to_string(),
624        },
625    })
626}
627
628fn embedding_reconcile_plan(
629    conn: &Connection,
630    model: &ModelInfo,
631    model_version: &str,
632    dim: usize,
633    available: bool,
634    message: Option<String>,
635) -> anyhow::Result<EmbeddingReconcilePlan> {
636    let jobs = embedding_job_candidates(conn, &model.model_id, model_version, dim, None, false)?;
637    let skipped_by_policy = embedding_policy_skip_summary(conn, DEFAULT_MAX_EMBEDDING_CHARS)?;
638    let mut missing_by_priority = BTreeMap::new();
639    let mut current = 0_u64;
640    let mut missing = 0_u64;
641    let mut stale = 0_u64;
642    let mut model_changed = 0_u64;
643    let mut dim_changed = 0_u64;
644    let mut failed_retryable = 0_u64;
645    let mut failed_waiting = 0_u64;
646    let mut blocked = 0_u64;
647    for job in jobs {
648        let policy = policy_for_job(&job, DEFAULT_MAX_EMBEDDING_CHARS);
649        if !policy.eligible {
650            continue;
651        }
652        let current_artifact = job.embedding_status.as_deref() == Some("Current")
653            && job.source_text_hash.as_deref() == Some(job.text_hash.as_str())
654            && job.model_version.as_deref() == Some(model_version)
655            && job.embedding_dim == Some(i64::try_from(dim).unwrap_or(i64::MAX))
656            && job.embedding_text_version.as_deref() == Some(EMBEDDING_TEXT_VERSION)
657            && job.input_hash.as_deref().is_some_and(|input_hash| {
658                let input = build_embedding_input(&job, DEFAULT_MAX_EMBEDDING_CHARS);
659                input_hash == embedding_input_hash(&model.model_id, model_version, &input.text)
660            });
661        if current_artifact {
662            current += 1;
663            continue;
664        }
665        let reason = job.reason(model_version, dim, now_ms(), DEFAULT_MAX_EMBEDDING_CHARS);
666        match reason {
667            ReconcileReason::Missing => missing += 1,
668            ReconcileReason::SourceChanged => stale += 1,
669            ReconcileReason::InputChanged => stale += 1,
670            ReconcileReason::ModelChanged => model_changed += 1,
671            ReconcileReason::DimChanged => dim_changed += 1,
672            ReconcileReason::RetryAfterFailure => failed_retryable += 1,
673            ReconcileReason::Forced => missing += 1,
674        }
675        *missing_by_priority.entry(priority_label(policy.priority).to_string()).or_default() += 1;
676        if job.embedding_status.as_deref() == Some("Failed")
677            && job.next_retry_after_ms.unwrap_or(0) > now_ms()
678        {
679            failed_waiting += 1;
680        }
681        if job.embedding_status.as_deref() == Some("Blocked") {
682            blocked += 1;
683        }
684    }
685    Ok(EmbeddingReconcilePlan {
686        model_id: model.model_id.clone(),
687        model_version: model_version.to_string(),
688        dim,
689        available,
690        current,
691        missing,
692        stale,
693        model_changed,
694        dim_changed,
695        failed_retryable,
696        failed_waiting,
697        blocked,
698        disabled: u64::from(model.disabled),
699        skipped_total: skipped_by_policy.values().sum(),
700        skipped_by_policy,
701        missing_by_priority,
702        message,
703    })
704}
705
706pub fn reconcile(
707    conn: &Connection,
708    limit: Option<u32>,
709    batch_size: Option<u32>,
710) -> anyhow::Result<ReconcileReport> {
711    reconcile_with_options_progress(
712        conn,
713        ReconcileOptions { limit, batch_size, ..ReconcileOptions::default() },
714        |_| {},
715    )
716}
717
718pub fn reconcile_with_progress(
719    conn: &Connection,
720    limit: Option<u32>,
721    batch_size: Option<u32>,
722    force: bool,
723    progress: impl FnMut(ReconcileProgress),
724) -> anyhow::Result<ReconcileReport> {
725    reconcile_with_options_progress(
726        conn,
727        ReconcileOptions { limit, batch_size, force, ..ReconcileOptions::default() },
728        progress,
729    )
730}
731
732pub fn reconcile_with_options_progress(
733    conn: &Connection,
734    options: ReconcileOptions,
735    mut progress: impl FnMut(ReconcileProgress),
736) -> anyhow::Result<ReconcileReport> {
737    ensure_model_manifest(conn)?;
738    let active_model_id = active_embedding_model_id(conn)?;
739    let model = model(conn, &active_model_id)?;
740    let model_version = active_embedding_model_version(conn, &active_model_id)?;
741    let embedding_dim = usize::try_from(model.embedding_dim.unwrap_or_default()).unwrap_or(0);
742    let batch_size = options
743        .batch_size
744        .map(usize::try_from)
745        .transpose()?
746        .filter(|value| *value > 0)
747        .unwrap_or(DEFAULT_BATCH_SIZE);
748    let max_embedding_chars = options.max_embedding_chars.max(MIN_EMBEDDING_CHARS);
749    let started = now_ms();
750    set_reconcile_meta(conn, LAST_EMBEDDING_RECONCILE_STARTED_META, &started.to_string())?;
751    conn.execute(
752        "INSERT INTO reconcile_attempts(started_at_ms, limit_count, status, batch_size) VALUES (?1, ?2, 'Running', ?3)",
753        params![
754            started,
755            options.limit.map(i64::from),
756            i64::try_from(batch_size).unwrap_or(i64::MAX)
757        ],
758    )?;
759    let attempt_id = conn.last_insert_rowid();
760    let timer = Instant::now();
761
762    let embedder = active_embedder(conn, options.intra_threads);
763    let skipped_by_policy = embedding_policy_skip_summary(conn, max_embedding_chars)?;
764    let skipped_chunks = skipped_by_policy.values().sum();
765    let mut report = ReconcileReport {
766        processed_chunks: 0,
767        embeddings_written: 0,
768        skipped_chunks,
769        failed_chunks: 0,
770        blocked_chunks: 0,
771        model_id: active_model_id.clone(),
772        model_version: model_version.clone(),
773        embedding_dim,
774        batch_size,
775        max_embedding_chars,
776        forced: options.force,
777        changed_first: options.changed_first,
778        until_clean: options.until_clean,
779        max_seconds: options.max_seconds,
780        work_reasons: BTreeMap::new(),
781        skipped_by_policy,
782        input_chars: 0,
783        truncated_inputs: 0,
784        elapsed_ms: 0,
785        chunks_per_sec: 0.0,
786        chars_per_sec: 0.0,
787        avg_chars_per_chunk: 0.0,
788        status: "Current".to_string(),
789        message: None,
790    };
791
792    let embedder = match embedder {
793        Ok(embedder) => embedder,
794        Err(_) => {
795            report.status = "Blocked".to_string();
796            report.message = Some(format!(
797                "{} model is not ready; run `rag-rat models install {}`",
798                active_model_id, active_model_id
799            ));
800            finish_reconcile_attempt(conn, attempt_id, &report)?;
801            progress(ReconcileProgress::Started {
802                model_id: active_model_id,
803                total_chunks: 0,
804                batch_size,
805            });
806            progress(ReconcileProgress::Finished {
807                processed_chunks: 0,
808                embeddings_written: 0,
809                blocked_chunks: 0,
810            });
811            return Ok(report);
812        },
813    };
814
815    let scan = EmbeddingScan {
816        model_id: &active_model_id,
817        model_version: &model_version,
818        dim: embedding_dim,
819        max_embedding_chars,
820    };
821    let mut progress_total_chunks = estimated_reconcile_jobs(conn, &scan, &options)?;
822    progress(ReconcileProgress::Started {
823        model_id: active_model_id.clone(),
824        total_chunks: progress_total_chunks,
825        batch_size,
826    });
827
828    // Ordered candidate ids fetched ONCE (ids only, need-first). The loop walks them with a cursor
829    // and loads text per batch, so each chunk's text is read at most once — see
830    // `embedding_candidate_ids`. The processed set guards against a chunk being revisited (e.g.
831    // under --force, whose ordering does not reflect embedding state).
832    let candidate_ids = embedding_candidate_ids(
833        conn,
834        if options.force { "" } else { scan.model_id },
835        options.changed_first,
836    )?;
837    let mut cursor = 0usize;
838    let mut processed_ids: HashSet<i64> = HashSet::new();
839    let mut remaining = options.limit.map(u64::from);
840    loop {
841        if remaining == Some(0) {
842            break;
843        }
844        if options.max_seconds.is_some_and(|seconds| timer.elapsed().as_secs() >= seconds) {
845            report.status = "Partial".to_string();
846            report.message = Some(format!(
847                "max_seconds={} reached; rerun reconcile to continue",
848                options.max_seconds.unwrap_or_default()
849            ));
850            break;
851        }
852        let batch_limit = remaining
853            .map(|value| value.min(u64::try_from(batch_size).unwrap_or(u64::MAX)))
854            .and_then(|value| usize::try_from(value).ok())
855            .unwrap_or(batch_size);
856        // Pull the next batch of unprocessed candidate ids.
857        let mut batch_ids = Vec::with_capacity(batch_limit);
858        while cursor < candidate_ids.len() && batch_ids.len() < batch_limit {
859            let id = candidate_ids[cursor];
860            cursor += 1;
861            if !processed_ids.contains(&id) {
862                batch_ids.push(id);
863            }
864        }
865        if batch_ids.is_empty() {
866            break; // candidate list exhausted
867        }
868        let selected = select_reconcile_batch(conn, &scan, &batch_ids, &options)?;
869        if selected.jobs.is_empty() {
870            // Every id in this batch was filtered (ineligible/already current); keep walking the
871            // rest of the candidate list rather than stopping.
872            continue;
873        }
874        for job in &selected.jobs {
875            processed_ids.insert(job.id);
876            *report.work_reasons.entry(job.reason.as_str().to_string()).or_default() += 1;
877            report.input_chars = report
878                .input_chars
879                .saturating_add(u64::try_from(job.input_chars).unwrap_or(u64::MAX));
880            if job.input_truncated {
881                report.truncated_inputs += 1;
882            }
883        }
884        let jobs_len = selected.jobs.len();
885        let mut reused_jobs = Vec::new();
886        let mut to_embed_jobs = Vec::new();
887        for job in selected.jobs {
888            match find_existing_embedding(conn, &active_model_id, &job.input_hash, embedding_dim)? {
889                Some(vector) => reused_jobs.push((job, vector)),
890                None => to_embed_jobs.push(job),
891            }
892        }
893
894        if !reused_jobs.is_empty() {
895            let (reused_jobs_slice, reused_vectors_slice): (Vec<_>, Vec<_>) =
896                reused_jobs.into_iter().unzip();
897            write_current_embedding_batch(
898                conn,
899                embedder.as_ref(),
900                &model_version,
901                &reused_jobs_slice,
902                &reused_vectors_slice,
903            )?;
904            report.embeddings_written += u64::try_from(reused_jobs_slice.len()).unwrap_or(u64::MAX);
905        }
906
907        if !to_embed_jobs.is_empty() {
908            let texts =
909                to_embed_jobs.iter().map(|chunk| chunk.input_text.clone()).collect::<Vec<_>>();
910            match embedder.embed_batch(&texts) {
911                Ok(vectors) if vectors.len() == to_embed_jobs.len() => {
912                    write_current_embedding_batch(
913                        conn,
914                        embedder.as_ref(),
915                        &model_version,
916                        &to_embed_jobs,
917                        &vectors,
918                    )?;
919                    report.embeddings_written +=
920                        u64::try_from(to_embed_jobs.len()).unwrap_or(u64::MAX);
921                },
922                Ok(vectors) => {
923                    let error = format!(
924                        "embedder {} returned {} vectors for {} texts",
925                        embedder.model_id(),
926                        vectors.len(),
927                        to_embed_jobs.len()
928                    );
929                    write_failed_embedding_batch(
930                        conn,
931                        embedder.as_ref(),
932                        &model_version,
933                        &to_embed_jobs,
934                        &error,
935                    )?;
936                    report.failed_chunks += u64::try_from(to_embed_jobs.len()).unwrap_or(u64::MAX);
937                },
938                Err(err) => {
939                    write_failed_embedding_batch(
940                        conn,
941                        embedder.as_ref(),
942                        &model_version,
943                        &to_embed_jobs,
944                        &err.to_string(),
945                    )?;
946                    report.failed_chunks += u64::try_from(to_embed_jobs.len()).unwrap_or(u64::MAX);
947                },
948            }
949        }
950        report.processed_chunks = report
951            .embeddings_written
952            .saturating_add(report.failed_chunks)
953            .saturating_add(report.blocked_chunks);
954        if let Some(value) = remaining.as_mut() {
955            *value = value.saturating_sub(u64::try_from(jobs_len).unwrap_or(0));
956        }
957        progress_total_chunks = progress_total_chunks.max(report.processed_chunks);
958        progress(ReconcileProgress::Batch {
959            processed_chunks: report.embeddings_written
960                + report.failed_chunks
961                + report.blocked_chunks,
962            total_chunks: progress_total_chunks,
963            embeddings_written: report.embeddings_written,
964            blocked_chunks: report.blocked_chunks,
965        });
966    }
967    if report.failed_chunks > 0 {
968        report.status = "Failed".to_string();
969        report.message =
970            Some(format!("{} chunks failed; retry after backoff", report.failed_chunks));
971    }
972    finalize_reconcile_throughput(&mut report, timer.elapsed().as_millis());
973
974    finish_reconcile_attempt(conn, attempt_id, &report)?;
975    progress(ReconcileProgress::Finished {
976        processed_chunks: report.processed_chunks,
977        embeddings_written: report.embeddings_written,
978        blocked_chunks: report.blocked_chunks,
979    });
980    Ok(report)
981}
982
983fn embedding_policy_skip_summary(
984    conn: &Connection,
985    max_embedding_chars: usize,
986) -> anyhow::Result<BTreeMap<String, u64>> {
987    let mut skipped_by_policy = BTreeMap::new();
988    for chunk in current_chunks(conn, None)? {
989        let policy = policy_for_job(&chunk, max_embedding_chars);
990        if !policy.eligible {
991            *skipped_by_policy.entry(policy.policy).or_default() += 1;
992        }
993    }
994    Ok(skipped_by_policy)
995}
996
997fn finish_reconcile_attempt(
998    conn: &Connection,
999    attempt_id: i64,
1000    report: &ReconcileReport,
1001) -> anyhow::Result<()> {
1002    let finished = now_ms();
1003    conn.execute(
1004        "
1005        UPDATE reconcile_attempts
1006        SET finished_at_ms = ?2,
1007            processed_chunks = ?3,
1008            embeddings_written = ?4,
1009            blocked_chunks = ?5,
1010            status = ?6,
1011            message = ?7,
1012            elapsed_ms = ?8,
1013            input_chars = ?9,
1014            batch_size = ?10
1015        WHERE id = ?1
1016        ",
1017        params![
1018            attempt_id,
1019            finished,
1020            i64::try_from(report.processed_chunks).unwrap_or(i64::MAX),
1021            i64::try_from(report.embeddings_written).unwrap_or(i64::MAX),
1022            i64::try_from(report.blocked_chunks).unwrap_or(i64::MAX),
1023            report.status,
1024            report.message,
1025            i64::try_from(report.elapsed_ms).unwrap_or(i64::MAX),
1026            i64::try_from(report.input_chars).unwrap_or(i64::MAX),
1027            i64::try_from(report.batch_size).unwrap_or(i64::MAX),
1028        ],
1029    )?;
1030    set_reconcile_meta(conn, LAST_EMBEDDING_RECONCILE_FINISHED_META, &finished.to_string())?;
1031    Ok(())
1032}
1033
1034fn last_reconcile_status(conn: &Connection) -> anyhow::Result<Option<LastReconcileStatus>> {
1035    conn.query_row(
1036        "
1037        SELECT started_at_ms,
1038               finished_at_ms,
1039               batch_size,
1040               processed_chunks,
1041               embeddings_written,
1042               blocked_chunks,
1043               elapsed_ms,
1044               input_chars,
1045               status,
1046               message
1047        FROM reconcile_attempts
1048        ORDER BY started_at_ms DESC, id DESC
1049        LIMIT 1
1050        ",
1051        [],
1052        |row| {
1053            let elapsed_ms = u64::try_from(row.get::<_, i64>(6)?).unwrap_or(0);
1054            let input_chars = u64::try_from(row.get::<_, i64>(7)?).unwrap_or(0);
1055            let embeddings_written = u64::try_from(row.get::<_, i64>(4)?).unwrap_or(0);
1056            let elapsed_secs = (elapsed_ms as f64 / 1000.0).max(0.001);
1057            Ok(LastReconcileStatus {
1058                started_at_ms: row.get(0)?,
1059                finished_at_ms: row.get(1)?,
1060                batch_size: u64::try_from(row.get::<_, i64>(2)?).unwrap_or(0),
1061                processed_chunks: u64::try_from(row.get::<_, i64>(3)?).unwrap_or(0),
1062                embeddings_written,
1063                blocked_chunks: u64::try_from(row.get::<_, i64>(5)?).unwrap_or(0),
1064                elapsed_ms,
1065                input_chars,
1066                chunks_per_sec: embeddings_written as f64 / elapsed_secs,
1067                chars_per_sec: input_chars as f64 / elapsed_secs,
1068                status: row.get(8)?,
1069                message: row.get(9)?,
1070            })
1071        },
1072    )
1073    .optional()
1074    .map_err(Into::into)
1075}
1076
1077fn finalize_reconcile_throughput(report: &mut ReconcileReport, elapsed_ms: u128) {
1078    report.elapsed_ms = u64::try_from(elapsed_ms).unwrap_or(u64::MAX);
1079    let elapsed_secs = (report.elapsed_ms as f64 / 1000.0).max(0.001);
1080    report.chunks_per_sec = report.embeddings_written as f64 / elapsed_secs;
1081    report.chars_per_sec = report.input_chars as f64 / elapsed_secs;
1082    report.avg_chars_per_chunk = if report.embeddings_written > 0 {
1083        report.input_chars as f64 / report.embeddings_written as f64
1084    } else {
1085        0.0
1086    };
1087}
1088
1089fn upsert_model(
1090    conn: &Connection,
1091    model_id: &str,
1092    capability: &str,
1093    embedding_dim: Option<usize>,
1094    runtime: &str,
1095    installed_by_default: bool,
1096) -> anyhow::Result<()> {
1097    conn.execute(
1098        "
1099        INSERT INTO ai_models(model_id, capability, embedding_dim, runtime, installed, disabled, status, installed_at_ms)
1100        VALUES (?1, ?2, ?3, ?4, ?5, 0, ?6, ?7)
1101        ON CONFLICT(model_id) DO NOTHING
1102        ",
1103        params![
1104            model_id,
1105            capability,
1106            embedding_dim.map(|dim| i64::try_from(dim).unwrap_or(i64::MAX)),
1107            runtime,
1108            installed_by_default,
1109            if installed_by_default { "Ready" } else { "MissingModel" },
1110            installed_by_default.then(now_ms),
1111        ],
1112    )?;
1113    Ok(())
1114}
1115
1116fn install_model2vec_model(conn: &Connection, model_id: &str) -> anyhow::Result<()> {
1117    #[cfg(feature = "model2vec")]
1118    {
1119        let embedder = Model2VecEmbedder::new()?;
1120        conn.execute(
1121            "UPDATE ai_models
1122             SET installed = 1, disabled = 0, status = 'Ready', installed_at_ms = ?2,
1123                 embedding_dim = ?3, runtime = 'model2vec', last_error = NULL
1124             WHERE model_id = ?1",
1125            params![model_id, now_ms(), i64::try_from(embedder.dim()).unwrap_or(i64::MAX)],
1126        )?;
1127        Ok(())
1128    }
1129    #[cfg(not(feature = "model2vec"))]
1130    {
1131        conn.execute(
1132            "UPDATE ai_models
1133             SET installed = 0, disabled = 0, status = 'MissingRuntime', last_error = ?2
1134             WHERE model_id = ?1",
1135            params![model_id, MODEL2VEC_MISSING_FEATURE_MESSAGE],
1136        )?;
1137        anyhow::bail!("{}", MODEL2VEC_MISSING_FEATURE_MESSAGE)
1138    }
1139}
1140
1141fn install_fastembed_model(conn: &Connection, model_id: &str) -> anyhow::Result<()> {
1142    #[cfg(feature = "fastembed")]
1143    {
1144        let embedder = FastEmbedEmbedder::new(None)
1145            .map_err(|err| anyhow::anyhow!("failed to initialize fastembed model: {err}"))?;
1146        conn.execute(
1147            "UPDATE ai_models
1148             SET installed = 1, disabled = 0, status = 'Ready', installed_at_ms = ?2,
1149                 embedding_dim = ?3, runtime = 'fastembed', last_error = NULL
1150             WHERE model_id = ?1",
1151            params![model_id, now_ms(), i64::try_from(embedder.dim()).unwrap_or(i64::MAX)],
1152        )?;
1153        Ok(())
1154    }
1155    #[cfg(not(feature = "fastembed"))]
1156    {
1157        conn.execute(
1158            "UPDATE ai_models
1159             SET installed = 0, disabled = 0, status = 'MissingRuntime', last_error = ?2
1160             WHERE model_id = ?1",
1161            params![model_id, FASTEMBED_MISSING_FEATURE_MESSAGE],
1162        )?;
1163        anyhow::bail!("{}", FASTEMBED_MISSING_FEATURE_MESSAGE)
1164    }
1165}
1166
1167fn fastembed_operational_status(
1168    conn: &Connection,
1169    active_model_id: &str,
1170) -> anyhow::Result<FastEmbedOperationalStatus> {
1171    let model = model(conn, FASTEMBED_MODEL_ID)?;
1172    let model_version = active_embedding_model_version(conn, FASTEMBED_MODEL_ID)?;
1173    let plan = embedding_reconcile_plan(
1174        conn,
1175        &model,
1176        &model_version,
1177        FASTEMBED_EMBEDDING_DIM,
1178        validate_ready_model(&model).is_ok(),
1179        model.last_error.clone(),
1180    )?;
1181    let failed = plan.failed_retryable.saturating_add(plan.failed_waiting);
1182    Ok(FastEmbedOperationalStatus {
1183        backend: "fastembed".to_string(),
1184        build_feature_enabled: fastembed_build_feature_enabled(),
1185        model_id: FASTEMBED_MODEL_ID.to_string(),
1186        model: FASTEMBED_DISPLAY_MODEL.to_string(),
1187        dim: FASTEMBED_EMBEDDING_DIM,
1188        cache: fastembed_cache_dir().display().to_string(),
1189        installed: model.installed,
1190        active: active_model_id == FASTEMBED_MODEL_ID,
1191        status: model.status,
1192        current_embeddings: plan.current,
1193        eligible_embeddings: plan
1194            .current
1195            .saturating_add(plan.missing)
1196            .saturating_add(plan.stale)
1197            .saturating_add(plan.model_changed)
1198            .saturating_add(plan.dim_changed)
1199            .saturating_add(plan.failed_retryable)
1200            .saturating_add(plan.failed_waiting)
1201            .saturating_add(plan.blocked),
1202        skipped_embeddings: plan.skipped_total,
1203        stale_embeddings: plan
1204            .stale
1205            .saturating_add(plan.model_changed)
1206            .saturating_add(plan.dim_changed),
1207        missing_embeddings: plan.missing,
1208        failed_embeddings: failed,
1209        failed_retryable_embeddings: plan.failed_retryable,
1210        failed_waiting_embeddings: plan.failed_waiting,
1211        message: model.last_error,
1212        next: fastembed_next_command(&plan),
1213    })
1214}
1215
1216fn fastembed_next_command(plan: &EmbeddingReconcilePlan) -> Option<String> {
1217    if !fastembed_build_feature_enabled() {
1218        return Some("cargo install rag-rat".to_string());
1219    }
1220    if !plan.available {
1221        return Some(format!("rag-rat models install {}", FASTEMBED_MODEL_ID));
1222    }
1223    if plan.missing > 0
1224        || plan.stale > 0
1225        || plan.model_changed > 0
1226        || plan.dim_changed > 0
1227        || plan.failed_retryable > 0
1228    {
1229        return Some("rag-rat reconcile --limit 500".to_string());
1230    }
1231    if plan.failed_waiting > 0 {
1232        return Some("rag-rat reconcile --plan".to_string());
1233    }
1234    None
1235}
1236
/// Reports whether this binary was compiled with the `fastembed` Cargo feature.
fn fastembed_build_feature_enabled() -> bool {
    const ENABLED: bool = cfg!(feature = "fastembed");
    ENABLED
}
1240
1241fn capability_status(
1242    conn: &Connection,
1243    capability: &str,
1244    model_id: &str,
1245    total_chunks: u64,
1246) -> anyhow::Result<CapabilityStatus> {
1247    let model = model(conn, model_id)?;
1248    let current = current_artifact_count(conn, capability, model_id)?;
1249    let stale = stale_artifact_count(conn, capability, model_id)?;
1250    let failed = status_artifact_count(conn, capability, model_id, ArtifactStatus::Failed)?;
1251    let blocked = status_artifact_count(conn, capability, model_id, ArtifactStatus::Blocked)?;
1252    let state = if model.disabled {
1253        "Disabled"
1254    } else if total_chunks == 0 {
1255        "IndexEmpty"
1256    } else if !model.installed {
1257        "MissingModel"
1258    } else if failed > 0 {
1259        "Failed"
1260    } else {
1261        "Ready"
1262    };
1263    Ok(CapabilityStatus {
1264        capability: capability.to_string(),
1265        model_id: model_id.to_string(),
1266        state: state.to_string(),
1267        installed: model.installed,
1268        disabled: model.disabled,
1269        current_artifacts: current,
1270        stale_artifacts: stale,
1271        failed_artifacts: failed,
1272        blocked_artifacts: blocked,
1273        message: model.last_error,
1274    })
1275}
1276
1277fn model(conn: &Connection, model_id: &str) -> anyhow::Result<ModelInfo> {
1278    Ok(conn.query_row(
1279        "
1280        SELECT model_id, capability, embedding_dim, runtime, installed, disabled, status, installed_at_ms, last_error
1281        FROM ai_models WHERE model_id = ?1
1282        ",
1283        [model_id],
1284        model_row,
1285    )?)
1286}
1287
1288fn model_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<ModelInfo> {
1289    Ok(ModelInfo {
1290        model_id: row.get(0)?,
1291        capability: row.get(1)?,
1292        embedding_dim: row.get(2)?,
1293        runtime: row.get(3)?,
1294        installed: row.get::<_, bool>(4)?,
1295        disabled: row.get::<_, bool>(5)?,
1296        status: row.get(6)?,
1297        installed_at_ms: row.get(7)?,
1298        last_error: row.get(8)?,
1299    })
1300}
1301
1302#[derive(Debug, Clone)]
1303struct CurrentChunk {
1304    id: i64,
1305    path: String,
1306    language: String,
1307    file_kind: String,
1308    chunk_kind: String,
1309    symbol_path: Option<String>,
1310    text: String,
1311    text_hash: String,
1312    embedding_status: Option<String>,
1313    source_text_hash: Option<String>,
1314    model_version: Option<String>,
1315    embedding_dim: Option<i64>,
1316    input_hash: Option<String>,
1317    embedding_text_version: Option<String>,
1318    next_retry_after_ms: Option<i64>,
1319    reason: ReconcileReason,
1320}
1321
1322#[derive(Debug)]
1323struct PreparedEmbeddingJob {
1324    id: i64,
1325    text_hash: String,
1326    input_text: String,
1327    input_hash: String,
1328    input_chars: usize,
1329    input_truncated: bool,
1330    policy: String,
1331    priority: i64,
1332    reason: ReconcileReason,
1333}
1334
1335struct SelectedBatch {
1336    jobs: Vec<PreparedEmbeddingJob>,
1337}
1338
1339fn current_chunks(conn: &Connection, limit: Option<u32>) -> anyhow::Result<Vec<CurrentChunk>> {
1340    embedding_job_candidates(conn, "", "", 0, limit, false)
1341}
1342
1343fn current_chunk_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<CurrentChunk> {
1344    Ok(CurrentChunk {
1345        id: row.get(0)?,
1346        path: row.get(1)?,
1347        language: row.get(2)?,
1348        file_kind: row.get(3)?,
1349        chunk_kind: row.get(4)?,
1350        symbol_path: row.get(5)?,
1351        text: row.get(6)?,
1352        text_hash: row.get(7)?,
1353        embedding_status: row.get(8)?,
1354        source_text_hash: row.get(9)?,
1355        model_version: row.get(10)?,
1356        embedding_dim: row.get(11)?,
1357        input_hash: row.get(12)?,
1358        embedding_text_version: row.get(13)?,
1359        next_retry_after_ms: row.get(14)?,
1360        reason: ReconcileReason::Forced,
1361    })
1362}
1363
1364fn embedding_job_candidates(
1365    conn: &Connection,
1366    model_id: &str,
1367    model_version: &str,
1368    dim: usize,
1369    limit: Option<u32>,
1370    changed_first: bool,
1371) -> anyhow::Result<Vec<CurrentChunk>> {
1372    let changed_order = if changed_first {
1373        "chunks.source_revision DESC,"
1374    } else {
1375        "chunks.embedding_priority ASC,"
1376    };
1377    let sql = format!(
1378        "
1379        SELECT chunks.id,
1380               files.path,
1381               files.language,
1382               files.kind,
1383               chunks.chunk_kind,
1384               chunks.symbol_path,
1385               chunks.text,
1386               chunks.text_hash,
1387               chunk_embeddings.status,
1388               chunk_embeddings.source_text_hash,
1389               chunk_embeddings.model_version,
1390               chunk_embeddings.embedding_dim,
1391               chunk_embeddings.input_hash,
1392               chunk_embeddings.embedding_text_version,
1393               chunk_embeddings.next_retry_after_ms
1394        FROM chunks
1395        JOIN files ON files.id = chunks.file_id
1396        LEFT JOIN chunk_embeddings
1397          ON chunk_embeddings.chunk_id = chunks.id
1398         AND chunk_embeddings.model_id = ?1
1399        WHERE chunks.text IS NOT NULL
1400        ORDER BY
1401          CASE
1402            WHEN chunk_embeddings.chunk_id IS NULL THEN 0
1403            WHEN chunk_embeddings.source_text_hash != chunks.text_hash THEN 1
1404            WHEN chunk_embeddings.status = 'Failed' THEN 2
1405            ELSE 3
1406          END,
1407          {changed_order}
1408          chunks.id
1409        LIMIT ?5
1410    "
1411    );
1412    let mut stmt = conn.prepare(&sql)?;
1413    let rows = stmt.query_map(
1414        params![
1415            model_id,
1416            model_version,
1417            i64::try_from(dim).unwrap_or(i64::MAX),
1418            now_ms(),
1419            limit.map(i64::from).unwrap_or(i64::MAX)
1420        ],
1421        current_chunk_row,
1422    )?;
1423    let mut chunks = collect_rows(rows)?;
1424    if !model_id.is_empty() {
1425        for chunk in &mut chunks {
1426            chunk.reason = ReconcileReason::Missing;
1427        }
1428    }
1429    Ok(chunks)
1430}
1431
1432/// Ordered candidate chunk ids for one reconcile run, fetched ONCE (ids only, no text), need-first
1433/// exactly like `embedding_job_candidates`. The loop walks this list in batches and loads text per
1434/// batch via [`current_chunks_by_ids`], so each chunk's text is read at most once per run. The old
1435/// path re-queried *every* candidate's text on *every* batch — O(n²) SQLite work that dominated
1436/// reconcile on large repos (it looked like model time but was query/row-materialization CPU).
1437fn embedding_candidate_ids(
1438    conn: &Connection,
1439    model_id: &str,
1440    changed_first: bool,
1441) -> anyhow::Result<Vec<i64>> {
1442    let changed_order = if changed_first {
1443        "chunks.source_revision DESC,"
1444    } else {
1445        "chunks.embedding_priority ASC,"
1446    };
1447    let sql = format!(
1448        "
1449        SELECT chunks.id
1450        FROM chunks
1451        JOIN files ON files.id = chunks.file_id
1452        LEFT JOIN chunk_embeddings
1453          ON chunk_embeddings.chunk_id = chunks.id
1454         AND chunk_embeddings.model_id = ?1
1455        WHERE chunks.text IS NOT NULL
1456        ORDER BY
1457          CASE
1458            WHEN chunk_embeddings.chunk_id IS NULL THEN 0
1459            WHEN chunk_embeddings.source_text_hash != chunks.text_hash THEN 1
1460            WHEN chunk_embeddings.status = 'Failed' THEN 2
1461            ELSE 3
1462          END,
1463          {changed_order}
1464          chunks.id
1465        "
1466    );
1467    let mut stmt = conn.prepare(&sql)?;
1468    let ids = stmt
1469        .query_map(params![model_id], |row| row.get::<_, i64>(0))?
1470        .collect::<Result<Vec<_>, _>>()?;
1471    Ok(ids)
1472}
1473
1474/// Load full chunk rows (with text + embedding metadata) for a specific set of ids, in the given
1475/// order. Used per batch so only the chunks about to be considered are materialized.
1476fn current_chunks_by_ids(
1477    conn: &Connection,
1478    model_id: &str,
1479    ids: &[i64],
1480) -> anyhow::Result<Vec<CurrentChunk>> {
1481    if ids.is_empty() {
1482        return Ok(Vec::new());
1483    }
1484    let placeholders = vec!["?"; ids.len()].join(",");
1485    let sql = format!(
1486        "
1487        SELECT chunks.id, files.path, files.language, files.kind, chunks.chunk_kind,
1488               chunks.symbol_path, chunks.text, chunks.text_hash,
1489               chunk_embeddings.status, chunk_embeddings.source_text_hash,
1490               chunk_embeddings.model_version, chunk_embeddings.embedding_dim,
1491               chunk_embeddings.input_hash, chunk_embeddings.embedding_text_version,
1492               chunk_embeddings.next_retry_after_ms
1493        FROM chunks
1494        JOIN files ON files.id = chunks.file_id
1495        LEFT JOIN chunk_embeddings
1496          ON chunk_embeddings.chunk_id = chunks.id
1497         AND chunk_embeddings.model_id = ?1
1498        WHERE chunks.id IN ({placeholders})
1499        "
1500    );
1501    let mut bind: Vec<Value> = Vec::with_capacity(ids.len() + 1);
1502    bind.push(Value::Text(model_id.to_string()));
1503    bind.extend(ids.iter().map(|id| Value::Integer(*id)));
1504    let mut stmt = conn.prepare(&sql)?;
1505    let rows = stmt.query_map(params_from_iter(bind), current_chunk_row)?;
1506    let mut by_id: std::collections::HashMap<i64, CurrentChunk> =
1507        collect_rows(rows)?.into_iter().map(|chunk| (chunk.id, chunk)).collect();
1508    // Mirror embedding_job_candidates: a non-empty model_id means this is a real (non-force) run,
1509    // so the default reason is Missing (the precise reason is recomputed in select_reconcile_batch).
1510    if !model_id.is_empty() {
1511        for chunk in by_id.values_mut() {
1512            chunk.reason = ReconcileReason::Missing;
1513        }
1514    }
1515    Ok(ids.iter().filter_map(|id| by_id.remove(id)).collect())
1516}
1517
1518impl CurrentChunk {
1519    fn reason(
1520        &self,
1521        model_version: &str,
1522        dim: usize,
1523        now_ms: i64,
1524        _max_embedding_chars: usize,
1525    ) -> ReconcileReason {
1526        if self.reason == ReconcileReason::Forced {
1527            return ReconcileReason::Forced;
1528        }
1529        if self.embedding_status.is_none() {
1530            return ReconcileReason::Missing;
1531        }
1532        if self.source_text_hash.as_deref() != Some(self.text_hash.as_str()) {
1533            return ReconcileReason::SourceChanged;
1534        }
1535        if self.input_hash.as_deref().is_none_or(str::is_empty) {
1536            return ReconcileReason::InputChanged;
1537        }
1538        if self.model_version.as_deref() != Some(model_version)
1539            || self.embedding_text_version.as_deref() != Some(EMBEDDING_TEXT_VERSION)
1540        {
1541            return ReconcileReason::ModelChanged;
1542        }
1543        if self.embedding_dim != Some(i64::try_from(dim).unwrap_or(i64::MAX)) {
1544            return ReconcileReason::DimChanged;
1545        }
1546        if self.embedding_status.as_deref() == Some("Failed")
1547            && self.next_retry_after_ms.unwrap_or(0) <= now_ms
1548        {
1549            return ReconcileReason::RetryAfterFailure;
1550        }
1551        ReconcileReason::Missing
1552    }
1553}
1554
/// Embedding-model identity shared by every batch of a single reconcile run.
///
/// These values do not change between batches, so they are bundled into one context struct
/// instead of being repeated as positional arguments on each call.
struct EmbeddingScan<'a> {
    /// Id of the embedding model being reconciled.
    model_id: &'a str,
    /// Version string of that model.
    model_version: &'a str,
    /// Expected embedding dimension.
    dim: usize,
    /// Character budget passed to the policy and input-building helpers.
    max_embedding_chars: usize,
}
1564
1565fn estimated_reconcile_jobs(
1566    conn: &Connection,
1567    scan: &EmbeddingScan<'_>,
1568    options: &ReconcileOptions,
1569) -> anyhow::Result<u64> {
1570    let candidates = if options.force {
1571        current_chunks(conn, options.limit)?
1572    } else {
1573        embedding_job_candidates(
1574            conn,
1575            scan.model_id,
1576            scan.model_version,
1577            scan.dim,
1578            options.limit,
1579            options.changed_first,
1580        )?
1581    };
1582    let count = candidates
1583        .iter()
1584        .filter(|candidate| {
1585            policy_for_job(candidate, scan.max_embedding_chars).eligible
1586                && (options.force
1587                    || needs_embedding(
1588                        candidate,
1589                        scan.model_id,
1590                        scan.model_version,
1591                        scan.dim,
1592                        scan.max_embedding_chars,
1593                    ))
1594        })
1595        .count();
1596    Ok(u64::try_from(count).unwrap_or(u64::MAX))
1597}
1598
1599/// Build the embedding jobs for one batch of candidate ids. Text is loaded only for `ids` (the
1600/// current batch), then the per-chunk policy and freshness filters are applied — the single source
1601/// of truth for "does this chunk need embedding" stays in Rust ([`needs_embedding`]).
1602fn select_reconcile_batch(
1603    conn: &Connection,
1604    scan: &EmbeddingScan<'_>,
1605    ids: &[i64],
1606    options: &ReconcileOptions,
1607) -> anyhow::Result<SelectedBatch> {
1608    // Under --force the candidate ordering does not reflect embedding state, so the empty model_id
1609    // (matching no chunk_embeddings) keeps every chunk's reason as Forced.
1610    let model_id = if options.force { "" } else { scan.model_id };
1611    let candidates = current_chunks_by_ids(conn, model_id, ids)?;
1612    let mut jobs = Vec::new();
1613    for candidate in candidates {
1614        let policy = policy_for_job(&candidate, scan.max_embedding_chars);
1615        if !policy.eligible {
1616            continue;
1617        }
1618        if !options.force
1619            && !needs_embedding(
1620                &candidate,
1621                scan.model_id,
1622                scan.model_version,
1623                scan.dim,
1624                scan.max_embedding_chars,
1625            )
1626        {
1627            continue;
1628        }
1629        let input = build_embedding_input(&candidate, scan.max_embedding_chars);
1630        let reason = if options.force {
1631            ReconcileReason::Forced
1632        } else {
1633            candidate.reason(scan.model_version, scan.dim, now_ms(), scan.max_embedding_chars)
1634        };
1635        jobs.push(PreparedEmbeddingJob {
1636            id: candidate.id,
1637            text_hash: candidate.text_hash,
1638            input_hash: embedding_input_hash(scan.model_id, scan.model_version, &input.text),
1639            input_chars: input.chars,
1640            input_truncated: input.truncated,
1641            input_text: input.text,
1642            policy: policy.policy,
1643            priority: policy.priority,
1644            reason,
1645        });
1646    }
1647    Ok(SelectedBatch { jobs })
1648}
1649
1650fn needs_embedding(
1651    chunk: &CurrentChunk,
1652    model_id: &str,
1653    model_version: &str,
1654    dim: usize,
1655    max_embedding_chars: usize,
1656) -> bool {
1657    let input = build_embedding_input(chunk, max_embedding_chars);
1658    let expected_input_hash = embedding_input_hash(model_id, model_version, &input.text);
1659    chunk.embedding_status.as_deref() != Some("Current")
1660        || chunk.source_text_hash.as_deref() != Some(chunk.text_hash.as_str())
1661        || chunk.model_version.as_deref() != Some(model_version)
1662        || chunk.embedding_dim != Some(i64::try_from(dim).unwrap_or(i64::MAX))
1663        || chunk.input_hash.as_deref() != Some(expected_input_hash.as_str())
1664        || chunk.embedding_text_version.as_deref() != Some(EMBEDDING_TEXT_VERSION)
1665}
1666
1667pub fn embedding_policy_for_chunk(
1668    path: &Path,
1669    language: &str,
1670    file_kind: &str,
1671    chunk_kind: &str,
1672    symbol_path: Option<&str>,
1673    text: &str,
1674    max_embedding_chars: usize,
1675) -> EmbeddingPolicyDecision {
1676    let path_text = path.to_string_lossy();
1677    let trimmed = text.trim();
1678    if trimmed.chars().count() > max_embedding_chars.saturating_mul(4)
1679        && (file_kind == "generated" || chunk_kind == "generated" || symbol_path.is_none())
1680    {
1681        return policy("SkipTooLarge", 9, false);
1682    }
1683    if file_kind == "generated" || chunk_kind == "generated" || looks_generated_path(&path_text) {
1684        return policy("SkipGenerated", 9, false);
1685    }
1686    if is_test_fixture_path(&path_text) {
1687        return policy("SkipTestFixture", 9, false);
1688    }
1689    let Ok(language_kind) = language.parse::<Language>() else {
1690        return policy("SkipLanguageUnsupported", 9, false);
1691    };
1692    if !language_kind.supports_embeddings() {
1693        return policy("SkipLanguageUnsupported", 9, false);
1694    }
1695    if trimmed.chars().count() < MIN_EMBEDDING_CHARS {
1696        return policy("SkipTooSmall", 9, false);
1697    }
1698    if is_low_signal_chunk(language, chunk_kind, symbol_path, trimmed) {
1699        return policy("SkipLowSignal", 9, false);
1700    }
1701    policy("Embed", embedding_priority(&path_text, language, chunk_kind, symbol_path), true)
1702}
1703
1704fn policy(name: &str, priority: i64, eligible: bool) -> EmbeddingPolicyDecision {
1705    EmbeddingPolicyDecision { policy: name.to_string(), priority, eligible }
1706}
1707
1708fn policy_for_job(chunk: &CurrentChunk, max_embedding_chars: usize) -> EmbeddingPolicyDecision {
1709    embedding_policy_for_chunk(
1710        Path::new(&chunk.path),
1711        &chunk.language,
1712        &chunk.file_kind,
1713        &chunk.chunk_kind,
1714        chunk.symbol_path.as_deref(),
1715        &chunk.text,
1716        max_embedding_chars,
1717    )
1718}
1719
1720fn embedding_priority(
1721    path: &str,
1722    language: &str,
1723    chunk_kind: &str,
1724    symbol_path: Option<&str>,
1725) -> i64 {
1726    if symbol_path.is_some()
1727        && matches!(chunk_kind, "code")
1728        && !is_test_path(path)
1729        && language != "markdown"
1730    {
1731        return 0;
1732    }
1733    if language == "markdown" {
1734        return 1;
1735    }
1736    if is_test_path(path) {
1737        return 2;
1738    }
1739    1
1740}
1741
/// Maps a numeric embedding priority to its label; unknown priorities map to `"other"`.
fn priority_label(priority: i64) -> &'static str {
    const LABELS: [(i64, &str); 5] = [
        (0, "source_symbols"),
        (1, "source_or_docs"),
        (2, "tests"),
        (3, "low_signal"),
        (9, "skipped"),
    ];
    LABELS
        .iter()
        .find(|(value, _)| *value == priority)
        .map_or("other", |(_, label)| *label)
}
1752
/// Returns `true` when `path` points at generated output or a dependency lock file.
///
/// A path counts as generated when any directory component (every `/`-separated component
/// except the final one) is `generated` or `target`, or when the path ends with one of the
/// known lock-file names.
///
/// Directory names are matched per component rather than with a `"/name/"` substring so that
/// repo-relative paths whose first component is the directory (e.g. `target/debug/out.rs`)
/// are recognised as well.
fn looks_generated_path(path: &str) -> bool {
    const GENERATED_DIRS: [&str; 2] = ["generated", "target"];
    const LOCK_FILES: [&str; 3] = ["Cargo.lock", "package-lock.json", "pnpm-lock.yaml"];

    let mut dirs = path.split('/');
    // The last component is the file name (or empty for a trailing slash); it is not a directory.
    let _file_name = dirs.next_back();

    dirs.any(|dir| GENERATED_DIRS.contains(&dir))
        || LOCK_FILES.iter().any(|name| path.ends_with(*name))
}
1761
/// Returns `true` when `path` looks like test code.
///
/// A path matches when any directory component (every `/`-separated component except the
/// final one) is `tests` or `test`, when it contains `__tests__` anywhere, or when it ends
/// with one of the recognised test-file suffixes.
///
/// Directory names are matched per component rather than with a `"/name/"` substring so that
/// repo-relative paths starting with the directory (e.g. `tests/cli.rs`) are recognised too.
fn is_test_path(path: &str) -> bool {
    const TEST_DIRS: [&str; 2] = ["tests", "test"];
    const TEST_SUFFIXES: [&str; 5] =
        ["_test.rs", ".test.ts", ".spec.ts", ".test.tsx", ".spec.tsx"];

    let mut dirs = path.split('/');
    // The last component is the file name (or empty for a trailing slash); it is not a directory.
    let _file_name = dirs.next_back();

    dirs.any(|dir| TEST_DIRS.contains(&dir))
        || path.contains("__tests__")
        || TEST_SUFFIXES.iter().any(|suffix| path.ends_with(*suffix))
}
1772
/// Returns `true` when `path` looks like test fixture or snapshot data.
///
/// A path matches when any directory component (every `/`-separated component except the
/// final one) is `fixtures`, `__fixtures__`, `testdata` or `snapshots`, or when the path ends
/// with `.snap`.
///
/// Directory names are matched per component rather than with a `"/name/"` substring so that
/// repo-relative paths starting with the directory (e.g. `fixtures/a.json`) are recognised too.
fn is_test_fixture_path(path: &str) -> bool {
    const FIXTURE_DIRS: [&str; 4] = ["fixtures", "__fixtures__", "testdata", "snapshots"];

    let mut dirs = path.split('/');
    // The last component is the file name (or empty for a trailing slash); it is not a directory.
    let _file_name = dirs.next_back();

    dirs.any(|dir| FIXTURE_DIRS.contains(&dir)) || path.ends_with(".snap")
}
1780
/// Heuristically detects chunks that carry too little content to be worth embedding.
///
/// Markdown is never low-signal. For other languages, blank lines and lines starting with
/// `//` or `/*` (after trimming) are ignored, and the chunk is low-signal when:
/// * no lines remain, or
/// * it is a `"code"` chunk without a symbol path and at most three lines remain, or
/// * every remaining line is an import/export/module declaration or a lone brace.
fn is_low_signal_chunk(
    language: &str,
    chunk_kind: &str,
    symbol_path: Option<&str>,
    text: &str,
) -> bool {
    const BOILERPLATE_PREFIXES: [&str; 6] =
        ["use ", "pub use ", "import ", "export ", "mod ", "pub mod "];

    if language == "markdown" {
        return false;
    }

    let is_comment = |line: &str| line.starts_with("//") || line.starts_with("/*");
    let is_boilerplate = |line: &str| {
        line == "}"
            || line == "{"
            || BOILERPLATE_PREFIXES.iter().any(|prefix| line.starts_with(*prefix))
    };

    // Single pass: count the significant lines and track whether all of them are boilerplate.
    let mut significant = 0_usize;
    let mut only_boilerplate = true;
    for line in text.lines().map(str::trim) {
        if line.is_empty() || is_comment(line) {
            continue;
        }
        significant += 1;
        only_boilerplate = only_boilerplate && is_boilerplate(line);
    }

    if significant == 0 {
        return true;
    }
    let anonymous_code = symbol_path.is_none() && chunk_kind == "code";
    if anonymous_code && significant <= 3 {
        return true;
    }
    only_boilerplate
}
1812
/// The text handed to an embedder for one chunk, as produced by `build_embedding_input`.
struct EmbeddingInput {
    /// Metadata header followed by the (possibly truncated) chunk body.
    text: String,
    /// Number of `char`s in `text`.
    chars: usize,
    /// Whether the chunk body had to be cut to fit the character budget.
    truncated: bool,
}
1818
1819fn build_embedding_input(chunk: &CurrentChunk, max_chars: usize) -> EmbeddingInput {
1820    let mut input = String::new();
1821    input.push_str("path: ");
1822    input.push_str(&chunk.path);
1823    input.push('\n');
1824    input.push_str("language: ");
1825    input.push_str(&chunk.language);
1826    input.push('\n');
1827    input.push_str("kind: ");
1828    input.push_str(&chunk.chunk_kind);
1829    input.push('\n');
1830    if let Some(symbol_path) = &chunk.symbol_path {
1831        input.push_str("symbol: ");
1832        input.push_str(symbol_path);
1833        input.push('\n');
1834    }
1835    input.push_str("body:\n");
1836    let prefix_chars = input.chars().count();
1837    let budget = max_chars.saturating_sub(prefix_chars).max(MIN_EMBEDDING_CHARS);
1838    let (body, truncated) = truncate_chars(&chunk.text, budget);
1839    input.push_str(&body);
1840    let chars = input.chars().count();
1841    EmbeddingInput { text: input, chars, truncated }
1842}
1843
/// Returns at most the first `max_chars` characters of `text`, plus a flag that is `true`
/// when something was cut off.
fn truncate_chars(text: &str, max_chars: usize) -> (String, bool) {
    // The byte offset of the char at index `max_chars` only exists if the text is too long.
    match text.char_indices().nth(max_chars) {
        Some((cut, _)) => (text[..cut].to_owned(), true),
        None => (text.to_owned(), false),
    }
}
1850
1851fn embedding_input_hash(model_id: &str, model_version: &str, input: &str) -> String {
1852    let mut hasher = Sha256::new();
1853    hasher.update(model_id.as_bytes());
1854    hasher.update(b"\0");
1855    hasher.update(model_version.as_bytes());
1856    hasher.update(b"\0");
1857    hasher.update(EMBEDDING_TEXT_VERSION.as_bytes());
1858    hasher.update(b"\0");
1859    hasher.update(input.as_bytes());
1860    let hash = hasher.finalize();
1861    let mut out = String::with_capacity(hash.len() * 2);
1862    for byte in hash {
1863        use std::fmt::Write as _;
1864        let _ = write!(out, "{byte:02x}");
1865    }
1866    out
1867}
1868
1869fn write_current_embedding_batch(
1870    conn: &Connection,
1871    embedder: &dyn Embedder,
1872    model_version: &str,
1873    batch: &[PreparedEmbeddingJob],
1874    vectors: &[Vec<f32>],
1875) -> anyhow::Result<()> {
1876    conn.execute_batch("BEGIN IMMEDIATE")?;
1877    let write_result = (|| {
1878        for (chunk, vector) in batch.iter().zip(vectors) {
1879            store_embedding(conn, embedder, model_version, chunk, vector)?;
1880        }
1881        Ok(())
1882    })();
1883    finish_batch_transaction(conn, write_result)
1884}
1885
1886fn write_failed_embedding_batch(
1887    conn: &Connection,
1888    embedder: &dyn Embedder,
1889    model_version: &str,
1890    batch: &[PreparedEmbeddingJob],
1891    error: &str,
1892) -> anyhow::Result<()> {
1893    conn.execute_batch("BEGIN IMMEDIATE")?;
1894    let write_result = (|| {
1895        for chunk in batch {
1896            store_failed_embedding(conn, embedder, model_version, chunk, error)?;
1897        }
1898        Ok(())
1899    })();
1900    finish_batch_transaction(conn, write_result)
1901}
1902
1903fn finish_batch_transaction(conn: &Connection, result: anyhow::Result<()>) -> anyhow::Result<()> {
1904    match result {
1905        Ok(()) => {
1906            conn.execute_batch("COMMIT")?;
1907            Ok(())
1908        },
1909        Err(err) => {
1910            let _ = conn.execute_batch("ROLLBACK");
1911            Err(err)
1912        },
1913    }
1914}
1915
1916fn store_embedding(
1917    conn: &Connection,
1918    embedder: &dyn Embedder,
1919    model_version: &str,
1920    chunk: &PreparedEmbeddingJob,
1921    vector: &[f32],
1922) -> anyhow::Result<()> {
1923    if vector.len() != embedder.dim() {
1924        anyhow::bail!(
1925            "embedding dimension mismatch for {}: got {}, expected {}",
1926            embedder.model_id(),
1927            vector.len(),
1928            embedder.dim()
1929        );
1930    }
1931    conn.execute(
1932        "
1933        INSERT INTO chunk_embeddings(
1934            chunk_id, model_id, model_version, source_text_hash, input_hash,
1935            embedding_text_version, embedding_policy, embedding_priority, input_chars,
1936            input_truncated, embedding_dim, vector_blob,
1937            status, attempt_count, last_error_class, next_retry_after_ms, computed_at_ms,
1938            created_at_ms, last_error
1939        )
1940        VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, 'Current', 1, NULL, NULL, ?13, ?13, NULL)
1941        ON CONFLICT(chunk_id, model_id) DO UPDATE SET
1942            model_version = excluded.model_version,
1943            source_text_hash = excluded.source_text_hash,
1944            input_hash = excluded.input_hash,
1945            embedding_text_version = excluded.embedding_text_version,
1946            embedding_policy = excluded.embedding_policy,
1947            embedding_priority = excluded.embedding_priority,
1948            input_chars = excluded.input_chars,
1949            input_truncated = excluded.input_truncated,
1950            embedding_dim = excluded.embedding_dim,
1951            vector_blob = excluded.vector_blob,
1952            status = excluded.status,
1953            attempt_count = chunk_embeddings.attempt_count + 1,
1954            last_error_class = NULL,
1955            next_retry_after_ms = NULL,
1956            computed_at_ms = excluded.computed_at_ms,
1957            created_at_ms = excluded.created_at_ms,
1958            last_error = NULL
1959        ",
1960        params![
1961            chunk.id,
1962            embedder.model_id(),
1963            model_version,
1964            chunk.text_hash,
1965            chunk.input_hash,
1966            EMBEDDING_TEXT_VERSION,
1967            chunk.policy,
1968            chunk.priority,
1969            i64::try_from(chunk.input_chars).unwrap_or(i64::MAX),
1970            chunk.input_truncated,
1971            i64::try_from(embedder.dim()).unwrap_or(i64::MAX),
1972            encode_vector(vector),
1973            now_ms()
1974        ],
1975    )?;
1976    Ok(())
1977}
1978
1979fn store_failed_embedding(
1980    conn: &Connection,
1981    embedder: &dyn Embedder,
1982    model_version: &str,
1983    chunk: &PreparedEmbeddingJob,
1984    error: &str,
1985) -> anyhow::Result<()> {
1986    let retry_at = now_ms().saturating_add(60_000);
1987    conn.execute(
1988        "
1989        INSERT INTO chunk_embeddings(
1990            chunk_id, model_id, model_version, source_text_hash, input_hash,
1991            embedding_text_version, embedding_policy, embedding_priority, input_chars,
1992            input_truncated, embedding_dim, vector_blob,
1993            status, attempt_count, last_error_class, next_retry_after_ms, computed_at_ms,
1994            created_at_ms, last_error
1995        )
1996        VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, x'', 'Failed', 1, ?12, ?13, NULL, ?14, ?15)
1997        ON CONFLICT(chunk_id, model_id) DO UPDATE SET
1998            model_version = excluded.model_version,
1999            source_text_hash = excluded.source_text_hash,
2000            input_hash = excluded.input_hash,
2001            embedding_text_version = excluded.embedding_text_version,
2002            embedding_policy = excluded.embedding_policy,
2003            embedding_priority = excluded.embedding_priority,
2004            input_chars = excluded.input_chars,
2005            input_truncated = excluded.input_truncated,
2006            embedding_dim = excluded.embedding_dim,
2007            vector_blob = x'',
2008            status = 'Failed',
2009            attempt_count = chunk_embeddings.attempt_count + 1,
2010            last_error_class = excluded.last_error_class,
2011            next_retry_after_ms = excluded.next_retry_after_ms,
2012            computed_at_ms = NULL,
2013            created_at_ms = excluded.created_at_ms,
2014            last_error = excluded.last_error
2015        ",
2016        params![
2017            chunk.id,
2018            embedder.model_id(),
2019            model_version,
2020            chunk.text_hash,
2021            chunk.input_hash,
2022            EMBEDDING_TEXT_VERSION,
2023            chunk.policy,
2024            chunk.priority,
2025            i64::try_from(chunk.input_chars).unwrap_or(i64::MAX),
2026            chunk.input_truncated,
2027            i64::try_from(embedder.dim()).unwrap_or(i64::MAX),
2028            "Transient",
2029            retry_at,
2030            now_ms(),
2031            error
2032        ],
2033    )?;
2034    Ok(())
2035}
2036
/// An embedded search query together with the model that produced it.
#[derive(Debug, Clone, PartialEq)]
pub struct QueryEmbedding {
    /// Identifier of the embedding model that produced `vector`.
    pub model_id: String,
    /// Dimension reported by the embedder; `embed_query_with` checks `vector.len()` against it.
    pub dim: usize,
    /// The query vector.
    pub vector: Vec<f32>,
}
2042
2043pub fn embed_query(conn: &Connection, query: &str) -> anyhow::Result<Option<QueryEmbedding>> {
2044    ensure_model_manifest(conn)?;
2045    let Ok(embedder) = active_embedder(conn, None) else {
2046        return Ok(None);
2047    };
2048    embed_query_with(&*embedder, query).map(Some)
2049}
2050
2051pub fn hash_query_embedding(query: &str) -> anyhow::Result<QueryEmbedding> {
2052    embed_query_with(&HashEmbedder, query)
2053}
2054
2055fn embed_query_with(embedder: &dyn Embedder, query: &str) -> anyhow::Result<QueryEmbedding> {
2056    let texts = vec![query.to_string()];
2057    let mut vectors = embedder.embed_batch(&texts)?;
2058    let Some(vector) = vectors.pop() else {
2059        anyhow::bail!("embedder {} returned no query vector", embedder.model_id());
2060    };
2061    if vector.len() != embedder.dim() {
2062        anyhow::bail!(
2063            "embedder {} returned query dimension {}, expected {}",
2064            embedder.model_id(),
2065            vector.len(),
2066            embedder.dim()
2067        );
2068    }
2069    Ok(QueryEmbedding { model_id: embedder.model_id().to_string(), dim: embedder.dim(), vector })
2070}
2071
2072pub fn active_embedding_model_id(conn: &Connection) -> anyhow::Result<String> {
2073    ensure_model_manifest(conn)?;
2074    if let Some(model_id) = meta(conn, ACTIVE_EMBEDDING_MODEL_META)? {
2075        return Ok(model_id);
2076    }
2077    Ok(HASH_MODEL_ID.to_string())
2078}
2079
2080pub fn active_embedding_model_version(conn: &Connection, model_id: &str) -> anyhow::Result<String> {
2081    if let Some(version) = reconcile_meta(conn, ACTIVE_EMBEDDING_MODEL_VERSION_META)? {
2082        return Ok(version);
2083    }
2084    Ok(default_model_version(model_id).to_string())
2085}
2086
2087fn default_model_version(model_id: &str) -> &'static str {
2088    match model_id {
2089        HASH_MODEL_ID => "hash-v1",
2090        FASTEMBED_MODEL_ID => "fastembed-all-minilm-l6-v2-v1",
2091        MODEL2VEC_MODEL_ID => "model2vec-potion-retrieval-32m-v1",
2092        _ => "v1",
2093    }
2094}
2095
2096pub fn current_embedding_count(conn: &Connection, model_id: &str) -> anyhow::Result<u64> {
2097    ensure_model_manifest(conn)?;
2098    let model_version = active_embedding_model_version(conn, model_id)?;
2099    let count: i64 = conn.query_row(
2100        "
2101        SELECT COUNT(*)
2102        FROM chunk_embeddings
2103        JOIN chunks ON chunks.id = chunk_embeddings.chunk_id
2104        JOIN ai_models ON ai_models.model_id = chunk_embeddings.model_id
2105        WHERE chunk_embeddings.model_id = ?1
2106          AND ai_models.installed = 1
2107          AND ai_models.disabled = 0
2108          AND ai_models.status = 'Ready'
2109          AND chunk_embeddings.embedding_dim = ai_models.embedding_dim
2110          AND chunk_embeddings.status = 'Current'
2111          AND chunk_embeddings.source_text_hash = chunks.text_hash
2112          AND chunk_embeddings.model_version = ?2
2113          AND chunk_embeddings.embedding_text_version = ?3
2114          AND chunk_embeddings.input_hash != ''
2115        ",
2116        params![model_id, model_version, EMBEDDING_TEXT_VERSION],
2117        |row| row.get(0),
2118    )?;
2119    Ok(u64::try_from(count).unwrap_or(0))
2120}
2121
2122fn active_embedder(
2123    conn: &Connection,
2124    intra_threads: Option<usize>,
2125) -> anyhow::Result<Box<dyn Embedder>> {
2126    let model_id = active_embedding_model_id(conn)?;
2127    let model = model(conn, &model_id)?;
2128    validate_ready_model(&model)?;
2129    match model.model_id.as_str() {
2130        HASH_MODEL_ID => Ok(Box::new(HashEmbedder)),
2131        FASTEMBED_MODEL_ID => fastembed_embedder(intra_threads),
2132        MODEL2VEC_MODEL_ID => model2vec_embedder(),
2133        other => anyhow::bail!("unknown active embedding model `{other}`"),
2134    }
2135}
2136
/// Builds the Model2Vec embedder when the `model2vec` feature is compiled in.
///
/// # Errors
/// With the feature enabled, propagates any error from `Model2VecEmbedder::new`. Without it,
/// always fails with `MODEL2VEC_MISSING_FEATURE_MESSAGE`.
fn model2vec_embedder() -> anyhow::Result<Box<dyn Embedder>> {
    // Exactly one of the two cfg-gated blocks below is compiled and becomes the function body.
    #[cfg(feature = "model2vec")]
    {
        Ok(Box::new(Model2VecEmbedder::new()?))
    }
    #[cfg(not(feature = "model2vec"))]
    {
        anyhow::bail!("{}", MODEL2VEC_MISSING_FEATURE_MESSAGE)
    }
}
2147
2148fn validate_ready_model(model: &ModelInfo) -> anyhow::Result<()> {
2149    if model.disabled {
2150        anyhow::bail!("model {} is disabled", model.model_id);
2151    }
2152    if !model.installed || model.status != "Ready" {
2153        anyhow::bail!("{}", model_not_ready_reason(model));
2154    }
2155    let expected_dim = expected_dim(&model.model_id)
2156        .ok_or_else(|| anyhow::anyhow!("unknown embedding model `{}`", model.model_id))?;
2157    if model.embedding_dim != Some(i64::try_from(expected_dim).unwrap_or(i64::MAX)) {
2158        anyhow::bail!(
2159            "model {} dimension mismatch: manifest has {:?}, expected {}",
2160            model.model_id,
2161            model.embedding_dim,
2162            expected_dim
2163        );
2164    }
2165    Ok(())
2166}
2167
2168fn model_not_ready_reason(model: &ModelInfo) -> String {
2169    if model.disabled {
2170        "Disabled".to_string()
2171    } else if let Some(last_error) = &model.last_error {
2172        last_error.clone()
2173    } else if !model.installed {
2174        "MissingModel".to_string()
2175    } else {
2176        model.status.clone()
2177    }
2178}
2179
2180fn expected_dim(model_id: &str) -> Option<usize> {
2181    match model_id {
2182        HASH_MODEL_ID => Some(HASH_EMBEDDING_DIM),
2183        FASTEMBED_MODEL_ID => Some(FASTEMBED_EMBEDDING_DIM),
2184        MODEL2VEC_MODEL_ID => Some(MODEL2VEC_EMBEDDING_DIM),
2185        _ => None,
2186    }
2187}
2188
/// Builds the FastEmbed embedder when the `fastembed` feature is compiled in.
///
/// `intra_threads` is passed through to `FastEmbedEmbedder::new`.
///
/// # Errors
/// With the feature enabled, propagates any error from `FastEmbedEmbedder::new`. Without it,
/// always fails with `FASTEMBED_MISSING_FEATURE_MESSAGE`.
fn fastembed_embedder(intra_threads: Option<usize>) -> anyhow::Result<Box<dyn Embedder>> {
    // Exactly one of the two cfg-gated blocks below is compiled and becomes the function body.
    #[cfg(feature = "fastembed")]
    {
        Ok(Box::new(FastEmbedEmbedder::new(intra_threads)?))
    }
    #[cfg(not(feature = "fastembed"))]
    {
        // The parameter is otherwise unused in this configuration.
        let _ = intra_threads;
        anyhow::bail!("{}", FASTEMBED_MISSING_FEATURE_MESSAGE)
    }
}
2200
/// Resolves the directory used to cache downloaded embedding models.
///
/// The first source that is set to a non-empty value wins:
/// 1. `$RAG_RAT_MODEL_CACHE`, used verbatim;
/// 2. `$XDG_CACHE_HOME/rag-rat/models`;
/// 3. `$HOME/.cache/rag-rat/models`;
/// 4. the relative path `.rag-rat/models`.
///
/// Empty variables are treated as unset; otherwise `XDG_CACHE_HOME=""` would resolve to the
/// relative path `rag-rat/models` instead of falling back to `$HOME/.cache`. Values are read
/// as OS strings so that paths which are not valid UTF-8 are honoured instead of skipped.
pub fn fastembed_cache_dir() -> PathBuf {
    let dir_from_env = |name: &str| {
        std::env::var_os(name).filter(|value| !value.is_empty()).map(PathBuf::from)
    };

    if let Some(cache) = dir_from_env("RAG_RAT_MODEL_CACHE") {
        return cache;
    }
    if let Some(xdg_cache) = dir_from_env("XDG_CACHE_HOME") {
        return xdg_cache.join("rag-rat").join("models");
    }
    if let Some(home) = dir_from_env("HOME") {
        return home.join(".cache").join("rag-rat").join("models");
    }
    PathBuf::from(".rag-rat").join("models")
}
2213
/// Decodes a blob of little-endian `f32` values into a vector of exactly `dim` elements.
///
/// Returns `None` when the blob is not exactly `dim * 4` bytes long, including the case
/// where `dim * 4` overflows `usize`.
pub fn decode_vector(blob: &[u8], dim: usize) -> Option<Vec<f32>> {
    let expected_len = dim.checked_mul(4)?;
    if blob.len() != expected_len {
        return None;
    }
    let values = blob
        .chunks_exact(4)
        .map(|word| f32::from_le_bytes([word[0], word[1], word[2], word[3]]))
        .collect();
    Some(values)
}
2224
/// Encodes `vector` as a blob of consecutive little-endian `f32` values (four bytes each).
fn encode_vector(vector: &[f32]) -> Vec<u8> {
    let mut blob = Vec::with_capacity(vector.len() * 4);
    blob.extend(vector.iter().flat_map(|value| value.to_le_bytes()));
    blob
}
2232
2233fn hash_embed_text(text: &str, dim: usize) -> Vec<f32> {
2234    let mut vector = vec![0.0_f32; dim];
2235    let tokens = tokens(text);
2236    for token in &tokens {
2237        add_feature(&mut vector, token, 1.0);
2238    }
2239    for pair in tokens.windows(2) {
2240        add_feature(&mut vector, &format!("{}::{}", pair[0], pair[1]), 0.6);
2241    }
2242    normalize(&mut vector);
2243    vector
2244}
2245
2246fn tokens(text: &str) -> Vec<String> {
2247    text.split(|ch: char| !ch.is_alphanumeric() && ch != '_')
2248        .filter(|part| !part.is_empty())
2249        .flat_map(split_identifier)
2250        .filter(|part| part.len() > 1)
2251        .collect()
2252}
2253
/// Splits an identifier into ASCII-lowercased parts.
///
/// `_` and `-` act as separators and are dropped. A new part also starts at an uppercase
/// character that directly follows a lowercase character or an ASCII digit (`fooBar` becomes
/// `foo`, `bar`). Runs of uppercase characters are not split.
fn split_identifier(value: &str) -> Vec<String> {
    /// Moves the pending word, lowercased, into `parts` if it is non-empty.
    fn flush(word: &mut String, parts: &mut Vec<String>) {
        if !word.is_empty() {
            parts.push(word.to_ascii_lowercase());
            word.clear();
        }
    }

    let mut parts = Vec::new();
    let mut word = String::new();
    let mut after_lower = false;

    for ch in value.chars() {
        match ch {
            '_' | '-' => {
                flush(&mut word, &mut parts);
                after_lower = false;
            },
            _ => {
                if after_lower && ch.is_uppercase() {
                    flush(&mut word, &mut parts);
                }
                after_lower = ch.is_lowercase() || ch.is_ascii_digit();
                word.push(ch);
            },
        }
    }
    flush(&mut word, &mut parts);
    parts
}
2279
2280fn add_feature(vector: &mut [f32], feature: &str, weight: f32) {
2281    let digest = Sha256::digest(feature.as_bytes());
2282    let index = u16::from_le_bytes([digest[0], digest[1]]) as usize % vector.len();
2283    let sign = if digest[2] & 1 == 0 { 1.0 } else { -1.0 };
2284    vector[index] += sign * weight;
2285}
2286
/// Scales `vector` in place to unit Euclidean length.
///
/// The vector is left untouched when its norm is not strictly positive (for example an
/// all-zero vector).
fn normalize(vector: &mut [f32]) {
    let squared_sum: f32 = vector.iter().map(|value| value * value).sum();
    let norm = squared_sum.sqrt();
    if !(norm > 0.0) {
        return;
    }
    vector.iter_mut().for_each(|value| *value /= norm);
}
2295
2296fn chunk_count(conn: &Connection) -> anyhow::Result<u64> {
2297    // Join the active `files` view (temp.files: active worktree overlay UNION active commit)
2298    // so status counts the chunks reconcile actually works on, not every indexed commit's
2299    // rows. Without a connection context, `files` falls back to the base table (all commits).
2300    let count = conn.query_row(
2301        "SELECT COUNT(*) FROM chunks JOIN files ON files.id = chunks.file_id",
2302        [],
2303        |row| row.get::<_, i64>(0),
2304    )?;
2305    Ok(u64::try_from(count).unwrap_or(0))
2306}
2307
2308fn current_artifact_count(
2309    conn: &Connection,
2310    capability: &str,
2311    model_id: &str,
2312) -> anyhow::Result<u64> {
2313    let model_version = active_embedding_model_version(conn, model_id)?;
2314    let sql = artifact_table_sql(
2315        capability,
2316        "
2317        SELECT COUNT(*)
2318        FROM {table}
2319        JOIN chunks ON chunks.id = {table}.chunk_id
2320        JOIN files ON files.id = chunks.file_id
2321        JOIN ai_models ON ai_models.model_id = {table}.model_id
2322        WHERE {table}.model_id = ?1
2323          AND {table}.status = 'Current'
2324          AND {table}.source_text_hash = chunks.text_hash
2325          AND {table}.model_version = ?2
2326          AND {table}.embedding_text_version = ?3
2327          AND {table}.input_hash != ''
2328          AND {table}.embedding_dim = ai_models.embedding_dim
2329    ",
2330    );
2331    count_query3(conn, &sql, model_id, &model_version, EMBEDDING_TEXT_VERSION)
2332}
2333
2334fn stale_artifact_count(
2335    conn: &Connection,
2336    capability: &str,
2337    model_id: &str,
2338) -> anyhow::Result<u64> {
2339    let model_version = active_embedding_model_version(conn, model_id)?;
2340    let sql = artifact_table_sql(
2341        capability,
2342        "
2343        SELECT COUNT(*)
2344        FROM {table}
2345        JOIN chunks ON chunks.id = {table}.chunk_id
2346        JOIN files ON files.id = chunks.file_id
2347        JOIN ai_models ON ai_models.model_id = {table}.model_id
2348        WHERE {table}.model_id = ?1
2349          AND (
2350            {table}.source_text_hash != chunks.text_hash
2351            OR {table}.model_version != ?2
2352            OR {table}.embedding_text_version != ?3
2353            OR {table}.input_hash = ''
2354            OR {table}.embedding_dim != ai_models.embedding_dim
2355            OR {table}.status = 'Stale'
2356          )
2357    ",
2358    );
2359    count_query3(conn, &sql, model_id, &model_version, EMBEDDING_TEXT_VERSION)
2360}
2361
2362fn status_artifact_count(
2363    conn: &Connection,
2364    capability: &str,
2365    model_id: &str,
2366    status: ArtifactStatus,
2367) -> anyhow::Result<u64> {
2368    let sql = artifact_table_sql(
2369        capability,
2370        "
2371        SELECT COUNT(*)
2372        FROM {table}
2373        JOIN chunks ON chunks.id = {table}.chunk_id
2374        JOIN files ON files.id = chunks.file_id
2375        WHERE {table}.model_id = ?1 AND {table}.status = ?2
2376    ",
2377    );
2378    let count =
2379        conn.query_row(&sql, params![model_id, status.as_str()], |row| row.get::<_, i64>(0))?;
2380    Ok(u64::try_from(count).unwrap_or(0))
2381}
2382
2383fn count_query3(
2384    conn: &Connection,
2385    sql: &str,
2386    model_id: &str,
2387    left: &str,
2388    right: &str,
2389) -> anyhow::Result<u64> {
2390    let count = conn.query_row(sql, params![model_id, left, right], |row| row.get::<_, i64>(0))?;
2391    Ok(u64::try_from(count).unwrap_or(0))
2392}
2393
/// Substitutes the artifact table name for every `{table}` placeholder in `template`.
///
/// The capability argument is currently ignored: the table is always `chunk_embeddings`.
fn artifact_table_sql(_capability: &str, template: &str) -> String {
    const ARTIFACT_TABLE: &str = "chunk_embeddings";
    template.replace("{table}", ARTIFACT_TABLE)
}
2398
2399fn set_meta(conn: &Connection, key: &str, value: &str) -> anyhow::Result<()> {
2400    conn.execute(
2401        "INSERT INTO index_meta(key, value) VALUES (?1, ?2)
2402         ON CONFLICT(key) DO UPDATE SET value = excluded.value",
2403        params![key, value],
2404    )?;
2405    Ok(())
2406}
2407
2408fn meta(conn: &Connection, key: &str) -> anyhow::Result<Option<String>> {
2409    Ok(conn
2410        .query_row("SELECT value FROM index_meta WHERE key = ?1", [key], |row| row.get(0))
2411        .optional()?)
2412}
2413
2414fn set_reconcile_meta(conn: &Connection, key: &str, value: &str) -> anyhow::Result<()> {
2415    conn.execute(
2416        "INSERT INTO reconcile_meta(key, value) VALUES (?1, ?2)
2417         ON CONFLICT(key) DO UPDATE SET value = excluded.value",
2418        params![key, value],
2419    )?;
2420    Ok(())
2421}
2422
2423fn reconcile_meta(conn: &Connection, key: &str) -> anyhow::Result<Option<String>> {
2424    Ok(conn
2425        .query_row("SELECT value FROM reconcile_meta WHERE key = ?1", [key], |row| row.get(0))
2426        .optional()?)
2427}
2428
2429fn collect_rows<T>(
2430    rows: rusqlite::MappedRows<'_, impl FnMut(&rusqlite::Row<'_>) -> rusqlite::Result<T>>,
2431) -> anyhow::Result<Vec<T>> {
2432    let mut out = Vec::new();
2433    for row in rows {
2434        out.push(row?);
2435    }
2436    Ok(out)
2437}
2438
2439fn find_existing_embedding(
2440    conn: &Connection,
2441    model_id: &str,
2442    input_hash: &str,
2443    dim: usize,
2444) -> anyhow::Result<Option<Vec<f32>>> {
2445    let vector: Option<Vec<u8>> = conn
2446        .query_row(
2447            "SELECT vector_blob FROM chunk_embeddings
2448         WHERE model_id = ?1 AND input_hash = ?2 AND status = 'Current' AND embedding_dim = ?3
2449         LIMIT 1",
2450            params![model_id, input_hash, i64::try_from(dim).unwrap_or(i64::MAX)],
2451            |row| row.get(0),
2452        )
2453        .optional()?;
2454    if let Some(blob) = vector { Ok(decode_vector(&blob, dim)) } else { Ok(None) }
2455}