Skip to main content

fathomdb_engine/admin/
vector.rs

1use std::path::Path;
2
3use fathomdb_schema::SchemaError;
4use rusqlite::{OptionalExtension, TransactionBehavior};
5use serde::{Deserialize, Serialize};
6use sha2::Digest;
7
8use super::{
9    AdminService, CURRENT_VECTOR_CONTRACT_FORMAT_VERSION, EngineError, MAX_AUDIT_METADATA_BYTES,
10    MAX_CONTRACT_JSON_BYTES, MAX_POLICY_LEN, MAX_PROFILE_LEN, ProjectionRepairReport,
11    ProjectionTarget, VecProfile, VectorRegenerationConfig, VectorRegenerationReport,
12};
13use crate::embedder::{BatchEmbedder, QueryEmbedder, QueryEmbedderIdentity};
14use crate::ids::new_id;
15
16#[allow(dead_code)]
17#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
18pub(super) struct VectorEmbeddingContractRecord {
19    profile: String,
20    table_name: String,
21    model_identity: String,
22    model_version: String,
23    dimension: usize,
24    normalization_policy: String,
25    chunking_policy: String,
26    preprocessing_policy: String,
27    generator_command_json: String,
28    applied_at: i64,
29    snapshot_hash: String,
30    contract_format_version: i64,
31}
32
33#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
34pub(super) struct VectorRegenerationInputChunk {
35    pub(super) chunk_id: String,
36    pub(super) node_logical_id: String,
37    pub(super) kind: String,
38    pub(super) text_content: String,
39    pub(super) byte_start: Option<i64>,
40    pub(super) byte_end: Option<i64>,
41    pub(super) source_ref: Option<String>,
42    pub(super) created_at: i64,
43}
44
45#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
46pub(super) struct VectorRegenerationInput {
47    pub(super) profile: String,
48    pub(super) table_name: String,
49    pub(super) model_identity: String,
50    pub(super) model_version: String,
51    pub(super) dimension: usize,
52    pub(super) normalization_policy: String,
53    pub(super) chunking_policy: String,
54    pub(super) preprocessing_policy: String,
55    pub(super) chunks: Vec<VectorRegenerationInputChunk>,
56}
57
/// Classification of the ways a vector regeneration run can fail.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub(crate) enum VectorRegenerationFailureClass {
    /// The regeneration config or embedder identity was rejected up front
    /// (for example a zero or mismatching dimension).
    InvalidContract,
    /// The embedder itself returned an error.
    EmbedderFailure,
    /// The embedder returned vectors with the wrong length, the wrong count,
    /// non-finite values, or no vector for a chunk.
    InvalidEmbedderOutput,
    /// The chunk snapshot changed between embedding and applying.
    SnapshotDrift,
    /// The schema layer reported that vector support is unavailable.
    UnsupportedVecCapability,
}

