Skip to main content

fathomdb_engine/admin/
vector.rs

1use std::path::Path;
2
3use fathomdb_schema::SchemaError;
4use rusqlite::{OptionalExtension, TransactionBehavior};
5use serde::{Deserialize, Serialize};
6use sha2::Digest;
7
8use super::{
9    AdminService, CURRENT_VECTOR_CONTRACT_FORMAT_VERSION, EngineError, MAX_AUDIT_METADATA_BYTES,
10    MAX_CONTRACT_JSON_BYTES, MAX_POLICY_LEN, MAX_PROFILE_LEN, ProjectionRepairReport,
11    ProjectionTarget, VecProfile, VectorRegenerationConfig, VectorRegenerationReport,
12};
13use crate::embedder::{BatchEmbedder, QueryEmbedder, QueryEmbedderIdentity};
14use crate::ids::new_id;
15
16#[allow(dead_code)]
17#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
18pub(super) struct VectorEmbeddingContractRecord {
19    profile: String,
20    table_name: String,
21    model_identity: String,
22    model_version: String,
23    dimension: usize,
24    normalization_policy: String,
25    chunking_policy: String,
26    preprocessing_policy: String,
27    generator_command_json: String,
28    applied_at: i64,
29    snapshot_hash: String,
30    contract_format_version: i64,
31}
32
33#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
34pub(super) struct VectorRegenerationInputChunk {
35    pub(super) chunk_id: String,
36    pub(super) node_logical_id: String,
37    pub(super) kind: String,
38    pub(super) text_content: String,
39    pub(super) byte_start: Option<i64>,
40    pub(super) byte_end: Option<i64>,
41    pub(super) source_ref: Option<String>,
42    pub(super) created_at: i64,
43}
44
45#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
46pub(super) struct VectorRegenerationInput {
47    pub(super) profile: String,
48    pub(super) table_name: String,
49    pub(super) model_identity: String,
50    pub(super) model_version: String,
51    pub(super) dimension: usize,
52    pub(super) normalization_policy: String,
53    pub(super) chunking_policy: String,
54    pub(super) preprocessing_policy: String,
55    pub(super) chunks: Vec<VectorRegenerationInputChunk>,
56}
57
58#[derive(Clone, Copy, Debug, PartialEq, Eq)]
59pub(crate) enum VectorRegenerationFailureClass {
60    InvalidContract,
61    EmbedderFailure,
62    InvalidEmbedderOutput,
63    SnapshotDrift,
64    UnsupportedVecCapability,
65}
66
67impl VectorRegenerationFailureClass {
68    fn label(self) -> &'static str {
69        match self {
70            Self::InvalidContract => "invalid contract",
71            Self::EmbedderFailure => "embedder failure",
72            Self::InvalidEmbedderOutput => "invalid embedder output",
73            Self::SnapshotDrift => "snapshot drift",
74            Self::UnsupportedVecCapability => "unsupported vec capability",
75        }
76    }
77
78    fn retryable(self) -> bool {
79        matches!(self, Self::SnapshotDrift)
80    }
81}
82
83#[derive(Clone, Debug, PartialEq, Eq)]
84pub(crate) struct VectorRegenerationFailure {
85    class: VectorRegenerationFailureClass,
86    detail: String,
87}
88
89impl VectorRegenerationFailure {
90    pub(crate) fn new(class: VectorRegenerationFailureClass, detail: impl Into<String>) -> Self {
91        Self {
92            class,
93            detail: detail.into(),
94        }
95    }
96
97    pub(super) fn to_engine_error(&self) -> EngineError {
98        let retry_suffix = if self.class.retryable() {
99            " [retryable]"
100        } else {
101            ""
102        };
103        EngineError::Bridge(format!(
104            "vector regeneration {}: {}{}",
105            self.class.label(),
106            self.detail,
107            retry_suffix
108        ))
109    }
110
111    pub(super) fn failure_class_label(&self) -> &'static str {
112        self.class.label()
113    }
114}
115
116#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
117pub(super) struct VectorRegenerationAuditMetadata {
118    pub(super) profile: String,
119    pub(super) model_identity: String,
120    pub(super) model_version: String,
121    pub(super) chunk_count: usize,
122    pub(super) snapshot_hash: String,
123    #[serde(skip_serializing_if = "Option::is_none")]
124    pub(super) failure_class: Option<String>,
125}
126
127/// Source of content for a managed per-kind vector index.
128#[non_exhaustive]
129#[derive(Clone, Copy, Debug, PartialEq, Eq)]
130pub enum VectorSource {
131    /// Use existing `chunks` rows belonging to nodes of the configured kind.
132    Chunks,
133}
134
135/// Outcome of [`AdminService::configure_vec_kind`].
136#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
137pub struct ConfigureVecOutcome {
138    /// Node kind that was configured.
139    pub kind: String,
140    /// Number of backfill rows newly enqueued in `vector_projection_work`.
141    pub enqueued_backfill_rows: usize,
142    /// True if this kind already had an enabled vector index schema row
143    /// before the call.
144    pub was_already_enabled: bool,
145}
146
147/// Managed-projection status snapshot for a given node `kind`.
148///
149/// Returned by [`AdminService::get_vec_index_status`].
150#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
151pub struct VecIndexStatus {
152    /// Node kind queried.
153    pub kind: String,
154    /// True if `vector_index_schemas.enabled = 1` for this kind.
155    pub enabled: bool,
156    /// Lifecycle state stored in `vector_index_schemas.state`, or
157    /// `"unconfigured"` when there is no schema row for this kind.
158    pub state: String,
159    /// Pending work rows with `priority >= 1000` (incremental writes).
160    pub pending_incremental: u64,
161    /// Pending work rows with `priority < 1000` (backfill).
162    pub pending_backfill: u64,
163    /// Last recorded error, if any.
164    pub last_error: Option<String>,
165    /// Unix timestamp when the kind last completed rebuild, if any.
166    pub last_completed_at: Option<i64>,
167    /// `model_identity` of the currently-active embedding profile, if any.
168    pub embedding_identity: Option<String>,
169}
170
171impl AdminService {
172    /// Retrieve the vector embedding profile for a specific node `kind`.
173    ///
174    /// Reads from `projection_profiles` under `(kind=<kind>, facet='vec')`.
175    /// Returns `None` if no vector profile has been persisted for this kind yet.
176    ///
177    /// # Errors
178    /// Returns [`EngineError`] if the database query fails.
179    pub fn get_vec_profile(&self, kind: &str) -> Result<Option<VecProfile>, EngineError> {
180        let conn = self.connect()?;
181        let result = conn
182            .query_row(
183                "SELECT \
184                   json_extract(config_json, '$.model_identity'), \
185                   json_extract(config_json, '$.model_version'), \
186                   CAST(json_extract(config_json, '$.dimensions') AS INTEGER), \
187                   active_at, \
188                   created_at \
189                 FROM projection_profiles WHERE kind = ?1 AND facet = 'vec'",
190                rusqlite::params![kind],
191                |row| {
192                    Ok(VecProfile {
193                        model_identity: row.get::<_, Option<String>>(0)?.unwrap_or_default(),
194                        model_version: row.get(1)?,
195                        dimensions: {
196                            let d: i64 = row.get::<_, Option<i64>>(2)?.unwrap_or(0);
197                            u32::try_from(d).unwrap_or(0)
198                        },
199                        active_at: row.get(3)?,
200                        created_at: row.get(4)?,
201                    })
202                },
203            )
204            .optional()?;
205        Ok(result)
206    }
207
208    /// Write or update the global vector profile from a JSON identity string.
209    ///
210    /// This is a private helper called after a successful vector regeneration.
211    /// Errors are logged as warnings and not propagated to the caller.
212    #[allow(dead_code)]
213    fn set_vec_profile_inner(
214        conn: &rusqlite::Connection,
215        identity_json: &str,
216    ) -> Result<VecProfile, rusqlite::Error> {
217        conn.execute(
218            r"INSERT INTO projection_profiles (kind, facet, config_json, active_at, created_at)
219              VALUES ('*', 'vec', ?1, unixepoch(), unixepoch())
220              ON CONFLICT(kind, facet) DO UPDATE SET
221                  config_json = ?1,
222                  active_at   = unixepoch()",
223            rusqlite::params![identity_json],
224        )?;
225        conn.query_row(
226            "SELECT \
227               json_extract(config_json, '$.model_identity'), \
228               json_extract(config_json, '$.model_version'), \
229               CAST(json_extract(config_json, '$.dimensions') AS INTEGER), \
230               active_at, \
231               created_at \
232             FROM projection_profiles WHERE kind = '*' AND facet = 'vec'",
233            [],
234            |row| {
235                Ok(VecProfile {
236                    model_identity: row.get(0)?,
237                    model_version: row.get(1)?,
238                    dimensions: {
239                        let d: i64 = row.get(2)?;
240                        u32::try_from(d).unwrap_or(0)
241                    },
242                    active_at: row.get(3)?,
243                    created_at: row.get(4)?,
244                })
245            },
246        )
247    }
248
249    /// Persist or update the global vector profile from a JSON config string.
250    ///
251    /// `config_json` must be valid JSON with at least a `model_identity`
252    /// field and `dimensions`.  The JSON is stored verbatim in the
253    /// `projection_profiles` table under `kind='*'`, `facet='vec'`.
254    ///
255    /// # Errors
256    /// Returns [`EngineError`] if the database write fails.
257    pub fn set_vec_profile(&self, config_json: &str) -> Result<VecProfile, EngineError> {
258        let conn = self.connect()?;
259        Self::set_vec_profile_inner(&conn, config_json).map_err(EngineError::Sqlite)
260    }
261
262    /// Estimate the cost of rebuilding a projection.
263    ///
264    /// For facet `"fts"`: counts active nodes of `kind`.
265    /// For facet `"vec"`: counts all chunks.
266    ///
267    /// # Errors
268    /// Returns [`EngineError`] for unknown facets or database errors.
269    pub fn preview_projection_impact(
270        &self,
271        kind: &str,
272        facet: &str,
273    ) -> Result<super::ProjectionImpact, EngineError> {
274        let conn = self.connect()?;
275        match facet {
276            "fts" => {
277                let rows: u64 = conn
278                    .query_row(
279                        "SELECT count(*) FROM nodes WHERE kind = ?1 AND superseded_at IS NULL",
280                        rusqlite::params![kind],
281                        |row| row.get::<_, i64>(0),
282                    )
283                    .map(i64::cast_unsigned)?;
284                let current_tokenizer = self.get_fts_profile(kind)?.map(|p| p.tokenizer);
285                Ok(super::ProjectionImpact {
286                    rows_to_rebuild: rows,
287                    estimated_seconds: rows / 5000,
288                    temp_db_size_bytes: rows * 200,
289                    current_tokenizer,
290                    target_tokenizer: None,
291                })
292            }
293            "vec" => {
294                let rows: u64 = conn
295                    .query_row("SELECT count(*) FROM chunks", [], |row| {
296                        row.get::<_, i64>(0)
297                    })
298                    .map(i64::cast_unsigned)?;
299                Ok(super::ProjectionImpact {
300                    rows_to_rebuild: rows,
301                    estimated_seconds: rows / 100,
302                    temp_db_size_bytes: rows * 1536,
303                    current_tokenizer: None,
304                    target_tokenizer: None,
305                })
306            }
307            other => Err(EngineError::Bridge(format!(
308                "unknown projection facet: {other:?}"
309            ))),
310        }
311    }
312
313    /// Recreate enabled vector profiles from persisted `vector_profiles` metadata.
314    ///
315    /// # Errors
316    /// Returns [`EngineError`] if the database connection fails, reading metadata fails,
317    /// or sqlite-vec support is unavailable while enabled profiles are present.
318    pub fn restore_vector_profiles(&self) -> Result<ProjectionRepairReport, EngineError> {
319        let conn = self.connect()?;
320        let profiles: Vec<(String, String, i64)> = {
321            let mut stmt = conn.prepare(
322                "SELECT profile, table_name, dimension \
323                 FROM vector_profiles WHERE enabled = 1 ORDER BY profile",
324            )?;
325            stmt.query_map([], |row| {
326                Ok((
327                    row.get::<_, String>(0)?,
328                    row.get::<_, String>(1)?,
329                    row.get::<_, i64>(2)?,
330                ))
331            })?
332            .collect::<Result<Vec<_>, _>>()?
333        };
334
335        for (profile, table_name, dimension) in &profiles {
336            let dimension = usize::try_from(*dimension).map_err(|_| {
337                EngineError::Bridge(format!("invalid vector profile dimension: {dimension}"))
338            })?;
339            self.schema_manager
340                .ensure_vector_profile(&conn, profile, table_name, dimension)?;
341        }
342
343        Ok(ProjectionRepairReport {
344            targets: vec![ProjectionTarget::Vec],
345            rebuilt_rows: profiles.len(),
346            notes: vec![],
347        })
348    }
349
350    /// Rebuild vector embeddings using an application-supplied regeneration
351    /// contract and generator command.
352    ///
353    /// The config is persisted in `vector_embedding_contracts` so the metadata
354    /// required for recovery survives future repair runs.
355    ///
356    /// Vector identity is stamped from [`QueryEmbedder::identity`] — the
357    /// caller supplies the embedder and cannot override its identity. This
358    /// makes drift between the read-path and write-path identity stories
359    /// structurally impossible.
360    ///
361    /// # Errors
362    /// Returns [`EngineError`] if the database connection fails, the config is
363    /// invalid, the embedder fails, or the regenerated embeddings are
364    /// malformed.
365    #[allow(clippy::too_many_lines)]
366    pub fn regenerate_vector_embeddings(
367        &self,
368        embedder: &dyn QueryEmbedder,
369        config: &VectorRegenerationConfig,
370    ) -> Result<VectorRegenerationReport, EngineError> {
371        let conn = self.connect()?;
372        let identity = embedder.identity();
373        let config = validate_vector_regeneration_config(&conn, config, &identity)
374            .map_err(|failure| failure.to_engine_error())?;
375        let chunks = collect_regeneration_chunks(&conn)?;
376        let payload = build_regeneration_input(&config, &identity, chunks.clone());
377        let snapshot_hash = compute_snapshot_hash(&payload)?;
378        let audit_metadata = VectorRegenerationAuditMetadata {
379            profile: config.profile.clone(),
380            model_identity: identity.model_identity.clone(),
381            model_version: identity.model_version.clone(),
382            chunk_count: chunks.len(),
383            snapshot_hash: snapshot_hash.clone(),
384            failure_class: None,
385        };
386        persist_vector_regeneration_event(
387            &conn,
388            "vector_regeneration_requested",
389            &config.profile,
390            &audit_metadata,
391        )?;
392        let notes = vec!["vector embeddings regenerated via configured embedder".to_owned()];
393
394        let mut embedding_map: std::collections::HashMap<String, Vec<u8>> =
395            std::collections::HashMap::with_capacity(chunks.len());
396        for chunk in &chunks {
397            let vector = match embedder.embed_query(&chunk.text_content) {
398                Ok(vector) => vector,
399                Err(error) => {
400                    let failure = VectorRegenerationFailure::new(
401                        VectorRegenerationFailureClass::EmbedderFailure,
402                        format!("embedder failed for chunk '{}': {error}", chunk.chunk_id),
403                    );
404                    self.persist_vector_regeneration_failure_best_effort(
405                        &config.profile,
406                        &audit_metadata,
407                        &failure,
408                    );
409                    return Err(failure.to_engine_error());
410                }
411            };
412            if vector.len() != identity.dimension {
413                let failure = VectorRegenerationFailure::new(
414                    VectorRegenerationFailureClass::InvalidEmbedderOutput,
415                    format!(
416                        "embedder produced {} values for chunk '{}', expected {}",
417                        vector.len(),
418                        chunk.chunk_id,
419                        identity.dimension
420                    ),
421                );
422                self.persist_vector_regeneration_failure_best_effort(
423                    &config.profile,
424                    &audit_metadata,
425                    &failure,
426                );
427                return Err(failure.to_engine_error());
428            }
429            if vector.iter().any(|value| !value.is_finite()) {
430                let failure = VectorRegenerationFailure::new(
431                    VectorRegenerationFailureClass::InvalidEmbedderOutput,
432                    format!(
433                        "embedder returned non-finite values for chunk '{}'",
434                        chunk.chunk_id
435                    ),
436                );
437                self.persist_vector_regeneration_failure_best_effort(
438                    &config.profile,
439                    &audit_metadata,
440                    &failure,
441                );
442                return Err(failure.to_engine_error());
443            }
444            let bytes: Vec<u8> = vector
445                .iter()
446                .flat_map(|value| value.to_le_bytes())
447                .collect();
448            embedding_map.insert(chunk.chunk_id.clone(), bytes);
449        }
450
451        let table_name = fathomdb_schema::vec_kind_table_name(&config.kind);
452        let mut conn = conn;
453        let tx = conn.transaction_with_behavior(TransactionBehavior::Immediate)?;
454        match self
455            .schema_manager
456            .ensure_vec_kind_profile(&tx, &config.kind, identity.dimension)
457        {
458            Ok(()) => {}
459            Err(SchemaError::MissingCapability(message)) => {
460                let failure = VectorRegenerationFailure::new(
461                    VectorRegenerationFailureClass::UnsupportedVecCapability,
462                    message,
463                );
464                drop(tx);
465                self.persist_vector_regeneration_failure_best_effort(
466                    &config.profile,
467                    &audit_metadata,
468                    &failure,
469                );
470                return Err(failure.to_engine_error());
471            }
472            Err(error) => return Err(EngineError::Schema(error)),
473        }
474        let apply_chunks = collect_regeneration_chunks(&tx)?;
475        let apply_payload = build_regeneration_input(&config, &identity, apply_chunks.clone());
476        let apply_hash = compute_snapshot_hash(&apply_payload)?;
477        if apply_hash != snapshot_hash {
478            let failure = VectorRegenerationFailure::new(
479                VectorRegenerationFailureClass::SnapshotDrift,
480                "chunk snapshot changed during generation; retry".to_owned(),
481            );
482            drop(tx);
483            self.persist_vector_regeneration_failure_best_effort(
484                &config.profile,
485                &audit_metadata,
486                &failure,
487            );
488            return Err(failure.to_engine_error());
489        }
490        persist_vector_contract(&tx, &config, &table_name, &identity, &snapshot_hash)?;
491        tx.execute(&format!("DELETE FROM {table_name}"), [])?;
492        let mut stmt = tx.prepare_cached(&format!(
493            "INSERT INTO {table_name} (chunk_id, embedding) VALUES (?1, ?2)"
494        ))?;
495        let mut regenerated_rows = 0usize;
496        for chunk in &apply_chunks {
497            let Some(embedding) = embedding_map.remove(&chunk.chunk_id) else {
498                drop(stmt);
499                drop(tx);
500                let failure = VectorRegenerationFailure::new(
501                    VectorRegenerationFailureClass::InvalidEmbedderOutput,
502                    format!(
503                        "embedder did not produce a vector for chunk '{}'",
504                        chunk.chunk_id
505                    ),
506                );
507                self.persist_vector_regeneration_failure_best_effort(
508                    &config.profile,
509                    &audit_metadata,
510                    &failure,
511                );
512                return Err(failure.to_engine_error());
513            };
514            stmt.execute(rusqlite::params![chunk.chunk_id.as_str(), embedding])?;
515            regenerated_rows += 1;
516        }
517        drop(stmt);
518        persist_vector_regeneration_event(
519            &tx,
520            "vector_regeneration_apply",
521            &config.profile,
522            &audit_metadata,
523        )?;
524        tx.commit()?;
525
526        Ok(VectorRegenerationReport {
527            profile: config.profile.clone(),
528            table_name,
529            dimension: identity.dimension,
530            total_chunks: chunks.len(),
531            regenerated_rows,
532            contract_persisted: true,
533            notes,
534        })
535    }
536
537    /// Regenerate vector embeddings in-process using a [`BatchEmbedder`].
538    ///
539    /// Functionally equivalent to [`regenerate_vector_embeddings`] but uses
540    /// `BatchEmbedder::batch_embed` to process all chunks in one call. This
541    /// is the intended path for [`BuiltinBgeSmallEmbedder`] — it keeps the
542    /// forward pass in-process without requiring an external subprocess.
543    ///
544    /// The subprocess-based path ([`regenerate_vector_embeddings`]) remains
545    /// intact for callers who supply their own generator binary.
546    ///
547    /// # Errors
548    /// Returns [`EngineError`] if the database connection fails, the config is
549    /// invalid, the embedder fails, or the regenerated embeddings are malformed.
550    #[allow(clippy::too_many_lines)]
551    pub fn regenerate_vector_embeddings_in_process(
552        &self,
553        embedder: &dyn BatchEmbedder,
554        config: &VectorRegenerationConfig,
555    ) -> Result<VectorRegenerationReport, EngineError> {
556        let conn = self.connect()?;
557        let identity = embedder.identity();
558        let config = validate_vector_regeneration_config(&conn, config, &identity)
559            .map_err(|failure| failure.to_engine_error())?;
560        let chunks = collect_regeneration_chunks(&conn)?;
561        let payload = build_regeneration_input(&config, &identity, chunks.clone());
562        let snapshot_hash = compute_snapshot_hash(&payload)?;
563        let audit_metadata = VectorRegenerationAuditMetadata {
564            profile: config.profile.clone(),
565            model_identity: identity.model_identity.clone(),
566            model_version: identity.model_version.clone(),
567            chunk_count: chunks.len(),
568            snapshot_hash: snapshot_hash.clone(),
569            failure_class: None,
570        };
571        persist_vector_regeneration_event(
572            &conn,
573            "vector_regeneration_requested",
574            &config.profile,
575            &audit_metadata,
576        )?;
577        let notes = vec!["vector embeddings regenerated via in-process batch embedder".to_owned()];
578
579        // Collect texts and call batch_embed once for all chunks.
580        let chunk_texts: Vec<String> = chunks.iter().map(|c| c.text_content.clone()).collect();
581        let batch_vectors = match embedder.batch_embed(&chunk_texts) {
582            Ok(vecs) => vecs,
583            Err(error) => {
584                let failure = VectorRegenerationFailure::new(
585                    VectorRegenerationFailureClass::EmbedderFailure,
586                    format!("batch embedder failed: {error}"),
587                );
588                self.persist_vector_regeneration_failure_best_effort(
589                    &config.profile,
590                    &audit_metadata,
591                    &failure,
592                );
593                return Err(failure.to_engine_error());
594            }
595        };
596        if batch_vectors.len() != chunks.len() {
597            let failure = VectorRegenerationFailure::new(
598                VectorRegenerationFailureClass::InvalidEmbedderOutput,
599                format!(
600                    "batch embedder returned {} vectors for {} chunks",
601                    batch_vectors.len(),
602                    chunks.len()
603                ),
604            );
605            self.persist_vector_regeneration_failure_best_effort(
606                &config.profile,
607                &audit_metadata,
608                &failure,
609            );
610            return Err(failure.to_engine_error());
611        }
612
613        let mut embedding_map: std::collections::HashMap<String, Vec<u8>> =
614            std::collections::HashMap::with_capacity(chunks.len());
615        for (chunk, vector) in chunks.iter().zip(batch_vectors) {
616            if vector.len() != identity.dimension {
617                let failure = VectorRegenerationFailure::new(
618                    VectorRegenerationFailureClass::InvalidEmbedderOutput,
619                    format!(
620                        "embedder produced {} values for chunk '{}', expected {}",
621                        vector.len(),
622                        chunk.chunk_id,
623                        identity.dimension
624                    ),
625                );
626                self.persist_vector_regeneration_failure_best_effort(
627                    &config.profile,
628                    &audit_metadata,
629                    &failure,
630                );
631                return Err(failure.to_engine_error());
632            }
633            if vector.iter().any(|value| !value.is_finite()) {
634                let failure = VectorRegenerationFailure::new(
635                    VectorRegenerationFailureClass::InvalidEmbedderOutput,
636                    format!(
637                        "embedder returned non-finite values for chunk '{}'",
638                        chunk.chunk_id
639                    ),
640                );
641                self.persist_vector_regeneration_failure_best_effort(
642                    &config.profile,
643                    &audit_metadata,
644                    &failure,
645                );
646                return Err(failure.to_engine_error());
647            }
648            let bytes: Vec<u8> = vector
649                .iter()
650                .flat_map(|value| value.to_le_bytes())
651                .collect();
652            embedding_map.insert(chunk.chunk_id.clone(), bytes);
653        }
654
655        let mut conn = conn;
656        let tx = conn.transaction_with_behavior(TransactionBehavior::Immediate)?;
657        let table_name = fathomdb_schema::vec_kind_table_name(&config.kind);
658        match self
659            .schema_manager
660            .ensure_vec_kind_profile(&tx, &config.kind, identity.dimension)
661        {
662            Ok(()) => {}
663            Err(SchemaError::MissingCapability(message)) => {
664                let failure = VectorRegenerationFailure::new(
665                    VectorRegenerationFailureClass::UnsupportedVecCapability,
666                    message,
667                );
668                drop(tx);
669                self.persist_vector_regeneration_failure_best_effort(
670                    &config.profile,
671                    &audit_metadata,
672                    &failure,
673                );
674                return Err(failure.to_engine_error());
675            }
676            Err(error) => return Err(EngineError::Schema(error)),
677        }
678        let apply_chunks = collect_regeneration_chunks(&tx)?;
679        let apply_payload = build_regeneration_input(&config, &identity, apply_chunks.clone());
680        let apply_hash = compute_snapshot_hash(&apply_payload)?;
681        if apply_hash != snapshot_hash {
682            let failure = VectorRegenerationFailure::new(
683                VectorRegenerationFailureClass::SnapshotDrift,
684                "chunk snapshot changed during generation; retry".to_owned(),
685            );
686            drop(tx);
687            self.persist_vector_regeneration_failure_best_effort(
688                &config.profile,
689                &audit_metadata,
690                &failure,
691            );
692            return Err(failure.to_engine_error());
693        }
694        persist_vector_contract(&tx, &config, &table_name, &identity, &snapshot_hash)?;
695        tx.execute(&format!("DELETE FROM {table_name}"), [])?;
696        let mut stmt = tx.prepare_cached(&format!(
697            "INSERT INTO {table_name} (chunk_id, embedding) VALUES (?1, ?2)"
698        ))?;
699        let mut regenerated_rows = 0usize;
700        for chunk in &apply_chunks {
701            let Some(embedding) = embedding_map.remove(&chunk.chunk_id) else {
702                drop(stmt);
703                drop(tx);
704                let failure = VectorRegenerationFailure::new(
705                    VectorRegenerationFailureClass::InvalidEmbedderOutput,
706                    format!(
707                        "embedder did not produce a vector for chunk '{}'",
708                        chunk.chunk_id
709                    ),
710                );
711                self.persist_vector_regeneration_failure_best_effort(
712                    &config.profile,
713                    &audit_metadata,
714                    &failure,
715                );
716                return Err(failure.to_engine_error());
717            };
718            stmt.execute(rusqlite::params![chunk.chunk_id.as_str(), embedding])?;
719            regenerated_rows += 1;
720        }
721        drop(stmt);
722        persist_vector_regeneration_event(
723            &tx,
724            "vector_regeneration_apply",
725            &config.profile,
726            &audit_metadata,
727        )?;
728        tx.commit()?;
729
730        Ok(VectorRegenerationReport {
731            profile: config.profile.clone(),
732            table_name,
733            dimension: identity.dimension,
734            total_chunks: chunks.len(),
735            regenerated_rows,
736            contract_persisted: true,
737            notes,
738        })
739    }
740
741    pub(super) fn persist_vector_regeneration_failure_best_effort(
742        &self,
743        profile: &str,
744        metadata: &VectorRegenerationAuditMetadata,
745        failure: &VectorRegenerationFailure,
746    ) {
747        let Ok(conn) = self.connect() else {
748            return;
749        };
750        let failure_metadata = VectorRegenerationAuditMetadata {
751            profile: metadata.profile.clone(),
752            model_identity: metadata.model_identity.clone(),
753            model_version: metadata.model_version.clone(),
754            chunk_count: metadata.chunk_count,
755            snapshot_hash: metadata.snapshot_hash.clone(),
756            failure_class: Some(failure.failure_class_label().to_owned()),
757        };
758        let _ = persist_vector_regeneration_event(
759            &conn,
760            "vector_regeneration_failed",
761            profile,
762            &failure_metadata,
763        );
764    }
765
766    /// Configure per-kind vector indexing for `kind`, sourced from `source`.
767    ///
768    /// Requires at least one active row in `vector_embedding_profiles`. On
769    /// first call, creates the `vec_<kind>` sqlite-vec table, inserts a
770    /// `vector_index_schemas` row, and enqueues backfill rows in
771    /// `vector_projection_work` (one per existing chunk of that kind).
772    /// Subsequent calls are idempotent: no duplicate pending work rows are
773    /// created for the (`chunk_id`, `embedding_profile_id`) pair.
774    ///
775    /// # Errors
776    /// Returns [`EngineError::InvalidConfig`] if no active embedding profile
777    /// exists; [`EngineError::Sqlite`]/[`EngineError::Schema`] on storage
778    /// failures.
779    pub fn configure_vec_kind(
780        &self,
781        kind: &str,
782        source: VectorSource,
783    ) -> Result<ConfigureVecOutcome, EngineError> {
784        match source {
785            VectorSource::Chunks => {}
786        }
787        let mut conn = self.connect()?;
788
789        let profile: Option<(i64, i64)> = conn
790            .query_row(
791                "SELECT profile_id, dimensions FROM vector_embedding_profiles WHERE active = 1",
792                [],
793                |row| Ok((row.get::<_, i64>(0)?, row.get::<_, i64>(1)?)),
794            )
795            .optional()?;
796        let (profile_id, dimensions) = profile.ok_or_else(|| {
797            EngineError::InvalidConfig(
798                "no active embedding profile configured; call configure_embedding first".to_owned(),
799            )
800        })?;
801        let dimensions = usize::try_from(dimensions).map_err(|_| {
802            EngineError::Bridge(format!(
803                "invalid embedding profile dimensions: {dimensions}"
804            ))
805        })?;
806
807        let tx = conn.transaction_with_behavior(TransactionBehavior::Immediate)?;
808
809        let was_already_enabled: bool = tx
810            .query_row(
811                "SELECT enabled FROM vector_index_schemas WHERE kind = ?1",
812                rusqlite::params![kind],
813                |row| row.get::<_, i64>(0).map(|v| v == 1),
814            )
815            .optional()?
816            .unwrap_or(false);
817
818        tx.execute(
819            "INSERT INTO vector_index_schemas \
820             (kind, enabled, source_mode, source_config_json, state, created_at, updated_at) \
821             VALUES (?1, 1, 'chunks', NULL, 'fresh', unixepoch(), unixepoch()) \
822             ON CONFLICT(kind) DO UPDATE SET \
823                 enabled = 1, \
824                 source_mode = 'chunks', \
825                 source_config_json = NULL, \
826                 updated_at = unixepoch()",
827            rusqlite::params![kind],
828        )?;
829
830        self.schema_manager
831            .ensure_vec_kind_profile(&tx, kind, dimensions)?;
832
833        let chunks = collect_kind_chunks(&tx, kind)?;
834        let mut enqueued: usize = 0;
835        {
836            let mut stmt = tx.prepare(
837                "INSERT INTO vector_projection_work \
838                 (kind, node_logical_id, chunk_id, canonical_hash, priority, \
839                  embedding_profile_id, state, created_at, updated_at) \
840                 SELECT ?1, ?2, ?3, ?4, 0, ?5, 'pending', unixepoch(), unixepoch() \
841                 WHERE NOT EXISTS ( \
842                     SELECT 1 FROM vector_projection_work \
843                     WHERE chunk_id = ?3 AND embedding_profile_id = ?5 AND state = 'pending' \
844                 )",
845            )?;
846            for chunk in &chunks {
847                let canonical_hash = canonical_chunk_hash(&chunk.chunk_id, &chunk.text_content);
848                let inserted = stmt.execute(rusqlite::params![
849                    kind,
850                    chunk.node_logical_id.as_str(),
851                    chunk.chunk_id.as_str(),
852                    canonical_hash,
853                    profile_id,
854                ])?;
855                enqueued += inserted;
856            }
857        }
858
859        tx.commit()?;
860
861        Ok(ConfigureVecOutcome {
862            kind: kind.to_owned(),
863            enqueued_backfill_rows: enqueued,
864            was_already_enabled,
865        })
866    }
867
868    /// Batch form of [`Self::configure_vec_kind`]. Loops over each
869    /// `(kind, source)` in input order and returns one outcome per entry.
870    ///
871    /// Per-kind atomicity matches [`Self::configure_vec_kind`]: each call
872    /// runs its own transaction. The batch as a whole is **not**
873    /// atomic — if the third call fails, the first two remain committed.
874    ///
875    /// # Errors
876    /// Returns the first [`EngineError`] encountered; already-committed
877    /// entries remain committed.
878    pub fn configure_vec_kinds(
879        &self,
880        items: &[(String, VectorSource)],
881    ) -> Result<Vec<ConfigureVecOutcome>, EngineError> {
882        let mut outcomes = Vec::with_capacity(items.len());
883        for (kind, source) in items {
884            outcomes.push(self.configure_vec_kind(kind, *source)?);
885        }
886        Ok(outcomes)
887    }
888
889    /// Return the managed vector indexing status for `kind`.
890    ///
891    /// If no `vector_index_schemas` row exists for `kind`, returns
892    /// `enabled = false` and `state = "unconfigured"` with zero counts.
893    ///
894    /// # Errors
895    /// Returns [`EngineError`] on database failures.
896    pub fn get_vec_index_status(&self, kind: &str) -> Result<VecIndexStatus, EngineError> {
897        let conn = self.connect()?;
898
899        let schema_row: Option<(bool, String, Option<String>, Option<i64>)> = conn
900            .query_row(
901                "SELECT enabled, state, last_error, last_completed_at \
902                 FROM vector_index_schemas WHERE kind = ?1",
903                rusqlite::params![kind],
904                |row| {
905                    Ok((
906                        row.get::<_, i64>(0)? == 1,
907                        row.get::<_, String>(1)?,
908                        row.get::<_, Option<String>>(2)?,
909                        row.get::<_, Option<i64>>(3)?,
910                    ))
911                },
912            )
913            .optional()?;
914
915        let Some((enabled, state, last_error, last_completed_at)) = schema_row else {
916            return Ok(VecIndexStatus {
917                kind: kind.to_owned(),
918                enabled: false,
919                state: "unconfigured".to_owned(),
920                pending_incremental: 0,
921                pending_backfill: 0,
922                last_error: None,
923                last_completed_at: None,
924                embedding_identity: None,
925            });
926        };
927
928        let pending_backfill: u64 = conn
929            .query_row(
930                "SELECT count(*) FROM vector_projection_work \
931                 WHERE kind = ?1 AND state = 'pending' AND priority < 1000",
932                rusqlite::params![kind],
933                |row| row.get::<_, i64>(0),
934            )
935            .map(i64::cast_unsigned)?;
936
937        let pending_incremental: u64 = conn
938            .query_row(
939                "SELECT count(*) FROM vector_projection_work \
940                 WHERE kind = ?1 AND state = 'pending' AND priority >= 1000",
941                rusqlite::params![kind],
942                |row| row.get::<_, i64>(0),
943            )
944            .map(i64::cast_unsigned)?;
945
946        let embedding_identity: Option<String> = conn
947            .query_row(
948                "SELECT model_identity FROM vector_embedding_profiles WHERE active = 1",
949                [],
950                |row| row.get::<_, String>(0),
951            )
952            .optional()?;
953
954        Ok(VecIndexStatus {
955            kind: kind.to_owned(),
956            enabled,
957            state,
958            pending_incremental,
959            pending_backfill,
960            last_error,
961            last_completed_at,
962            embedding_identity,
963        })
964    }
965
966    /// # Errors
967    /// Returns [`EngineError`] if the database connection fails or the projection rebuild fails.
968    pub fn rebuild_projections(
969        &self,
970        target: ProjectionTarget,
971    ) -> Result<ProjectionRepairReport, EngineError> {
972        self.projections.rebuild_projections(target)
973    }
974
975    /// # Errors
976    /// Returns [`EngineError`] if the database connection fails or the projection rebuild fails.
977    pub fn rebuild_missing_projections(&self) -> Result<ProjectionRepairReport, EngineError> {
978        self.projections.rebuild_missing_projections()
979    }
980
981    /// Activate, replace, or confirm the database-wide embedding identity.
982    ///
983    /// Vector identity belongs to the embedder: the `model_identity`,
984    /// `model_version`, `dimensions`, and `normalization_policy` persisted in
985    /// `vector_embedding_profiles` are read directly from
986    /// `embedder.identity()`. Callers cannot supply an identity string.
987    ///
988    /// Semantics:
989    /// - If no active profile exists: insert a new active row.
990    ///   Returns [`ConfigureEmbeddingOutcome::Activated`].
991    /// - If an active profile exists and the identity matches exactly: no-op.
992    ///   Returns [`ConfigureEmbeddingOutcome::Unchanged`].
993    /// - If an active profile exists and the identity differs:
994    ///   * If any `vector_index_schemas.enabled = 1` rows exist and
995    ///     `acknowledge_rebuild_impact = false`: return
996    ///     [`EngineError::EmbeddingChangeRequiresAck`] without mutating state.
997    ///   * Otherwise, within a single transaction: demote the current active
998    ///     profile, insert the new active profile, and mark every enabled
999    ///     vector index schema `state = 'stale'`. Returns
1000    ///     [`ConfigureEmbeddingOutcome::Replaced`].
1001    ///
1002    /// This method never triggers a rebuild itself. Affected kinds are marked
1003    /// `stale` so later rebuild flows can pick them up.
1004    ///
1005    /// # Errors
1006    /// - [`EngineError::EmbeddingChangeRequiresAck`] if the identity change
1007    ///   would invalidate enabled vector index kinds and the caller did not
1008    ///   acknowledge the rebuild impact.
1009    /// - [`EngineError::Sqlite`] if any underlying SQL fails.
1010    #[allow(clippy::too_many_lines)]
1011    pub fn configure_embedding(
1012        &self,
1013        embedder: &dyn QueryEmbedder,
1014        acknowledge_rebuild_impact: bool,
1015    ) -> Result<ConfigureEmbeddingOutcome, EngineError> {
1016        let identity = embedder.identity();
1017        let max_tokens = embedder.max_tokens();
1018        let dimensions = i64::try_from(identity.dimension).map_err(|_| {
1019            EngineError::InvalidConfig(format!(
1020                "embedder dimension {} exceeds i64 range",
1021                identity.dimension
1022            ))
1023        })?;
1024        let max_tokens_i64 = i64::try_from(max_tokens).ok();
1025
1026        let mut conn = self.connect()?;
1027        let tx = conn.transaction_with_behavior(TransactionBehavior::Immediate)?;
1028
1029        // Look up the current active profile, if any.
1030        let current: Option<(i64, String, String, i64, String)> = tx
1031            .query_row(
1032                "SELECT profile_id, model_identity, COALESCE(model_version, ''), dimensions, \
1033                        COALESCE(normalization_policy, '') \
1034                 FROM vector_embedding_profiles WHERE active = 1",
1035                [],
1036                |row| {
1037                    Ok((
1038                        row.get::<_, i64>(0)?,
1039                        row.get::<_, String>(1)?,
1040                        row.get::<_, String>(2)?,
1041                        row.get::<_, i64>(3)?,
1042                        row.get::<_, String>(4)?,
1043                    ))
1044                },
1045            )
1046            .optional()?;
1047
1048        let incoming_version = identity.model_version.clone();
1049        if let Some((profile_id, current_identity, current_version, current_dim, current_norm)) =
1050            current.clone()
1051        {
1052            if current_identity == identity.model_identity
1053                && current_version == incoming_version
1054                && current_dim == dimensions
1055                && current_norm == identity.normalization_policy
1056            {
1057                // Identical — no-op.
1058                tx.commit()?;
1059                return Ok(ConfigureEmbeddingOutcome::Unchanged { profile_id });
1060            }
1061
1062            // Identity differs: count enabled kinds.
1063            let affected_kinds: i64 = tx.query_row(
1064                "SELECT COUNT(*) FROM vector_index_schemas WHERE enabled = 1",
1065                [],
1066                |row| row.get::<_, i64>(0),
1067            )?;
1068            let affected = usize::try_from(affected_kinds).unwrap_or(0);
1069            if affected > 0 && !acknowledge_rebuild_impact {
1070                // No mutation — drop the transaction.
1071                drop(tx);
1072                return Err(EngineError::EmbeddingChangeRequiresAck {
1073                    affected_kinds: affected,
1074                });
1075            }
1076
1077            let identity_triple_changed = current_identity != identity.model_identity
1078                || current_version != incoming_version
1079                || current_dim != dimensions;
1080
1081            let new_profile_id = if identity_triple_changed {
1082                // Demote current active row.
1083                tx.execute(
1084                    "UPDATE vector_embedding_profiles SET active = 0 WHERE active = 1",
1085                    [],
1086                )?;
1087
1088                // Insert new active row.
1089                insert_new_active_profile(
1090                    &tx,
1091                    &identity.model_identity,
1092                    &incoming_version,
1093                    dimensions,
1094                    &identity.normalization_policy,
1095                    max_tokens_i64,
1096                )?
1097            } else {
1098                // Normalization-only change: the unique index on
1099                // (model_identity, model_version, dimensions) prevents inserting
1100                // a second row with the same triple, so update in place. The
1101                // profile row still reflects the new policy and enabled kinds
1102                // still get staled so rebuilds pick up the change.
1103                let normalization_opt: Option<&str> = if identity.normalization_policy.is_empty() {
1104                    None
1105                } else {
1106                    Some(identity.normalization_policy.as_str())
1107                };
1108                tx.execute(
1109                    "UPDATE vector_embedding_profiles \
1110                     SET normalization_policy = ?1, max_tokens = ?2 \
1111                     WHERE profile_id = ?3",
1112                    rusqlite::params![normalization_opt, max_tokens_i64, profile_id],
1113                )?;
1114                profile_id
1115            };
1116
1117            // Mark enabled kinds stale.
1118            let stale_kinds = if affected > 0 {
1119                tx.execute(
1120                    "UPDATE vector_index_schemas \
1121                     SET state = 'stale', updated_at = unixepoch() \
1122                     WHERE enabled = 1",
1123                    [],
1124                )?
1125            } else {
1126                0
1127            };
1128
1129            tx.commit()?;
1130            return Ok(ConfigureEmbeddingOutcome::Replaced {
1131                old_profile_id: profile_id,
1132                new_profile_id,
1133                stale_kinds,
1134            });
1135        }
1136
1137        // No active profile: activate a new one.
1138        let new_profile_id = insert_new_active_profile(
1139            &tx,
1140            &identity.model_identity,
1141            &incoming_version,
1142            dimensions,
1143            &identity.normalization_policy,
1144            max_tokens_i64,
1145        )?;
1146        tx.commit()?;
1147        Ok(ConfigureEmbeddingOutcome::Activated {
1148            profile_id: new_profile_id,
1149        })
1150    }
1151
1152    /// Return the active `vector_embedding_profiles.profile_id`, or `None`
1153    /// if no active profile is configured.
1154    ///
1155    /// # Errors
1156    /// Returns [`EngineError`] if the database read fails.
1157    pub fn active_embedding_profile_id(&self) -> Result<Option<i64>, EngineError> {
1158        let conn = self.connect()?;
1159        let id = conn
1160            .query_row(
1161                "SELECT profile_id FROM vector_embedding_profiles WHERE active = 1",
1162                [],
1163                |row| row.get::<_, i64>(0),
1164            )
1165            .optional()?;
1166        Ok(id)
1167    }
1168
1169    /// Run projection-worker scheduling ticks until no more work remains or
1170    /// `timeout` elapses.  Intended for tests and for admin-driven catch-up
1171    /// flows.
1172    ///
1173    /// # Errors
1174    /// Returns [`EngineError`] on writer or database failures, or
1175    /// [`EngineError::InvalidConfig`] if the admin service was constructed
1176    /// without a writer handle.
1177    pub fn drain_vector_projection(
1178        &self,
1179        embedder: &dyn BatchEmbedder,
1180        timeout: std::time::Duration,
1181    ) -> Result<crate::vector_projection_actor::DrainReport, EngineError> {
1182        let deadline = std::time::Instant::now() + timeout;
1183        let mut report = crate::vector_projection_actor::DrainReport::default();
1184        let writer = self.require_writer()?;
1185        loop {
1186            if std::time::Instant::now() >= deadline {
1187                break;
1188            }
1189            let tick = crate::vector_projection_actor::run_tick(self, &writer, embedder)?;
1190            if tick.embedder_unavailable {
1191                report.embedder_unavailable_ticks += 1;
1192                break;
1193            }
1194            report.incremental_processed += tick.processed_incremental;
1195            report.backfill_processed += tick.processed_backfill;
1196            report.failed += tick.failed;
1197            report.discarded_stale += tick.discarded_stale;
1198            if tick.idle {
1199                break;
1200            }
1201        }
1202        Ok(report)
1203    }
1204
1205    /// Run exactly one projection scheduling tick; used by tests that need
1206    /// to assert priority ordering.
1207    ///
1208    /// # Errors
1209    /// Returns [`EngineError`] on writer or database failures.
1210    pub fn drain_vector_projection_single_tick(
1211        &self,
1212        embedder: &dyn BatchEmbedder,
1213    ) -> Result<crate::vector_projection_actor::DrainReport, EngineError> {
1214        let writer = self.require_writer()?;
1215        let tick = crate::vector_projection_actor::run_tick(self, &writer, embedder)?;
1216        let mut report = crate::vector_projection_actor::DrainReport::default();
1217        if tick.embedder_unavailable {
1218            report.embedder_unavailable_ticks = 1;
1219        }
1220        report.incremental_processed = tick.processed_incremental;
1221        report.backfill_processed = tick.processed_backfill;
1222        report.failed = tick.failed;
1223        report.discarded_stale = tick.discarded_stale;
1224        Ok(report)
1225    }
1226
1227    fn require_writer(&self) -> Result<std::sync::Arc<crate::WriterActor>, EngineError> {
1228        self.writer.clone().ok_or_else(|| {
1229            EngineError::InvalidConfig(
1230                "drain_vector_projection requires an engine-wired AdminService".to_owned(),
1231            )
1232        })
1233    }
1234
1235    /// Probe the supplied embedder by attempting a fixed short embed call.
1236    ///
1237    /// Used as an availability check for the active embedder. Does not touch
1238    /// persistent state.
1239    ///
1240    /// # Errors
1241    /// Returns [`EngineError::CapabilityMissing`] wrapping the embedder
1242    /// diagnostic if the embedder is unavailable or its call fails.
1243    pub fn check_embedding(&self, embedder: &dyn QueryEmbedder) -> Result<(), EngineError> {
1244        match embedder.embed_query("fathomdb embedder health probe") {
1245            Ok(_) => Ok(()),
1246            Err(err) => Err(EngineError::CapabilityMissing(format!(
1247                "embedder probe failed: {err}"
1248            ))),
1249        }
1250    }
1251}
1252
1253/// Outcome of [`AdminService::configure_embedding`].
1254#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
1255#[serde(tag = "outcome", rename_all = "snake_case")]
1256pub enum ConfigureEmbeddingOutcome {
1257    /// No active embedding profile existed; a new one was inserted.
1258    Activated {
1259        /// Newly inserted `vector_embedding_profiles.profile_id`.
1260        profile_id: i64,
1261    },
1262    /// The requested identity matched the active profile exactly; nothing
1263    /// was changed.
1264    Unchanged {
1265        /// The existing active `vector_embedding_profiles.profile_id`.
1266        profile_id: i64,
1267    },
1268    /// The active profile was replaced and any enabled vector index
1269    /// schemas were marked `stale`.
1270    Replaced {
1271        /// The previously-active `profile_id` (now demoted).
1272        old_profile_id: i64,
1273        /// The newly-inserted active `profile_id`.
1274        new_profile_id: i64,
1275        /// Number of `vector_index_schemas` rows newly marked `stale`.
1276        stale_kinds: usize,
1277    },
1278}
1279
1280fn insert_new_active_profile(
1281    tx: &rusqlite::Transaction<'_>,
1282    model_identity: &str,
1283    model_version: &str,
1284    dimensions: i64,
1285    normalization_policy: &str,
1286    max_tokens: Option<i64>,
1287) -> Result<i64, rusqlite::Error> {
1288    // `profile_name` is NOT NULL in the schema; derive it from identity so the
1289    // row is self-describing without inventing a user-facing name surface.
1290    let profile_name = format!("{model_identity}@{model_version}");
1291    let model_version_opt: Option<&str> = if model_version.is_empty() {
1292        None
1293    } else {
1294        Some(model_version)
1295    };
1296    let normalization_opt: Option<&str> = if normalization_policy.is_empty() {
1297        None
1298    } else {
1299        Some(normalization_policy)
1300    };
1301    tx.execute(
1302        "INSERT INTO vector_embedding_profiles \
1303            (profile_name, model_identity, model_version, dimensions, normalization_policy, \
1304             max_tokens, active, activated_at, created_at) \
1305         VALUES (?1, ?2, ?3, ?4, ?5, ?6, 1, unixepoch(), unixepoch())",
1306        rusqlite::params![
1307            profile_name,
1308            model_identity,
1309            model_version_opt,
1310            dimensions,
1311            normalization_opt,
1312            max_tokens,
1313        ],
1314    )?;
1315    Ok(tx.last_insert_rowid())
1316}
1317
1318/// # Errors
1319/// Returns [`EngineError`] if the file cannot be read or the config is invalid.
1320pub fn load_vector_regeneration_config(
1321    path: impl AsRef<Path>,
1322) -> Result<VectorRegenerationConfig, EngineError> {
1323    let path = path.as_ref();
1324    let raw = std::fs::read_to_string(path)?;
1325    match path.extension().and_then(|ext| ext.to_str()) {
1326        Some("toml") => {
1327            toml::from_str(&raw).map_err(|error| EngineError::Bridge(error.to_string()))
1328        }
1329        Some("json") | None => {
1330            serde_json::from_str(&raw).map_err(|error| EngineError::Bridge(error.to_string()))
1331        }
1332        Some(other) => Err(EngineError::Bridge(format!(
1333            "unsupported vector regeneration config extension: {other}"
1334        ))),
1335    }
1336}
1337
1338fn validate_vector_regeneration_config(
1339    conn: &rusqlite::Connection,
1340    config: &VectorRegenerationConfig,
1341    identity: &QueryEmbedderIdentity,
1342) -> Result<VectorRegenerationConfig, VectorRegenerationFailure> {
1343    let kind = validate_bounded_text("kind", &config.kind, MAX_PROFILE_LEN)?;
1344    let profile = validate_bounded_text("profile", &config.profile, MAX_PROFILE_LEN)?;
1345    if identity.dimension == 0 {
1346        return Err(VectorRegenerationFailure::new(
1347            VectorRegenerationFailureClass::InvalidContract,
1348            "embedder reports dimension 0".to_owned(),
1349        ));
1350    }
1351    let chunking_policy =
1352        validate_bounded_text("chunking_policy", &config.chunking_policy, MAX_POLICY_LEN)?;
1353    let preprocessing_policy = validate_bounded_text(
1354        "preprocessing_policy",
1355        &config.preprocessing_policy,
1356        MAX_POLICY_LEN,
1357    )?;
1358
1359    if let Some(existing_dimension) = current_vector_profile_dimension(conn, &profile)?
1360        && existing_dimension != identity.dimension
1361    {
1362        return Err(VectorRegenerationFailure::new(
1363            VectorRegenerationFailureClass::InvalidContract,
1364            format!(
1365                "embedder dimension {} does not match existing vector profile dimension {}",
1366                identity.dimension, existing_dimension
1367            ),
1368        ));
1369    }
1370
1371    validate_existing_contract_version(conn, &profile)?;
1372
1373    let normalized = VectorRegenerationConfig {
1374        kind,
1375        profile,
1376        chunking_policy,
1377        preprocessing_policy,
1378    };
1379    let serialized = serde_json::to_vec(&normalized).map_err(|error| {
1380        VectorRegenerationFailure::new(
1381            VectorRegenerationFailureClass::InvalidContract,
1382            error.to_string(),
1383        )
1384    })?;
1385    if serialized.len() > MAX_CONTRACT_JSON_BYTES {
1386        return Err(VectorRegenerationFailure::new(
1387            VectorRegenerationFailureClass::InvalidContract,
1388            format!("serialized contract exceeds {MAX_CONTRACT_JSON_BYTES} bytes"),
1389        ));
1390    }
1391
1392    Ok(normalized)
1393}
1394
1395#[allow(clippy::cast_possible_wrap)]
1396fn persist_vector_contract(
1397    conn: &rusqlite::Connection,
1398    config: &VectorRegenerationConfig,
1399    table_name: &str,
1400    identity: &QueryEmbedderIdentity,
1401    snapshot_hash: &str,
1402) -> Result<(), EngineError> {
1403    conn.execute(
1404        r"
1405        INSERT OR REPLACE INTO vector_embedding_contracts (
1406            profile,
1407            table_name,
1408            model_identity,
1409            model_version,
1410            dimension,
1411            normalization_policy,
1412            chunking_policy,
1413            preprocessing_policy,
1414            generator_command_json,
1415            applied_at,
1416            snapshot_hash,
1417            contract_format_version,
1418            updated_at
1419        ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, unixepoch(), ?10, ?11, unixepoch())
1420        ",
1421        rusqlite::params![
1422            config.profile.as_str(),
1423            table_name,
1424            identity.model_identity.as_str(),
1425            identity.model_version.as_str(),
1426            identity.dimension as i64,
1427            identity.normalization_policy.as_str(),
1428            config.chunking_policy.as_str(),
1429            config.preprocessing_policy.as_str(),
1430            "[]",
1431            snapshot_hash,
1432            CURRENT_VECTOR_CONTRACT_FORMAT_VERSION,
1433        ],
1434    )?;
1435    Ok(())
1436}
1437
1438fn persist_vector_regeneration_event(
1439    conn: &rusqlite::Connection,
1440    event_type: &str,
1441    subject: &str,
1442    metadata: &VectorRegenerationAuditMetadata,
1443) -> Result<(), EngineError> {
1444    let metadata_json = serialize_audit_metadata(metadata)?;
1445    conn.execute(
1446        "INSERT INTO provenance_events (id, event_type, subject, metadata_json) VALUES (?1, ?2, ?3, ?4)",
1447        rusqlite::params![new_id(), event_type, subject, metadata_json],
1448    )?;
1449    Ok(())
1450}
1451
1452fn validate_bounded_text(
1453    field: &str,
1454    value: &str,
1455    max_len: usize,
1456) -> Result<String, VectorRegenerationFailure> {
1457    let trimmed = value.trim();
1458    if trimmed.is_empty() {
1459        return Err(VectorRegenerationFailure::new(
1460            VectorRegenerationFailureClass::InvalidContract,
1461            format!("{field} must not be empty"),
1462        ));
1463    }
1464    if trimmed.len() > max_len {
1465        return Err(VectorRegenerationFailure::new(
1466            VectorRegenerationFailureClass::InvalidContract,
1467            format!("{field} exceeds max length {max_len}"),
1468        ));
1469    }
1470    Ok(trimmed.to_owned())
1471}
1472
1473fn current_vector_profile_dimension(
1474    conn: &rusqlite::Connection,
1475    profile: &str,
1476) -> Result<Option<usize>, VectorRegenerationFailure> {
1477    let dimension: Option<i64> = conn
1478        .query_row(
1479            "SELECT dimension FROM vector_profiles WHERE profile = ?1 AND enabled = 1",
1480            [profile],
1481            |row| row.get(0),
1482        )
1483        .optional()
1484        .map_err(|error| {
1485            VectorRegenerationFailure::new(
1486                VectorRegenerationFailureClass::InvalidContract,
1487                error.to_string(),
1488            )
1489        })?;
1490    dimension
1491        .map(|value| {
1492            usize::try_from(value).map_err(|_| {
1493                VectorRegenerationFailure::new(
1494                    VectorRegenerationFailureClass::InvalidContract,
1495                    format!("stored vector profile dimension is invalid: {value}"),
1496                )
1497            })
1498        })
1499        .transpose()
1500}
1501
1502fn validate_existing_contract_version(
1503    conn: &rusqlite::Connection,
1504    profile: &str,
1505) -> Result<(), VectorRegenerationFailure> {
1506    let version: Option<i64> = conn
1507        .query_row(
1508            "SELECT contract_format_version FROM vector_embedding_contracts WHERE profile = ?1",
1509            [profile],
1510            |row| row.get(0),
1511        )
1512        .optional()
1513        .map_err(|error| {
1514            VectorRegenerationFailure::new(
1515                VectorRegenerationFailureClass::InvalidContract,
1516                error.to_string(),
1517            )
1518        })?;
1519    if let Some(version) = version
1520        && version > CURRENT_VECTOR_CONTRACT_FORMAT_VERSION
1521    {
1522        return Err(VectorRegenerationFailure::new(
1523            VectorRegenerationFailureClass::InvalidContract,
1524            format!(
1525                "persisted contract format version {version} is unsupported; supported version is {CURRENT_VECTOR_CONTRACT_FORMAT_VERSION}"
1526            ),
1527        ));
1528    }
1529    Ok(())
1530}
1531
1532fn serialize_audit_metadata(
1533    metadata: &VectorRegenerationAuditMetadata,
1534) -> Result<String, EngineError> {
1535    let json =
1536        serde_json::to_string(metadata).map_err(|error| EngineError::Bridge(error.to_string()))?;
1537    if json.len() > MAX_AUDIT_METADATA_BYTES {
1538        return Err(VectorRegenerationFailure::new(
1539            VectorRegenerationFailureClass::InvalidContract,
1540            format!("audit metadata exceeds {MAX_AUDIT_METADATA_BYTES} bytes"),
1541        )
1542        .to_engine_error());
1543    }
1544    Ok(json)
1545}
1546
1547pub(super) fn build_regeneration_input(
1548    config: &VectorRegenerationConfig,
1549    identity: &QueryEmbedderIdentity,
1550    chunks: Vec<VectorRegenerationInputChunk>,
1551) -> VectorRegenerationInput {
1552    VectorRegenerationInput {
1553        profile: config.profile.clone(),
1554        table_name: fathomdb_schema::vec_kind_table_name(&config.kind),
1555        model_identity: identity.model_identity.clone(),
1556        model_version: identity.model_version.clone(),
1557        dimension: identity.dimension,
1558        normalization_policy: identity.normalization_policy.clone(),
1559        chunking_policy: config.chunking_policy.clone(),
1560        preprocessing_policy: config.preprocessing_policy.clone(),
1561        chunks,
1562    }
1563}
1564
1565pub(super) fn compute_snapshot_hash(
1566    payload: &VectorRegenerationInput,
1567) -> Result<String, EngineError> {
1568    let bytes =
1569        serde_json::to_vec(payload).map_err(|error| EngineError::Bridge(error.to_string()))?;
1570    let mut hasher = sha2::Sha256::new();
1571    hasher.update(bytes);
1572    Ok(format!("{:x}", hasher.finalize()))
1573}
1574
1575/// Per-kind chunk enumeration used by [`AdminService::configure_vec_kind`].
1576///
1577/// Mirrors [`collect_regeneration_chunks`] but filters by `nodes.kind`.
1578pub(super) fn collect_kind_chunks(
1579    conn: &rusqlite::Connection,
1580    kind: &str,
1581) -> Result<Vec<VectorRegenerationInputChunk>, EngineError> {
1582    let mut stmt = conn.prepare(
1583        r"
1584        SELECT c.id, c.node_logical_id, n.kind, c.text_content, c.byte_start, c.byte_end, n.source_ref, c.created_at
1585        FROM chunks c
1586        JOIN nodes n
1587          ON n.logical_id = c.node_logical_id
1588         AND n.superseded_at IS NULL
1589        WHERE n.kind = ?1
1590        ORDER BY c.created_at, c.id
1591        ",
1592    )?;
1593    let chunks = stmt
1594        .query_map(rusqlite::params![kind], |row| {
1595            Ok(VectorRegenerationInputChunk {
1596                chunk_id: row.get(0)?,
1597                node_logical_id: row.get(1)?,
1598                kind: row.get(2)?,
1599                text_content: row.get(3)?,
1600                byte_start: row.get(4)?,
1601                byte_end: row.get(5)?,
1602                source_ref: row.get(6)?,
1603                created_at: row.get(7)?,
1604            })
1605        })?
1606        .collect::<Result<Vec<_>, _>>()?;
1607    Ok(chunks)
1608}
1609
1610/// Canonical hash of a chunk's text content, scoped to the chunk id.
1611#[must_use]
1612pub(crate) fn canonical_chunk_hash(chunk_id: &str, text: &str) -> String {
1613    let mut hasher = sha2::Sha256::new();
1614    hasher.update(chunk_id.as_bytes());
1615    hasher.update([0u8]);
1616    hasher.update(text.as_bytes());
1617    format!("{:x}", hasher.finalize())
1618}
1619
1620pub(super) fn collect_regeneration_chunks(
1621    conn: &rusqlite::Connection,
1622) -> Result<Vec<VectorRegenerationInputChunk>, EngineError> {
1623    let mut stmt = conn.prepare(
1624        r"
1625        SELECT c.id, c.node_logical_id, n.kind, c.text_content, c.byte_start, c.byte_end, n.source_ref, c.created_at
1626        FROM chunks c
1627        JOIN nodes n
1628          ON n.logical_id = c.node_logical_id
1629         AND n.superseded_at IS NULL
1630        ORDER BY c.created_at, c.id
1631        ",
1632    )?;
1633    let chunks = stmt
1634        .query_map([], |row| {
1635            Ok(VectorRegenerationInputChunk {
1636                chunk_id: row.get(0)?,
1637                node_logical_id: row.get(1)?,
1638                kind: row.get(2)?,
1639                text_content: row.get(3)?,
1640                byte_start: row.get(4)?,
1641                byte_end: row.get(5)?,
1642                source_ref: row.get(6)?,
1643                created_at: row.get(7)?,
1644            })
1645        })?
1646        .collect::<Result<Vec<_>, _>>()?;
1647    Ok(chunks)
1648}