1use std::path::Path;
2
3use fathomdb_schema::SchemaError;
4use rusqlite::{OptionalExtension, TransactionBehavior};
5use serde::{Deserialize, Serialize};
6use sha2::Digest;
7
8use super::{
9 AdminService, CURRENT_VECTOR_CONTRACT_FORMAT_VERSION, EngineError, MAX_AUDIT_METADATA_BYTES,
10 MAX_CONTRACT_JSON_BYTES, MAX_POLICY_LEN, MAX_PROFILE_LEN, ProjectionRepairReport,
11 ProjectionTarget, VecProfile, VectorRegenerationConfig, VectorRegenerationReport,
12};
13use crate::embedder::{BatchEmbedder, QueryEmbedder, QueryEmbedderIdentity};
14use crate::ids::new_id;
15
16#[allow(dead_code)]
17#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
18pub(super) struct VectorEmbeddingContractRecord {
19 profile: String,
20 table_name: String,
21 model_identity: String,
22 model_version: String,
23 dimension: usize,
24 normalization_policy: String,
25 chunking_policy: String,
26 preprocessing_policy: String,
27 generator_command_json: String,
28 applied_at: i64,
29 snapshot_hash: String,
30 contract_format_version: i64,
31}
32
33#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
34pub(super) struct VectorRegenerationInputChunk {
35 pub(super) chunk_id: String,
36 pub(super) node_logical_id: String,
37 pub(super) kind: String,
38 pub(super) text_content: String,
39 pub(super) byte_start: Option<i64>,
40 pub(super) byte_end: Option<i64>,
41 pub(super) source_ref: Option<String>,
42 pub(super) created_at: i64,
43}
44
45#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
46pub(super) struct VectorRegenerationInput {
47 pub(super) profile: String,
48 pub(super) table_name: String,
49 pub(super) model_identity: String,
50 pub(super) model_version: String,
51 pub(super) dimension: usize,
52 pub(super) normalization_policy: String,
53 pub(super) chunking_policy: String,
54 pub(super) preprocessing_policy: String,
55 pub(super) chunks: Vec<VectorRegenerationInputChunk>,
56}
57
/// Coarse category of a vector regeneration failure.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum VectorRegenerationFailureClass {
    /// The regeneration config or a stored contract/profile was rejected.
    InvalidContract,
    /// The embedder itself returned an error.
    EmbedderFailure,
    /// The embedder returned output that failed validation.
    InvalidEmbedderOutput,
    /// The chunk snapshot changed between generation and apply.
    SnapshotDrift,
    /// The schema layer reported a missing vector capability.
    UnsupportedVecCapability,
}

impl VectorRegenerationFailureClass {
    /// Lower-case, human-readable name of the class, used in error messages and audit events.
    fn label(self) -> &'static str {
        use VectorRegenerationFailureClass as Class;