impl VectorRegenerationFailureClass {
    /// Human-readable label used in error messages and audit metadata.
    fn label(self) -> &'static str {
        match self {
            Self::InvalidContract => "invalid contract",
            Self::EmbedderFailure => "embedder failure",
            Self::InvalidEmbedderOutput => "invalid embedder output",
            Self::SnapshotDrift => "snapshot drift",
            Self::UnsupportedVecCapability => "unsupported vec capability",
        }
    }

    /// Whether retrying the same request can succeed; only snapshot drift is
    /// considered retryable.
    fn retryable(self) -> bool {
        matches!(self, Self::SnapshotDrift)
    }
}
82
83#[derive(Clone, Debug, PartialEq, Eq)]
84pub(crate) struct VectorRegenerationFailure {
85    class: VectorRegenerationFailureClass,
86    detail: String,
87}
88
89impl VectorRegenerationFailure {
90    pub(crate) fn new(class: VectorRegenerationFailureClass, detail: impl Into<String>) -> Self {
91        Self {
92            class,
93            detail: detail.into(),
94        }
95    }
96
97    pub(super) fn to_engine_error(&self) -> EngineError {
98        let retry_suffix = if self.class.retryable() {
99            " [retryable]"
100        } else {
101            ""
102        };
103        EngineError::Bridge(format!(
104            "vector regeneration {}: {}{}",
105            self.class.label(),
106            self.detail,
107            retry_suffix
108        ))
109    }
110
111    pub(super) fn failure_class_label(&self) -> &'static str {
112        self.class.label()
113    }
114}
115
116#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
117pub(super) struct VectorRegenerationAuditMetadata {
118    pub(super) profile: String,
119    pub(super) model_identity: String,
120    pub(super) model_version: String,
121    pub(super) chunk_count: usize,
122    pub(super) snapshot_hash: String,
123    #[serde(skip_serializing_if = "Option::is_none")]
124    pub(super) failure_class: Option<String>,
125}
126
127impl AdminService {
128    /// Retrieve the vector embedding profile for a specific node `kind`.
129    ///
130    /// Reads from `projection_profiles` under `(kind=<kind>, facet='vec')`.
131    /// Returns `None` if no vector profile has been persisted for this kind yet.
132    ///
133    /// # Errors
134    /// Returns [`EngineError`] if the database query fails.
135    pub fn get_vec_profile(&self, kind: &str) -> Result<Option<VecProfile>, EngineError> {
136        let conn = self.connect()?;
137        let result = conn
138            .query_row(
139                "SELECT \
140                   json_extract(config_json, '$.model_identity'), \
141                   json_extract(config_json, '$.model_version'), \
142                   CAST(json_extract(config_json, '$.dimensions') AS INTEGER), \
143                   active_at, \
144                   created_at \
145                 FROM projection_profiles WHERE kind = ?1 AND facet = 'vec'",
146                rusqlite::params![kind],
147                |row| {
148                    Ok(VecProfile {
149                        model_identity: row.get::<_, Option<String>>(0)?.unwrap_or_default(),
150                        model_version: row.get(1)?,
151                        dimensions: {
152                            let d: i64 = row.get::<_, Option<i64>>(2)?.unwrap_or(0);
153                            u32::try_from(d).unwrap_or(0)
154                        },
155                        active_at: row.get(3)?,
156                        created_at: row.get(4)?,
157                    })
158                },
159            )
160            .optional()?;
161        Ok(result)
162    }
163
164    /// Write or update the global vector profile from a JSON identity string.
165    ///
166    /// This is a private helper called after a successful vector regeneration.
167    /// Errors are logged as warnings and not propagated to the caller.
168    #[allow(dead_code)]
169    fn set_vec_profile_inner(
170        conn: &rusqlite::Connection,
171        identity_json: &str,
172    ) -> Result<VecProfile, rusqlite::Error> {
173        conn.execute(
174            r"INSERT INTO projection_profiles (kind, facet, config_json, active_at, created_at)
175              VALUES ('*', 'vec', ?1, unixepoch(), unixepoch())
176              ON CONFLICT(kind, facet) DO UPDATE SET
177                  config_json = ?1,
178                  active_at   = unixepoch()",
179            rusqlite::params![identity_json],
180        )?;
181        conn.query_row(
182            "SELECT \
183               json_extract(config_json, '$.model_identity'), \
184               json_extract(config_json, '$.model_version'), \
185               CAST(json_extract(config_json, '$.dimensions') AS INTEGER), \
186               active_at, \
187               created_at \
188             FROM projection_profiles WHERE kind = '*' AND facet = 'vec'",
189            [],
190            |row| {
191                Ok(VecProfile {
192                    model_identity: row.get(0)?,
193                    model_version: row.get(1)?,
194                    dimensions: {
195                        let d: i64 = row.get(2)?;
196                        u32::try_from(d).unwrap_or(0)
197                    },
198                    active_at: row.get(3)?,
199                    created_at: row.get(4)?,
200                })
201            },
202        )
203    }
204
205    /// Persist or update the global vector profile from a JSON config string.
206    ///
207    /// `config_json` must be valid JSON with at least a `model_identity`
208    /// field and `dimensions`.  The JSON is stored verbatim in the
209    /// `projection_profiles` table under `kind='*'`, `facet='vec'`.
210    ///
211    /// # Errors
212    /// Returns [`EngineError`] if the database write fails.
213    pub fn set_vec_profile(&self, config_json: &str) -> Result<VecProfile, EngineError> {
214        let conn = self.connect()?;
215        Self::set_vec_profile_inner(&conn, config_json).map_err(EngineError::Sqlite)
216    }
217
218    /// Estimate the cost of rebuilding a projection.
219    ///
220    /// For facet `"fts"`: counts active nodes of `kind`.
221    /// For facet `"vec"`: counts all chunks.
222    ///
223    /// # Errors
224    /// Returns [`EngineError`] for unknown facets or database errors.
225    pub fn preview_projection_impact(
226        &self,
227        kind: &str,
228        facet: &str,
229    ) -> Result<super::ProjectionImpact, EngineError> {
230        let conn = self.connect()?;
231        match facet {
232            "fts" => {
233                let rows: u64 = conn
234                    .query_row(
235                        "SELECT count(*) FROM nodes WHERE kind = ?1 AND superseded_at IS NULL",
236                        rusqlite::params![kind],
237                        |row| row.get::<_, i64>(0),
238                    )
239                    .map(i64::cast_unsigned)?;
240                let current_tokenizer = self.get_fts_profile(kind)?.map(|p| p.tokenizer);
241                Ok(super::ProjectionImpact {
242                    rows_to_rebuild: rows,
243                    estimated_seconds: rows / 5000,
244                    temp_db_size_bytes: rows * 200,
245                    current_tokenizer,
246                    target_tokenizer: None,
247                })
248            }
249            "vec" => {
250                let rows: u64 = conn
251                    .query_row("SELECT count(*) FROM chunks", [], |row| {
252                        row.get::<_, i64>(0)
253                    })
254                    .map(i64::cast_unsigned)?;
255                Ok(super::ProjectionImpact {
256                    rows_to_rebuild: rows,
257                    estimated_seconds: rows / 100,
258                    temp_db_size_bytes: rows * 1536,
259                    current_tokenizer: None,
260                    target_tokenizer: None,
261                })
262            }
263            other => Err(EngineError::Bridge(format!(
264                "unknown projection facet: {other:?}"
265            ))),
266        }
267    }
268
269    /// Recreate enabled vector profiles from persisted `vector_profiles` metadata.
270    ///
271    /// # Errors
272    /// Returns [`EngineError`] if the database connection fails, reading metadata fails,
273    /// or sqlite-vec support is unavailable while enabled profiles are present.
274    pub fn restore_vector_profiles(&self) -> Result<ProjectionRepairReport, EngineError> {
275        let conn = self.connect()?;
276        let profiles: Vec<(String, String, i64)> = {
277            let mut stmt = conn.prepare(
278                "SELECT profile, table_name, dimension \
279                 FROM vector_profiles WHERE enabled = 1 ORDER BY profile",
280            )?;
281            stmt.query_map([], |row| {
282                Ok((
283                    row.get::<_, String>(0)?,
284                    row.get::<_, String>(1)?,
285                    row.get::<_, i64>(2)?,
286                ))
287            })?
288            .collect::<Result<Vec<_>, _>>()?
289        };
290
291        for (profile, table_name, dimension) in &profiles {
292            let dimension = usize::try_from(*dimension).map_err(|_| {
293                EngineError::Bridge(format!("invalid vector profile dimension: {dimension}"))
294            })?;
295            self.schema_manager
296                .ensure_vector_profile(&conn, profile, table_name, dimension)?;
297        }
298
299        Ok(ProjectionRepairReport {
300            targets: vec![ProjectionTarget::Vec],
301            rebuilt_rows: profiles.len(),
302            notes: vec![],
303        })
304    }
305
306    /// Rebuild vector embeddings using an application-supplied regeneration
307    /// contract and generator command.
308    ///
309    /// The config is persisted in `vector_embedding_contracts` so the metadata
310    /// required for recovery survives future repair runs.
311    ///
312    /// Vector identity is stamped from [`QueryEmbedder::identity`] — the
313    /// caller supplies the embedder and cannot override its identity. This
314    /// makes drift between the read-path and write-path identity stories
315    /// structurally impossible.
316    ///
317    /// # Errors
318    /// Returns [`EngineError`] if the database connection fails, the config is
319    /// invalid, the embedder fails, or the regenerated embeddings are
320    /// malformed.
321    #[allow(clippy::too_many_lines)]
322    pub fn regenerate_vector_embeddings(
323        &self,
324        embedder: &dyn QueryEmbedder,
325        config: &VectorRegenerationConfig,
326    ) -> Result<VectorRegenerationReport, EngineError> {
327        let conn = self.connect()?;
328        let identity = embedder.identity();
329        let config = validate_vector_regeneration_config(&conn, config, &identity)
330            .map_err(|failure| failure.to_engine_error())?;
331        let chunks = collect_regeneration_chunks(&conn)?;
332        let payload = build_regeneration_input(&config, &identity, chunks.clone());
333        let snapshot_hash = compute_snapshot_hash(&payload)?;
334        let audit_metadata = VectorRegenerationAuditMetadata {
335            profile: config.profile.clone(),
336            model_identity: identity.model_identity.clone(),
337            model_version: identity.model_version.clone(),
338            chunk_count: chunks.len(),
339            snapshot_hash: snapshot_hash.clone(),
340            failure_class: None,
341        };
342        persist_vector_regeneration_event(
343            &conn,
344            "vector_regeneration_requested",
345            &config.profile,
346            &audit_metadata,
347        )?;
348        let notes = vec!["vector embeddings regenerated via configured embedder".to_owned()];
349
350        let mut embedding_map: std::collections::HashMap<String, Vec<u8>> =
351            std::collections::HashMap::with_capacity(chunks.len());
352        for chunk in &chunks {
353            let vector = match embedder.embed_query(&chunk.text_content) {
354                Ok(vector) => vector,
355                Err(error) => {
356                    let failure = VectorRegenerationFailure::new(
357                        VectorRegenerationFailureClass::EmbedderFailure,
358                        format!("embedder failed for chunk '{}': {error}", chunk.chunk_id),
359                    );
360                    self.persist_vector_regeneration_failure_best_effort(
361                        &config.profile,
362                        &audit_metadata,
363                        &failure,
364                    );
365                    return Err(failure.to_engine_error());
366                }
367            };
368            if vector.len() != identity.dimension {
369                let failure = VectorRegenerationFailure::new(
370                    VectorRegenerationFailureClass::InvalidEmbedderOutput,
371                    format!(
372                        "embedder produced {} values for chunk '{}', expected {}",
373                        vector.len(),
374                        chunk.chunk_id,
375                        identity.dimension
376                    ),
377                );
378                self.persist_vector_regeneration_failure_best_effort(
379                    &config.profile,
380                    &audit_metadata,
381                    &failure,
382                );
383                return Err(failure.to_engine_error());
384            }
385            if vector.iter().any(|value| !value.is_finite()) {
386                let failure = VectorRegenerationFailure::new(
387                    VectorRegenerationFailureClass::InvalidEmbedderOutput,
388                    format!(
389                        "embedder returned non-finite values for chunk '{}'",
390                        chunk.chunk_id
391                    ),
392                );
393                self.persist_vector_regeneration_failure_best_effort(
394                    &config.profile,
395                    &audit_metadata,
396                    &failure,
397                );
398                return Err(failure.to_engine_error());
399            }
400            let bytes: Vec<u8> = vector
401                .iter()
402                .flat_map(|value| value.to_le_bytes())
403                .collect();
404            embedding_map.insert(chunk.chunk_id.clone(), bytes);
405        }
406
407        let table_name = fathomdb_schema::vec_kind_table_name(&config.kind);
408        let mut conn = conn;
409        let tx = conn.transaction_with_behavior(TransactionBehavior::Immediate)?;
410        match self
411            .schema_manager
412            .ensure_vec_kind_profile(&tx, &config.kind, identity.dimension)
413        {
414            Ok(()) => {}
415            Err(SchemaError::MissingCapability(message)) => {
416                let failure = VectorRegenerationFailure::new(
417                    VectorRegenerationFailureClass::UnsupportedVecCapability,
418                    message,
419                );
420                drop(tx);
421                self.persist_vector_regeneration_failure_best_effort(
422                    &config.profile,
423                    &audit_metadata,
424                    &failure,
425                );
426                return Err(failure.to_engine_error());
427            }
428            Err(error) => return Err(EngineError::Schema(error)),
429        }
430        let apply_chunks = collect_regeneration_chunks(&tx)?;
431        let apply_payload = build_regeneration_input(&config, &identity, apply_chunks.clone());
432        let apply_hash = compute_snapshot_hash(&apply_payload)?;
433        if apply_hash != snapshot_hash {
434            let failure = VectorRegenerationFailure::new(
435                VectorRegenerationFailureClass::SnapshotDrift,
436                "chunk snapshot changed during generation; retry".to_owned(),
437            );
438            drop(tx);
439            self.persist_vector_regeneration_failure_best_effort(
440                &config.profile,
441                &audit_metadata,
442                &failure,
443            );
444            return Err(failure.to_engine_error());
445        }
446        persist_vector_contract(&tx, &config, &table_name, &identity, &snapshot_hash)?;
447        tx.execute(&format!("DELETE FROM {table_name}"), [])?;
448        let mut stmt = tx.prepare_cached(&format!(
449            "INSERT INTO {table_name} (chunk_id, embedding) VALUES (?1, ?2)"
450        ))?;
451        let mut regenerated_rows = 0usize;
452        for chunk in &apply_chunks {
453            let Some(embedding) = embedding_map.remove(&chunk.chunk_id) else {
454                drop(stmt);
455                drop(tx);
456                let failure = VectorRegenerationFailure::new(
457                    VectorRegenerationFailureClass::InvalidEmbedderOutput,
458                    format!(
459                        "embedder did not produce a vector for chunk '{}'",
460                        chunk.chunk_id
461                    ),
462                );
463                self.persist_vector_regeneration_failure_best_effort(
464                    &config.profile,
465                    &audit_metadata,
466                    &failure,
467                );
468                return Err(failure.to_engine_error());
469            };
470            stmt.execute(rusqlite::params![chunk.chunk_id.as_str(), embedding])?;
471            regenerated_rows += 1;
472        }
473        drop(stmt);
474        persist_vector_regeneration_event(
475            &tx,
476            "vector_regeneration_apply",
477            &config.profile,
478            &audit_metadata,
479        )?;
480        tx.commit()?;
481
482        Ok(VectorRegenerationReport {
483            profile: config.profile.clone(),
484            table_name,
485            dimension: identity.dimension,
486            total_chunks: chunks.len(),
487            regenerated_rows,
488            contract_persisted: true,
489            notes,
490        })
491    }
492
493    /// Regenerate vector embeddings in-process using a [`BatchEmbedder`].
494    ///
495    /// Functionally equivalent to [`regenerate_vector_embeddings`] but uses
496    /// `BatchEmbedder::batch_embed` to process all chunks in one call. This
497    /// is the intended path for [`BuiltinBgeSmallEmbedder`] — it keeps the
498    /// forward pass in-process without requiring an external subprocess.
499    ///
500    /// The subprocess-based path ([`regenerate_vector_embeddings`]) remains
501    /// intact for callers who supply their own generator binary.
502    ///
503    /// # Errors
504    /// Returns [`EngineError`] if the database connection fails, the config is
505    /// invalid, the embedder fails, or the regenerated embeddings are malformed.
506    #[allow(clippy::too_many_lines)]
507    pub fn regenerate_vector_embeddings_in_process(
508        &self,
509        embedder: &dyn BatchEmbedder,
510        config: &VectorRegenerationConfig,
511    ) -> Result<VectorRegenerationReport, EngineError> {
512        let conn = self.connect()?;
513        let identity = embedder.identity();
514        let config = validate_vector_regeneration_config(&conn, config, &identity)
515            .map_err(|failure| failure.to_engine_error())?;
516        let chunks = collect_regeneration_chunks(&conn)?;
517        let payload = build_regeneration_input(&config, &identity, chunks.clone());
518        let snapshot_hash = compute_snapshot_hash(&payload)?;
519        let audit_metadata = VectorRegenerationAuditMetadata {
520            profile: config.profile.clone(),
521            model_identity: identity.model_identity.clone(),
522            model_version: identity.model_version.clone(),
523            chunk_count: chunks.len(),
524            snapshot_hash: snapshot_hash.clone(),
525            failure_class: None,
526        };
527        persist_vector_regeneration_event(
528            &conn,
529            "vector_regeneration_requested",
530            &config.profile,
531            &audit_metadata,
532        )?;
533        let notes = vec!["vector embeddings regenerated via in-process batch embedder".to_owned()];
534
535        // Collect texts and call batch_embed once for all chunks.
536        let chunk_texts: Vec<String> = chunks.iter().map(|c| c.text_content.clone()).collect();
537        let batch_vectors = match embedder.batch_embed(&chunk_texts) {
538            Ok(vecs) => vecs,
539            Err(error) => {
540                let failure = VectorRegenerationFailure::new(
541                    VectorRegenerationFailureClass::EmbedderFailure,
542                    format!("batch embedder failed: {error}"),
543                );
544                self.persist_vector_regeneration_failure_best_effort(
545                    &config.profile,
546                    &audit_metadata,
547                    &failure,
548                );
549                return Err(failure.to_engine_error());
550            }
551        };
552        if batch_vectors.len() != chunks.len() {
553            let failure = VectorRegenerationFailure::new(
554                VectorRegenerationFailureClass::InvalidEmbedderOutput,
555                format!(
556                    "batch embedder returned {} vectors for {} chunks",
557                    batch_vectors.len(),
558                    chunks.len()
559                ),
560            );
561            self.persist_vector_regeneration_failure_best_effort(
562                &config.profile,
563                &audit_metadata,
564                &failure,
565            );
566            return Err(failure.to_engine_error());
567        }
568
569        let mut embedding_map: std::collections::HashMap<String, Vec<u8>> =
570            std::collections::HashMap::with_capacity(chunks.len());
571        for (chunk, vector) in chunks.iter().zip(batch_vectors) {
572            if vector.len() != identity.dimension {
573                let failure = VectorRegenerationFailure::new(
574                    VectorRegenerationFailureClass::InvalidEmbedderOutput,
575                    format!(
576                        "embedder produced {} values for chunk '{}', expected {}",
577                        vector.len(),
578                        chunk.chunk_id,
579                        identity.dimension
580                    ),
581                );
582                self.persist_vector_regeneration_failure_best_effort(
583                    &config.profile,
584                    &audit_metadata,
585                    &failure,
586                );
587                return Err(failure.to_engine_error());
588            }
589            if vector.iter().any(|value| !value.is_finite()) {
590                let failure = VectorRegenerationFailure::new(
591                    VectorRegenerationFailureClass::InvalidEmbedderOutput,
592                    format!(
593                        "embedder returned non-finite values for chunk '{}'",
594                        chunk.chunk_id
595                    ),
596                );
597                self.persist_vector_regeneration_failure_best_effort(
598                    &config.profile,
599                    &audit_metadata,
600                    &failure,
601                );
602                return Err(failure.to_engine_error());
603            }
604            let bytes: Vec<u8> = vector
605                .iter()
606                .flat_map(|value| value.to_le_bytes())
607                .collect();
608            embedding_map.insert(chunk.chunk_id.clone(), bytes);
609        }
610
611        let mut conn = conn;
612        let tx = conn.transaction_with_behavior(TransactionBehavior::Immediate)?;
613        let table_name = fathomdb_schema::vec_kind_table_name(&config.kind);
614        match self
615            .schema_manager
616            .ensure_vec_kind_profile(&tx, &config.kind, identity.dimension)
617        {
618            Ok(()) => {}
619            Err(SchemaError::MissingCapability(message)) => {
620                let failure = VectorRegenerationFailure::new(
621                    VectorRegenerationFailureClass::UnsupportedVecCapability,
622                    message,
623                );
624                drop(tx);
625                self.persist_vector_regeneration_failure_best_effort(
626                    &config.profile,
627                    &audit_metadata,
628                    &failure,
629                );
630                return Err(failure.to_engine_error());
631            }
632            Err(error) => return Err(EngineError::Schema(error)),
633        }
634        let apply_chunks = collect_regeneration_chunks(&tx)?;
635        let apply_payload = build_regeneration_input(&config, &identity, apply_chunks.clone());
636        let apply_hash = compute_snapshot_hash(&apply_payload)?;
637        if apply_hash != snapshot_hash {
638            let failure = VectorRegenerationFailure::new(
639                VectorRegenerationFailureClass::SnapshotDrift,
640                "chunk snapshot changed during generation; retry".to_owned(),
641            );
642            drop(tx);
643            self.persist_vector_regeneration_failure_best_effort(
644                &config.profile,
645                &audit_metadata,
646                &failure,
647            );
648            return Err(failure.to_engine_error());
649        }
650        persist_vector_contract(&tx, &config, &table_name, &identity, &snapshot_hash)?;
651        tx.execute(&format!("DELETE FROM {table_name}"), [])?;
652        let mut stmt = tx.prepare_cached(&format!(
653            "INSERT INTO {table_name} (chunk_id, embedding) VALUES (?1, ?2)"
654        ))?;
655        let mut regenerated_rows = 0usize;
656        for chunk in &apply_chunks {
657            let Some(embedding) = embedding_map.remove(&chunk.chunk_id) else {
658                drop(stmt);
659                drop(tx);
660                let failure = VectorRegenerationFailure::new(
661                    VectorRegenerationFailureClass::InvalidEmbedderOutput,
662                    format!(
663                        "embedder did not produce a vector for chunk '{}'",
664                        chunk.chunk_id
665                    ),
666                );
667                self.persist_vector_regeneration_failure_best_effort(
668                    &config.profile,
669                    &audit_metadata,
670                    &failure,
671                );
672                return Err(failure.to_engine_error());
673            };
674            stmt.execute(rusqlite::params![chunk.chunk_id.as_str(), embedding])?;
675            regenerated_rows += 1;
676        }
677        drop(stmt);
678        persist_vector_regeneration_event(
679            &tx,
680            "vector_regeneration_apply",
681            &config.profile,
682            &audit_metadata,
683        )?;
684        tx.commit()?;
685
686        Ok(VectorRegenerationReport {
687            profile: config.profile.clone(),
688            table_name,
689            dimension: identity.dimension,
690            total_chunks: chunks.len(),
691            regenerated_rows,
692            contract_persisted: true,
693            notes,
694        })
695    }
696
697    pub(super) fn persist_vector_regeneration_failure_best_effort(
698        &self,
699        profile: &str,
700        metadata: &VectorRegenerationAuditMetadata,
701        failure: &VectorRegenerationFailure,
702    ) {
703        let Ok(conn) = self.connect() else {
704            return;
705        };
706        let failure_metadata = VectorRegenerationAuditMetadata {
707            profile: metadata.profile.clone(),
708            model_identity: metadata.model_identity.clone(),
709            model_version: metadata.model_version.clone(),
710            chunk_count: metadata.chunk_count,
711            snapshot_hash: metadata.snapshot_hash.clone(),
712            failure_class: Some(failure.failure_class_label().to_owned()),
713        };
714        let _ = persist_vector_regeneration_event(
715            &conn,
716            "vector_regeneration_failed",
717            profile,
718            &failure_metadata,
719        );
720    }
721
722    /// # Errors
723    /// Returns [`EngineError`] if the database connection fails or the projection rebuild fails.
724    pub fn rebuild_projections(
725        &self,
726        target: ProjectionTarget,
727    ) -> Result<ProjectionRepairReport, EngineError> {
728        self.projections.rebuild_projections(target)
729    }
730
731    /// # Errors
732    /// Returns [`EngineError`] if the database connection fails or the projection rebuild fails.
733    pub fn rebuild_missing_projections(&self) -> Result<ProjectionRepairReport, EngineError> {
734        self.projections.rebuild_missing_projections()
735    }
736}
737
738/// # Errors
739/// Returns [`EngineError`] if the file cannot be read or the config is invalid.
740pub fn load_vector_regeneration_config(
741    path: impl AsRef<Path>,
742) -> Result<VectorRegenerationConfig, EngineError> {
743    let path = path.as_ref();
744    let raw = std::fs::read_to_string(path)?;
745    match path.extension().and_then(|ext| ext.to_str()) {
746        Some("toml") => {
747            toml::from_str(&raw).map_err(|error| EngineError::Bridge(error.to_string()))
748        }
749        Some("json") | None => {
750            serde_json::from_str(&raw).map_err(|error| EngineError::Bridge(error.to_string()))
751        }
752        Some(other) => Err(EngineError::Bridge(format!(
753            "unsupported vector regeneration config extension: {other}"
754        ))),
755    }
756}
757
758fn validate_vector_regeneration_config(
759    conn: &rusqlite::Connection,
760    config: &VectorRegenerationConfig,
761    identity: &QueryEmbedderIdentity,
762) -> Result<VectorRegenerationConfig, VectorRegenerationFailure> {
763    let kind = validate_bounded_text("kind", &config.kind, MAX_PROFILE_LEN)?;
764    let profile = validate_bounded_text("profile", &config.profile, MAX_PROFILE_LEN)?;
765    if identity.dimension == 0 {
766        return Err(VectorRegenerationFailure::new(
767            VectorRegenerationFailureClass::InvalidContract,
768            "embedder reports dimension 0".to_owned(),
769        ));
770    }
771    let chunking_policy =
772        validate_bounded_text("chunking_policy", &config.chunking_policy, MAX_POLICY_LEN)?;
773    let preprocessing_policy = validate_bounded_text(
774        "preprocessing_policy",
775        &config.preprocessing_policy,
776        MAX_POLICY_LEN,
777    )?;
778
779    if let Some(existing_dimension) = current_vector_profile_dimension(conn, &profile)?
780        && existing_dimension != identity.dimension
781    {
782        return Err(VectorRegenerationFailure::new(
783            VectorRegenerationFailureClass::InvalidContract,
784            format!(
785                "embedder dimension {} does not match existing vector profile dimension {}",
786                identity.dimension, existing_dimension
787            ),
788        ));
789    }
790
791    validate_existing_contract_version(conn, &profile)?;
792
793    let normalized = VectorRegenerationConfig {
794        kind,
795        profile,
796        chunking_policy,
797        preprocessing_policy,
798    };
799    let serialized = serde_json::to_vec(&normalized).map_err(|error| {
800        VectorRegenerationFailure::new(
801            VectorRegenerationFailureClass::InvalidContract,
802            error.to_string(),
803        )
804    })?;
805    if serialized.len() > MAX_CONTRACT_JSON_BYTES {
806        return Err(VectorRegenerationFailure::new(
807            VectorRegenerationFailureClass::InvalidContract,
808            format!("serialized contract exceeds {MAX_CONTRACT_JSON_BYTES} bytes"),
809        ));
810    }
811
812    Ok(normalized)
813}
814
815#[allow(clippy::cast_possible_wrap)]
816fn persist_vector_contract(
817    conn: &rusqlite::Connection,
818    config: &VectorRegenerationConfig,
819    table_name: &str,
820    identity: &QueryEmbedderIdentity,
821    snapshot_hash: &str,
822) -> Result<(), EngineError> {
823    conn.execute(
824        r"
825        INSERT OR REPLACE INTO vector_embedding_contracts (
826            profile,
827            table_name,
828            model_identity,
829            model_version,
830            dimension,
831            normalization_policy,
832            chunking_policy,
833            preprocessing_policy,
834            generator_command_json,
835            applied_at,
836            snapshot_hash,
837            contract_format_version,
838            updated_at
839        ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, unixepoch(), ?10, ?11, unixepoch())
840        ",
841        rusqlite::params![
842            config.profile.as_str(),
843            table_name,
844            identity.model_identity.as_str(),
845            identity.model_version.as_str(),
846            identity.dimension as i64,
847            identity.normalization_policy.as_str(),
848            config.chunking_policy.as_str(),
849            config.preprocessing_policy.as_str(),
850            "[]",
851            snapshot_hash,
852            CURRENT_VECTOR_CONTRACT_FORMAT_VERSION,
853        ],
854    )?;
855    Ok(())
856}
857
858fn persist_vector_regeneration_event(
859    conn: &rusqlite::Connection,
860    event_type: &str,
861    subject: &str,
862    metadata: &VectorRegenerationAuditMetadata,
863) -> Result<(), EngineError> {
864    let metadata_json = serialize_audit_metadata(metadata)?;
865    conn.execute(
866        "INSERT INTO provenance_events (id, event_type, subject, metadata_json) VALUES (?1, ?2, ?3, ?4)",
867        rusqlite::params![new_id(), event_type, subject, metadata_json],
868    )?;
869    Ok(())
870}
871
872fn validate_bounded_text(
873    field: &str,
874    value: &str,
875    max_len: usize,
876) -> Result<String, VectorRegenerationFailure> {
877    let trimmed = value.trim();
878    if trimmed.is_empty() {
879        return Err(VectorRegenerationFailure::new(
880            VectorRegenerationFailureClass::InvalidContract,
881            format!("{field} must not be empty"),
882        ));
883    }
884    if trimmed.len() > max_len {
885        return Err(VectorRegenerationFailure::new(
886            VectorRegenerationFailureClass::InvalidContract,
887            format!("{field} exceeds max length {max_len}"),
888        ));
889    }
890    Ok(trimmed.to_owned())
891}
892
893fn current_vector_profile_dimension(
894    conn: &rusqlite::Connection,
895    profile: &str,
896) -> Result<Option<usize>, VectorRegenerationFailure> {
897    let dimension: Option<i64> = conn
898        .query_row(
899            "SELECT dimension FROM vector_profiles WHERE profile = ?1 AND enabled = 1",
900            [profile],
901            |row| row.get(0),
902        )
903        .optional()
904        .map_err(|error| {
905            VectorRegenerationFailure::new(
906                VectorRegenerationFailureClass::InvalidContract,
907                error.to_string(),
908            )
909        })?;
910    dimension
911        .map(|value| {
912            usize::try_from(value).map_err(|_| {
913                VectorRegenerationFailure::new(
914                    VectorRegenerationFailureClass::InvalidContract,
915                    format!("stored vector profile dimension is invalid: {value}"),
916                )
917            })
918        })
919        .transpose()
920}
921
922fn validate_existing_contract_version(
923    conn: &rusqlite::Connection,
924    profile: &str,
925) -> Result<(), VectorRegenerationFailure> {
926    let version: Option<i64> = conn
927        .query_row(
928            "SELECT contract_format_version FROM vector_embedding_contracts WHERE profile = ?1",
929            [profile],
930            |row| row.get(0),
931        )
932        .optional()
933        .map_err(|error| {
934            VectorRegenerationFailure::new(
935                VectorRegenerationFailureClass::InvalidContract,
936                error.to_string(),
937            )
938        })?;
939    if let Some(version) = version
940        && version > CURRENT_VECTOR_CONTRACT_FORMAT_VERSION
941    {
942        return Err(VectorRegenerationFailure::new(
943            VectorRegenerationFailureClass::InvalidContract,
944            format!(
945                "persisted contract format version {version} is unsupported; supported version is {CURRENT_VECTOR_CONTRACT_FORMAT_VERSION}"
946            ),
947        ));
948    }
949    Ok(())
950}
951
952fn serialize_audit_metadata(
953    metadata: &VectorRegenerationAuditMetadata,
954) -> Result<String, EngineError> {
955    let json =
956        serde_json::to_string(metadata).map_err(|error| EngineError::Bridge(error.to_string()))?;
957    if json.len() > MAX_AUDIT_METADATA_BYTES {
958        return Err(VectorRegenerationFailure::new(
959            VectorRegenerationFailureClass::InvalidContract,
960            format!("audit metadata exceeds {MAX_AUDIT_METADATA_BYTES} bytes"),
961        )
962        .to_engine_error());
963    }
964    Ok(json)
965}
966
967pub(super) fn build_regeneration_input(
968    config: &VectorRegenerationConfig,
969    identity: &QueryEmbedderIdentity,
970    chunks: Vec<VectorRegenerationInputChunk>,
971) -> VectorRegenerationInput {
972    VectorRegenerationInput {
973        profile: config.profile.clone(),
974        table_name: fathomdb_schema::vec_kind_table_name(&config.kind),
975        model_identity: identity.model_identity.clone(),
976        model_version: identity.model_version.clone(),
977        dimension: identity.dimension,
978        normalization_policy: identity.normalization_policy.clone(),
979        chunking_policy: config.chunking_policy.clone(),
980        preprocessing_policy: config.preprocessing_policy.clone(),
981        chunks,
982    }
983}
984
985pub(super) fn compute_snapshot_hash(
986    payload: &VectorRegenerationInput,
987) -> Result<String, EngineError> {
988    let bytes =
989        serde_json::to_vec(payload).map_err(|error| EngineError::Bridge(error.to_string()))?;
990    let mut hasher = sha2::Sha256::new();
991    hasher.update(bytes);
992    Ok(format!("{:x}", hasher.finalize()))
993}
994
995pub(super) fn collect_regeneration_chunks(
996    conn: &rusqlite::Connection,
997) -> Result<Vec<VectorRegenerationInputChunk>, EngineError> {
998    let mut stmt = conn.prepare(
999        r"
1000        SELECT c.id, c.node_logical_id, n.kind, c.text_content, c.byte_start, c.byte_end, n.source_ref, c.created_at
1001        FROM chunks c
1002        JOIN nodes n
1003          ON n.logical_id = c.node_logical_id
1004         AND n.superseded_at IS NULL
1005        ORDER BY c.created_at, c.id
1006        ",
1007    )?;
1008    let chunks = stmt
1009        .query_map([], |row| {
1010            Ok(VectorRegenerationInputChunk {
1011                chunk_id: row.get(0)?,
1012                node_logical_id: row.get(1)?,
1013                kind: row.get(2)?,
1014                text_content: row.get(3)?,
1015                byte_start: row.get(4)?,
1016                byte_end: row.get(5)?,
1017                source_ref: row.get(6)?,
1018                created_at: row.get(7)?,
1019            })
1020        })?
1021        .collect::<Result<Vec<_>, _>>()?;
1022    Ok(chunks)
1023}