        match self {
            Class::InvalidContract => "invalid contract",
            Class::EmbedderFailure => "embedder failure",
            Class::InvalidEmbedderOutput => "invalid embedder output",
            Class::SnapshotDrift => "snapshot drift",
            Class::UnsupportedVecCapability => "unsupported vec capability",
        }
    }

    /// Whether a failure of this class is marked as retryable; only snapshot drift is.
    fn retryable(self) -> bool {
        self == Self::SnapshotDrift
    }
}
82
83#[derive(Clone, Debug, PartialEq, Eq)]
84pub(crate) struct VectorRegenerationFailure {
85 class: VectorRegenerationFailureClass,
86 detail: String,
87}
88
89impl VectorRegenerationFailure {
90 pub(crate) fn new(class: VectorRegenerationFailureClass, detail: impl Into<String>) -> Self {
91 Self {
92 class,
93 detail: detail.into(),
94 }
95 }
96
97 pub(super) fn to_engine_error(&self) -> EngineError {
98 let retry_suffix = if self.class.retryable() {
99 " [retryable]"
100 } else {
101 ""
102 };
103 EngineError::Bridge(format!(
104 "vector regeneration {}: {}{}",
105 self.class.label(),
106 self.detail,
107 retry_suffix
108 ))
109 }
110
111 pub(super) fn failure_class_label(&self) -> &'static str {
112 self.class.label()
113 }
114}
115
116#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
117pub(super) struct VectorRegenerationAuditMetadata {
118 pub(super) profile: String,
119 pub(super) model_identity: String,
120 pub(super) model_version: String,
121 pub(super) chunk_count: usize,
122 pub(super) snapshot_hash: String,
123 #[serde(skip_serializing_if = "Option::is_none")]
124 pub(super) failure_class: Option<String>,
125}
126
127impl AdminService {
128 pub fn get_vec_profile(&self, kind: &str) -> Result<Option<VecProfile>, EngineError> {
136 let conn = self.connect()?;
137 let result = conn
138 .query_row(
139 "SELECT \
140 json_extract(config_json, '$.model_identity'), \
141 json_extract(config_json, '$.model_version'), \
142 CAST(json_extract(config_json, '$.dimensions') AS INTEGER), \
143 active_at, \
144 created_at \
145 FROM projection_profiles WHERE kind = ?1 AND facet = 'vec'",
146 rusqlite::params![kind],
147 |row| {
148 Ok(VecProfile {
149 model_identity: row.get::<_, Option<String>>(0)?.unwrap_or_default(),
150 model_version: row.get(1)?,
151 dimensions: {
152 let d: i64 = row.get::<_, Option<i64>>(2)?.unwrap_or(0);
153 u32::try_from(d).unwrap_or(0)
154 },
155 active_at: row.get(3)?,
156 created_at: row.get(4)?,
157 })
158 },
159 )
160 .optional()?;
161 Ok(result)
162 }
163
164 #[allow(dead_code)]
169 fn set_vec_profile_inner(
170 conn: &rusqlite::Connection,
171 identity_json: &str,
172 ) -> Result<VecProfile, rusqlite::Error> {
173 conn.execute(
174 r"INSERT INTO projection_profiles (kind, facet, config_json, active_at, created_at)
175 VALUES ('*', 'vec', ?1, unixepoch(), unixepoch())
176 ON CONFLICT(kind, facet) DO UPDATE SET
177 config_json = ?1,
178 active_at = unixepoch()",
179 rusqlite::params![identity_json],
180 )?;
181 conn.query_row(
182 "SELECT \
183 json_extract(config_json, '$.model_identity'), \
184 json_extract(config_json, '$.model_version'), \
185 CAST(json_extract(config_json, '$.dimensions') AS INTEGER), \
186 active_at, \
187 created_at \
188 FROM projection_profiles WHERE kind = '*' AND facet = 'vec'",
189 [],
190 |row| {
191 Ok(VecProfile {
192 model_identity: row.get(0)?,
193 model_version: row.get(1)?,
194 dimensions: {
195 let d: i64 = row.get(2)?;
196 u32::try_from(d).unwrap_or(0)
197 },
198 active_at: row.get(3)?,
199 created_at: row.get(4)?,
200 })
201 },
202 )
203 }
204
205 pub fn set_vec_profile(&self, config_json: &str) -> Result<VecProfile, EngineError> {
214 let conn = self.connect()?;
215 Self::set_vec_profile_inner(&conn, config_json).map_err(EngineError::Sqlite)
216 }
217
218 pub fn preview_projection_impact(
226 &self,
227 kind: &str,
228 facet: &str,
229 ) -> Result<super::ProjectionImpact, EngineError> {
230 let conn = self.connect()?;
231 match facet {
232 "fts" => {
233 let rows: u64 = conn
234 .query_row(
235 "SELECT count(*) FROM nodes WHERE kind = ?1 AND superseded_at IS NULL",
236 rusqlite::params![kind],
237 |row| row.get::<_, i64>(0),
238 )
239 .map(i64::cast_unsigned)?;
240 let current_tokenizer = self.get_fts_profile(kind)?.map(|p| p.tokenizer);
241 Ok(super::ProjectionImpact {
242 rows_to_rebuild: rows,
243 estimated_seconds: rows / 5000,
244 temp_db_size_bytes: rows * 200,
245 current_tokenizer,
246 target_tokenizer: None,
247 })
248 }
249 "vec" => {
250 let rows: u64 = conn
251 .query_row("SELECT count(*) FROM chunks", [], |row| {
252 row.get::<_, i64>(0)
253 })
254 .map(i64::cast_unsigned)?;
255 Ok(super::ProjectionImpact {
256 rows_to_rebuild: rows,
257 estimated_seconds: rows / 100,
258 temp_db_size_bytes: rows * 1536,
259 current_tokenizer: None,
260 target_tokenizer: None,
261 })
262 }
263 other => Err(EngineError::Bridge(format!(
264 "unknown projection facet: {other:?}"
265 ))),
266 }
267 }
268
269 pub fn restore_vector_profiles(&self) -> Result<ProjectionRepairReport, EngineError> {
275 let conn = self.connect()?;
276 let profiles: Vec<(String, String, i64)> = {
277 let mut stmt = conn.prepare(
278 "SELECT profile, table_name, dimension \
279 FROM vector_profiles WHERE enabled = 1 ORDER BY profile",
280 )?;
281 stmt.query_map([], |row| {
282 Ok((
283 row.get::<_, String>(0)?,
284 row.get::<_, String>(1)?,
285 row.get::<_, i64>(2)?,
286 ))
287 })?
288 .collect::<Result<Vec<_>, _>>()?
289 };
290
291 for (profile, table_name, dimension) in &profiles {
292 let dimension = usize::try_from(*dimension).map_err(|_| {
293 EngineError::Bridge(format!("invalid vector profile dimension: {dimension}"))
294 })?;
295 self.schema_manager
296 .ensure_vector_profile(&conn, profile, table_name, dimension)?;
297 }
298
299 Ok(ProjectionRepairReport {
300 targets: vec![ProjectionTarget::Vec],
301 rebuilt_rows: profiles.len(),
302 notes: vec![],
303 })
304 }
305
306 #[allow(clippy::too_many_lines)]
322 pub fn regenerate_vector_embeddings(
323 &self,
324 embedder: &dyn QueryEmbedder,
325 config: &VectorRegenerationConfig,
326 ) -> Result<VectorRegenerationReport, EngineError> {
327 let conn = self.connect()?;
328 let identity = embedder.identity();
329 let config = validate_vector_regeneration_config(&conn, config, &identity)
330 .map_err(|failure| failure.to_engine_error())?;
331 let chunks = collect_regeneration_chunks(&conn)?;
332 let payload = build_regeneration_input(&config, &identity, chunks.clone());
333 let snapshot_hash = compute_snapshot_hash(&payload)?;
334 let audit_metadata = VectorRegenerationAuditMetadata {
335 profile: config.profile.clone(),
336 model_identity: identity.model_identity.clone(),
337 model_version: identity.model_version.clone(),
338 chunk_count: chunks.len(),
339 snapshot_hash: snapshot_hash.clone(),
340 failure_class: None,
341 };
342 persist_vector_regeneration_event(
343 &conn,
344 "vector_regeneration_requested",
345 &config.profile,
346 &audit_metadata,
347 )?;
348 let notes = vec!["vector embeddings regenerated via configured embedder".to_owned()];
349
350 let mut embedding_map: std::collections::HashMap<String, Vec<u8>> =
351 std::collections::HashMap::with_capacity(chunks.len());
352 for chunk in &chunks {
353 let vector = match embedder.embed_query(&chunk.text_content) {
354 Ok(vector) => vector,
355 Err(error) => {
356 let failure = VectorRegenerationFailure::new(
357 VectorRegenerationFailureClass::EmbedderFailure,
358 format!("embedder failed for chunk '{}': {error}", chunk.chunk_id),
359 );
360 self.persist_vector_regeneration_failure_best_effort(
361 &config.profile,
362 &audit_metadata,
363 &failure,
364 );
365 return Err(failure.to_engine_error());
366 }
367 };
368 if vector.len() != identity.dimension {
369 let failure = VectorRegenerationFailure::new(
370 VectorRegenerationFailureClass::InvalidEmbedderOutput,
371 format!(
372 "embedder produced {} values for chunk '{}', expected {}",
373 vector.len(),
374 chunk.chunk_id,
375 identity.dimension
376 ),
377 );
378 self.persist_vector_regeneration_failure_best_effort(
379 &config.profile,
380 &audit_metadata,
381 &failure,
382 );
383 return Err(failure.to_engine_error());
384 }
385 if vector.iter().any(|value| !value.is_finite()) {
386 let failure = VectorRegenerationFailure::new(
387 VectorRegenerationFailureClass::InvalidEmbedderOutput,
388 format!(
389 "embedder returned non-finite values for chunk '{}'",
390 chunk.chunk_id
391 ),
392 );
393 self.persist_vector_regeneration_failure_best_effort(
394 &config.profile,
395 &audit_metadata,
396 &failure,
397 );
398 return Err(failure.to_engine_error());
399 }
400 let bytes: Vec<u8> = vector
401 .iter()
402 .flat_map(|value| value.to_le_bytes())
403 .collect();
404 embedding_map.insert(chunk.chunk_id.clone(), bytes);
405 }
406
407 let table_name = fathomdb_schema::vec_kind_table_name(&config.kind);
408 let mut conn = conn;
409 let tx = conn.transaction_with_behavior(TransactionBehavior::Immediate)?;
410 match self
411 .schema_manager
412 .ensure_vec_kind_profile(&tx, &config.kind, identity.dimension)
413 {
414 Ok(()) => {}
415 Err(SchemaError::MissingCapability(message)) => {
416 let failure = VectorRegenerationFailure::new(
417 VectorRegenerationFailureClass::UnsupportedVecCapability,
418 message,
419 );
420 drop(tx);
421 self.persist_vector_regeneration_failure_best_effort(
422 &config.profile,
423 &audit_metadata,
424 &failure,
425 );
426 return Err(failure.to_engine_error());
427 }
428 Err(error) => return Err(EngineError::Schema(error)),
429 }
430 let apply_chunks = collect_regeneration_chunks(&tx)?;
431 let apply_payload = build_regeneration_input(&config, &identity, apply_chunks.clone());
432 let apply_hash = compute_snapshot_hash(&apply_payload)?;
433 if apply_hash != snapshot_hash {
434 let failure = VectorRegenerationFailure::new(
435 VectorRegenerationFailureClass::SnapshotDrift,
436 "chunk snapshot changed during generation; retry".to_owned(),
437 );
438 drop(tx);
439 self.persist_vector_regeneration_failure_best_effort(
440 &config.profile,
441 &audit_metadata,
442 &failure,
443 );
444 return Err(failure.to_engine_error());
445 }
446 persist_vector_contract(&tx, &config, &table_name, &identity, &snapshot_hash)?;
447 tx.execute(&format!("DELETE FROM {table_name}"), [])?;
448 let mut stmt = tx.prepare_cached(&format!(
449 "INSERT INTO {table_name} (chunk_id, embedding) VALUES (?1, ?2)"
450 ))?;
451 let mut regenerated_rows = 0usize;
452 for chunk in &apply_chunks {
453 let Some(embedding) = embedding_map.remove(&chunk.chunk_id) else {
454 drop(stmt);
455 drop(tx);
456 let failure = VectorRegenerationFailure::new(
457 VectorRegenerationFailureClass::InvalidEmbedderOutput,
458 format!(
459 "embedder did not produce a vector for chunk '{}'",
460 chunk.chunk_id
461 ),
462 );
463 self.persist_vector_regeneration_failure_best_effort(
464 &config.profile,
465 &audit_metadata,
466 &failure,
467 );
468 return Err(failure.to_engine_error());
469 };
470 stmt.execute(rusqlite::params![chunk.chunk_id.as_str(), embedding])?;
471 regenerated_rows += 1;
472 }
473 drop(stmt);
474 persist_vector_regeneration_event(
475 &tx,
476 "vector_regeneration_apply",
477 &config.profile,
478 &audit_metadata,
479 )?;
480 tx.commit()?;
481
482 Ok(VectorRegenerationReport {
483 profile: config.profile.clone(),
484 table_name,
485 dimension: identity.dimension,
486 total_chunks: chunks.len(),
487 regenerated_rows,
488 contract_persisted: true,
489 notes,
490 })
491 }
492
493 #[allow(clippy::too_many_lines)]
507 pub fn regenerate_vector_embeddings_in_process(
508 &self,
509 embedder: &dyn BatchEmbedder,
510 config: &VectorRegenerationConfig,
511 ) -> Result<VectorRegenerationReport, EngineError> {
512 let conn = self.connect()?;
513 let identity = embedder.identity();
514 let config = validate_vector_regeneration_config(&conn, config, &identity)
515 .map_err(|failure| failure.to_engine_error())?;
516 let chunks = collect_regeneration_chunks(&conn)?;
517 let payload = build_regeneration_input(&config, &identity, chunks.clone());
518 let snapshot_hash = compute_snapshot_hash(&payload)?;
519 let audit_metadata = VectorRegenerationAuditMetadata {
520 profile: config.profile.clone(),
521 model_identity: identity.model_identity.clone(),
522 model_version: identity.model_version.clone(),
523 chunk_count: chunks.len(),
524 snapshot_hash: snapshot_hash.clone(),
525 failure_class: None,
526 };
527 persist_vector_regeneration_event(
528 &conn,
529 "vector_regeneration_requested",
530 &config.profile,
531 &audit_metadata,
532 )?;
533 let notes = vec!["vector embeddings regenerated via in-process batch embedder".to_owned()];
534
535 let chunk_texts: Vec<String> = chunks.iter().map(|c| c.text_content.clone()).collect();
537 let batch_vectors = match embedder.batch_embed(&chunk_texts) {
538 Ok(vecs) => vecs,
539 Err(error) => {
540 let failure = VectorRegenerationFailure::new(
541 VectorRegenerationFailureClass::EmbedderFailure,
542 format!("batch embedder failed: {error}"),
543 );
544 self.persist_vector_regeneration_failure_best_effort(
545 &config.profile,
546 &audit_metadata,
547 &failure,
548 );
549 return Err(failure.to_engine_error());
550 }
551 };
552 if batch_vectors.len() != chunks.len() {
553 let failure = VectorRegenerationFailure::new(
554 VectorRegenerationFailureClass::InvalidEmbedderOutput,
555 format!(
556 "batch embedder returned {} vectors for {} chunks",
557 batch_vectors.len(),
558 chunks.len()
559 ),
560 );
561 self.persist_vector_regeneration_failure_best_effort(
562 &config.profile,
563 &audit_metadata,
564 &failure,
565 );
566 return Err(failure.to_engine_error());
567 }
568
569 let mut embedding_map: std::collections::HashMap<String, Vec<u8>> =
570 std::collections::HashMap::with_capacity(chunks.len());
571 for (chunk, vector) in chunks.iter().zip(batch_vectors) {
572 if vector.len() != identity.dimension {
573 let failure = VectorRegenerationFailure::new(
574 VectorRegenerationFailureClass::InvalidEmbedderOutput,
575 format!(
576 "embedder produced {} values for chunk '{}', expected {}",
577 vector.len(),
578 chunk.chunk_id,
579 identity.dimension
580 ),
581 );
582 self.persist_vector_regeneration_failure_best_effort(
583 &config.profile,
584 &audit_metadata,
585 &failure,
586 );
587 return Err(failure.to_engine_error());
588 }
589 if vector.iter().any(|value| !value.is_finite()) {
590 let failure = VectorRegenerationFailure::new(
591 VectorRegenerationFailureClass::InvalidEmbedderOutput,
592 format!(
593 "embedder returned non-finite values for chunk '{}'",
594 chunk.chunk_id
595 ),
596 );
597 self.persist_vector_regeneration_failure_best_effort(
598 &config.profile,
599 &audit_metadata,
600 &failure,
601 );
602 return Err(failure.to_engine_error());
603 }
604 let bytes: Vec<u8> = vector
605 .iter()
606 .flat_map(|value| value.to_le_bytes())
607 .collect();
608 embedding_map.insert(chunk.chunk_id.clone(), bytes);
609 }
610
611 let mut conn = conn;
612 let tx = conn.transaction_with_behavior(TransactionBehavior::Immediate)?;
613 let table_name = fathomdb_schema::vec_kind_table_name(&config.kind);
614 match self
615 .schema_manager
616 .ensure_vec_kind_profile(&tx, &config.kind, identity.dimension)
617 {
618 Ok(()) => {}
619 Err(SchemaError::MissingCapability(message)) => {
620 let failure = VectorRegenerationFailure::new(
621 VectorRegenerationFailureClass::UnsupportedVecCapability,
622 message,
623 );
624 drop(tx);
625 self.persist_vector_regeneration_failure_best_effort(
626 &config.profile,
627 &audit_metadata,
628 &failure,
629 );
630 return Err(failure.to_engine_error());
631 }
632 Err(error) => return Err(EngineError::Schema(error)),
633 }
634 let apply_chunks = collect_regeneration_chunks(&tx)?;
635 let apply_payload = build_regeneration_input(&config, &identity, apply_chunks.clone());
636 let apply_hash = compute_snapshot_hash(&apply_payload)?;
637 if apply_hash != snapshot_hash {
638 let failure = VectorRegenerationFailure::new(
639 VectorRegenerationFailureClass::SnapshotDrift,
640 "chunk snapshot changed during generation; retry".to_owned(),
641 );
642 drop(tx);
643 self.persist_vector_regeneration_failure_best_effort(
644 &config.profile,
645 &audit_metadata,
646 &failure,
647 );
648 return Err(failure.to_engine_error());
649 }
650 persist_vector_contract(&tx, &config, &table_name, &identity, &snapshot_hash)?;
651 tx.execute(&format!("DELETE FROM {table_name}"), [])?;
652 let mut stmt = tx.prepare_cached(&format!(
653 "INSERT INTO {table_name} (chunk_id, embedding) VALUES (?1, ?2)"
654 ))?;
655 let mut regenerated_rows = 0usize;
656 for chunk in &apply_chunks {
657 let Some(embedding) = embedding_map.remove(&chunk.chunk_id) else {
658 drop(stmt);
659 drop(tx);
660 let failure = VectorRegenerationFailure::new(
661 VectorRegenerationFailureClass::InvalidEmbedderOutput,
662 format!(
663 "embedder did not produce a vector for chunk '{}'",
664 chunk.chunk_id
665 ),
666 );
667 self.persist_vector_regeneration_failure_best_effort(
668 &config.profile,
669 &audit_metadata,
670 &failure,
671 );
672 return Err(failure.to_engine_error());
673 };
674 stmt.execute(rusqlite::params![chunk.chunk_id.as_str(), embedding])?;
675 regenerated_rows += 1;
676 }
677 drop(stmt);
678 persist_vector_regeneration_event(
679 &tx,
680 "vector_regeneration_apply",
681 &config.profile,
682 &audit_metadata,
683 )?;
684 tx.commit()?;
685
686 Ok(VectorRegenerationReport {
687 profile: config.profile.clone(),
688 table_name,
689 dimension: identity.dimension,
690 total_chunks: chunks.len(),
691 regenerated_rows,
692 contract_persisted: true,
693 notes,
694 })
695 }
696
697 pub(super) fn persist_vector_regeneration_failure_best_effort(
698 &self,
699 profile: &str,
700 metadata: &VectorRegenerationAuditMetadata,
701 failure: &VectorRegenerationFailure,
702 ) {
703 let Ok(conn) = self.connect() else {
704 return;
705 };
706 let failure_metadata = VectorRegenerationAuditMetadata {
707 profile: metadata.profile.clone(),
708 model_identity: metadata.model_identity.clone(),
709 model_version: metadata.model_version.clone(),
710 chunk_count: metadata.chunk_count,
711 snapshot_hash: metadata.snapshot_hash.clone(),
712 failure_class: Some(failure.failure_class_label().to_owned()),
713 };
714 let _ = persist_vector_regeneration_event(
715 &conn,
716 "vector_regeneration_failed",
717 profile,
718 &failure_metadata,
719 );
720 }
721
722 pub fn rebuild_projections(
725 &self,
726 target: ProjectionTarget,
727 ) -> Result<ProjectionRepairReport, EngineError> {
728 self.projections.rebuild_projections(target)
729 }
730
731 pub fn rebuild_missing_projections(&self) -> Result<ProjectionRepairReport, EngineError> {
734 self.projections.rebuild_missing_projections()
735 }
736}
737
738pub fn load_vector_regeneration_config(
741 path: impl AsRef<Path>,
742) -> Result<VectorRegenerationConfig, EngineError> {
743 let path = path.as_ref();
744 let raw = std::fs::read_to_string(path)?;
745 match path.extension().and_then(|ext| ext.to_str()) {
746 Some("toml") => {
747 toml::from_str(&raw).map_err(|error| EngineError::Bridge(error.to_string()))
748 }
749 Some("json") | None => {
750 serde_json::from_str(&raw).map_err(|error| EngineError::Bridge(error.to_string()))
751 }
752 Some(other) => Err(EngineError::Bridge(format!(
753 "unsupported vector regeneration config extension: {other}"
754 ))),
755 }
756}
757
758fn validate_vector_regeneration_config(
759 conn: &rusqlite::Connection,
760 config: &VectorRegenerationConfig,
761 identity: &QueryEmbedderIdentity,
762) -> Result<VectorRegenerationConfig, VectorRegenerationFailure> {
763 let kind = validate_bounded_text("kind", &config.kind, MAX_PROFILE_LEN)?;
764 let profile = validate_bounded_text("profile", &config.profile, MAX_PROFILE_LEN)?;
765 if identity.dimension == 0 {
766 return Err(VectorRegenerationFailure::new(
767 VectorRegenerationFailureClass::InvalidContract,
768 "embedder reports dimension 0".to_owned(),
769 ));
770 }
771 let chunking_policy =
772 validate_bounded_text("chunking_policy", &config.chunking_policy, MAX_POLICY_LEN)?;
773 let preprocessing_policy = validate_bounded_text(
774 "preprocessing_policy",
775 &config.preprocessing_policy,
776 MAX_POLICY_LEN,
777 )?;
778
779 if let Some(existing_dimension) = current_vector_profile_dimension(conn, &profile)?
780 && existing_dimension != identity.dimension
781 {
782 return Err(VectorRegenerationFailure::new(
783 VectorRegenerationFailureClass::InvalidContract,
784 format!(
785 "embedder dimension {} does not match existing vector profile dimension {}",
786 identity.dimension, existing_dimension
787 ),
788 ));
789 }
790
791 validate_existing_contract_version(conn, &profile)?;
792
793 let normalized = VectorRegenerationConfig {
794 kind,
795 profile,
796 chunking_policy,
797 preprocessing_policy,
798 };
799 let serialized = serde_json::to_vec(&normalized).map_err(|error| {
800 VectorRegenerationFailure::new(
801 VectorRegenerationFailureClass::InvalidContract,
802 error.to_string(),
803 )
804 })?;
805 if serialized.len() > MAX_CONTRACT_JSON_BYTES {
806 return Err(VectorRegenerationFailure::new(
807 VectorRegenerationFailureClass::InvalidContract,
808 format!("serialized contract exceeds {MAX_CONTRACT_JSON_BYTES} bytes"),
809 ));
810 }
811
812 Ok(normalized)
813}
814
815#[allow(clippy::cast_possible_wrap)]
816fn persist_vector_contract(
817 conn: &rusqlite::Connection,
818 config: &VectorRegenerationConfig,
819 table_name: &str,
820 identity: &QueryEmbedderIdentity,
821 snapshot_hash: &str,
822) -> Result<(), EngineError> {
823 conn.execute(
824 r"
825 INSERT OR REPLACE INTO vector_embedding_contracts (
826 profile,
827 table_name,
828 model_identity,
829 model_version,
830 dimension,
831 normalization_policy,
832 chunking_policy,
833 preprocessing_policy,
834 generator_command_json,
835 applied_at,
836 snapshot_hash,
837 contract_format_version,
838 updated_at
839 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, unixepoch(), ?10, ?11, unixepoch())
840 ",
841 rusqlite::params![
842 config.profile.as_str(),
843 table_name,
844 identity.model_identity.as_str(),
845 identity.model_version.as_str(),
846 identity.dimension as i64,
847 identity.normalization_policy.as_str(),
848 config.chunking_policy.as_str(),
849 config.preprocessing_policy.as_str(),
850 "[]",
851 snapshot_hash,
852 CURRENT_VECTOR_CONTRACT_FORMAT_VERSION,
853 ],
854 )?;
855 Ok(())
856}
857
858fn persist_vector_regeneration_event(
859 conn: &rusqlite::Connection,
860 event_type: &str,
861 subject: &str,
862 metadata: &VectorRegenerationAuditMetadata,
863) -> Result<(), EngineError> {
864 let metadata_json = serialize_audit_metadata(metadata)?;
865 conn.execute(
866 "INSERT INTO provenance_events (id, event_type, subject, metadata_json) VALUES (?1, ?2, ?3, ?4)",
867 rusqlite::params![new_id(), event_type, subject, metadata_json],
868 )?;
869 Ok(())
870}
871
872fn validate_bounded_text(
873 field: &str,
874 value: &str,
875 max_len: usize,
876) -> Result<String, VectorRegenerationFailure> {
877 let trimmed = value.trim();
878 if trimmed.is_empty() {
879 return Err(VectorRegenerationFailure::new(
880 VectorRegenerationFailureClass::InvalidContract,
881 format!("{field} must not be empty"),
882 ));
883 }
884 if trimmed.len() > max_len {
885 return Err(VectorRegenerationFailure::new(
886 VectorRegenerationFailureClass::InvalidContract,
887 format!("{field} exceeds max length {max_len}"),
888 ));
889 }
890 Ok(trimmed.to_owned())
891}
892
893fn current_vector_profile_dimension(
894 conn: &rusqlite::Connection,
895 profile: &str,
896) -> Result<Option<usize>, VectorRegenerationFailure> {
897 let dimension: Option<i64> = conn
898 .query_row(
899 "SELECT dimension FROM vector_profiles WHERE profile = ?1 AND enabled = 1",
900 [profile],
901 |row| row.get(0),
902 )
903 .optional()
904 .map_err(|error| {
905 VectorRegenerationFailure::new(
906 VectorRegenerationFailureClass::InvalidContract,
907 error.to_string(),
908 )
909 })?;
910 dimension
911 .map(|value| {
912 usize::try_from(value).map_err(|_| {
913 VectorRegenerationFailure::new(
914 VectorRegenerationFailureClass::InvalidContract,
915 format!("stored vector profile dimension is invalid: {value}"),
916 )
917 })
918 })
919 .transpose()
920}
921
922fn validate_existing_contract_version(
923 conn: &rusqlite::Connection,
924 profile: &str,
925) -> Result<(), VectorRegenerationFailure> {
926 let version: Option<i64> = conn
927 .query_row(
928 "SELECT contract_format_version FROM vector_embedding_contracts WHERE profile = ?1",
929 [profile],
930 |row| row.get(0),
931 )
932 .optional()
933 .map_err(|error| {
934 VectorRegenerationFailure::new(
935 VectorRegenerationFailureClass::InvalidContract,
936 error.to_string(),
937 )
938 })?;
939 if let Some(version) = version
940 && version > CURRENT_VECTOR_CONTRACT_FORMAT_VERSION
941 {
942 return Err(VectorRegenerationFailure::new(
943 VectorRegenerationFailureClass::InvalidContract,
944 format!(
945 "persisted contract format version {version} is unsupported; supported version is {CURRENT_VECTOR_CONTRACT_FORMAT_VERSION}"
946 ),
947 ));
948 }
949 Ok(())
950}
951
952fn serialize_audit_metadata(
953 metadata: &VectorRegenerationAuditMetadata,
954) -> Result<String, EngineError> {
955 let json =
956 serde_json::to_string(metadata).map_err(|error| EngineError::Bridge(error.to_string()))?;
957 if json.len() > MAX_AUDIT_METADATA_BYTES {
958 return Err(VectorRegenerationFailure::new(
959 VectorRegenerationFailureClass::InvalidContract,
960 format!("audit metadata exceeds {MAX_AUDIT_METADATA_BYTES} bytes"),
961 )
962 .to_engine_error());
963 }
964 Ok(json)
965}
966
967pub(super) fn build_regeneration_input(
968 config: &VectorRegenerationConfig,
969 identity: &QueryEmbedderIdentity,
970 chunks: Vec<VectorRegenerationInputChunk>,
971) -> VectorRegenerationInput {
972 VectorRegenerationInput {
973 profile: config.profile.clone(),
974 table_name: fathomdb_schema::vec_kind_table_name(&config.kind),
975 model_identity: identity.model_identity.clone(),
976 model_version: identity.model_version.clone(),
977 dimension: identity.dimension,
978 normalization_policy: identity.normalization_policy.clone(),
979 chunking_policy: config.chunking_policy.clone(),
980 preprocessing_policy: config.preprocessing_policy.clone(),
981 chunks,
982 }
983}
984
985pub(super) fn compute_snapshot_hash(
986 payload: &VectorRegenerationInput,
987) -> Result<String, EngineError> {
988 let bytes =
989 serde_json::to_vec(payload).map_err(|error| EngineError::Bridge(error.to_string()))?;
990 let mut hasher = sha2::Sha256::new();
991 hasher.update(bytes);
992 Ok(format!("{:x}", hasher.finalize()))
993}
994
995pub(super) fn collect_regeneration_chunks(
996 conn: &rusqlite::Connection,
997) -> Result<Vec<VectorRegenerationInputChunk>, EngineError> {
998 let mut stmt = conn.prepare(
999 r"
1000 SELECT c.id, c.node_logical_id, n.kind, c.text_content, c.byte_start, c.byte_end, n.source_ref, c.created_at
1001 FROM chunks c
1002 JOIN nodes n
1003 ON n.logical_id = c.node_logical_id
1004 AND n.superseded_at IS NULL
1005 ORDER BY c.created_at, c.id
1006 ",
1007 )?;
1008 let chunks = stmt
1009 .query_map([], |row| {
1010 Ok(VectorRegenerationInputChunk {
1011 chunk_id: row.get(0)?,
1012 node_logical_id: row.get(1)?,
1013 kind: row.get(2)?,
1014 text_content: row.get(3)?,
1015 byte_start: row.get(4)?,
1016 byte_end: row.get(5)?,
1017 source_ref: row.get(6)?,
1018 created_at: row.get(7)?,
1019 })
1020 })?
1021 .collect::<Result<Vec<_>, _>>()?;
1022 Ok(chunks)
1023}