1use std::path::Path;
2
3use fathomdb_schema::SchemaError;
4use rusqlite::{OptionalExtension, TransactionBehavior};
5use serde::{Deserialize, Serialize};
6use sha2::Digest;
7
8use super::{
9 AdminService, CURRENT_VECTOR_CONTRACT_FORMAT_VERSION, EngineError, MAX_AUDIT_METADATA_BYTES,
10 MAX_CONTRACT_JSON_BYTES, MAX_POLICY_LEN, MAX_PROFILE_LEN, ProjectionRepairReport,
11 ProjectionTarget, VecProfile, VectorRegenerationConfig, VectorRegenerationReport,
12};
13use crate::embedder::{BatchEmbedder, QueryEmbedder, QueryEmbedderIdentity};
14use crate::ids::new_id;
15
/// Row-shaped record of a persisted vector embedding contract.
///
/// Field declaration order is observable: the derived `Serialize`/`Deserialize`
/// and `Debug` impls follow it, so fields must not be reordered casually.
// NOTE(review): `dead_code` is allowed here and no construction site is visible
// in this chunk — confirm whether this type is still read anywhere.
#[allow(dead_code)]
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub(super) struct VectorEmbeddingContractRecord {
    /// Regeneration profile name the contract belongs to.
    profile: String,
    /// Name of the vec table the contract describes.
    table_name: String,
    /// Embedder model identity the vectors were produced with.
    model_identity: String,
    /// Embedder model version the vectors were produced with.
    model_version: String,
    /// Number of values per vector.
    dimension: usize,
    normalization_policy: String,
    chunking_policy: String,
    preprocessing_policy: String,
    // presumably a JSON-encoded description of the generator — TODO confirm
    generator_command_json: String,
    // presumably a unix timestamp (seconds) — TODO confirm
    applied_at: i64,
    /// Snapshot hash of the regeneration input the contract was applied from.
    snapshot_hash: String,
    // presumably compared against CURRENT_VECTOR_CONTRACT_FORMAT_VERSION — verify
    contract_format_version: i64,
}
32
/// One chunk as handed to the embedder during vector regeneration.
///
/// NOTE(review): this type derives `Serialize` and is presumably serialized as
/// part of the payload hashed by `compute_snapshot_hash` (not visible here); if
/// so, renaming or reordering fields changes every snapshot hash.
#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
pub(super) struct VectorRegenerationInputChunk {
    /// Chunk identifier; also the key of the row written to the vec table.
    pub(super) chunk_id: String,
    /// Logical id of the node the chunk belongs to.
    pub(super) node_logical_id: String,
    pub(super) kind: String,
    /// Text that is embedded.
    pub(super) text_content: String,
    // presumably byte offsets into the source content, when recorded — TODO confirm
    pub(super) byte_start: Option<i64>,
    pub(super) byte_end: Option<i64>,
    pub(super) source_ref: Option<String>,
    pub(super) created_at: i64,
}
44
/// Complete description of a regeneration run: the embedding identity and
/// policy fields plus every chunk to embed.
///
/// NOTE(review): presumably the value produced by `build_regeneration_input`
/// and hashed by `compute_snapshot_hash` (neither is visible here) — field order
/// then affects the snapshot hash, so keep it stable.
#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
pub(super) struct VectorRegenerationInput {
    pub(super) profile: String,
    pub(super) table_name: String,
    pub(super) model_identity: String,
    pub(super) model_version: String,
    /// Number of values per vector.
    pub(super) dimension: usize,
    pub(super) normalization_policy: String,
    pub(super) chunking_policy: String,
    pub(super) preprocessing_policy: String,
    /// Every chunk included in this run.
    pub(super) chunks: Vec<VectorRegenerationInputChunk>,
}
57
/// Category of a vector-regeneration failure.
///
/// The category decides the label used in error messages and audit metadata,
/// and whether the failure is reported as retryable.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub(crate) enum VectorRegenerationFailureClass {
    /// The regeneration config was rejected during validation.
    InvalidContract,
    /// The embedder returned an error.
    EmbedderFailure,
    /// The embedder returned unusable output (wrong length, non-finite values,
    /// wrong vector count, or a missing vector).
    InvalidEmbedderOutput,
    /// The chunk set changed between embedding and applying; the only
    /// retryable class.
    SnapshotDrift,
    /// The schema layer reported a missing vec capability.
    UnsupportedVecCapability,
}
66
67impl VectorRegenerationFailureClass {
68 fn label(self) -> &'static str {
69 match self {
70 Self::InvalidContract => "invalid contract",
71 Self::EmbedderFailure => "embedder failure",
72 Self::InvalidEmbedderOutput => "invalid embedder output",
73 Self::SnapshotDrift => "snapshot drift",
74 Self::UnsupportedVecCapability => "unsupported vec capability",
75 }
76 }
77
78 fn retryable(self) -> bool {
79 matches!(self, Self::SnapshotDrift)
80 }
81}
82
/// A classified vector-regeneration failure with a free-form detail message.
#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) struct VectorRegenerationFailure {
    /// Category deciding the label and retryability.
    class: VectorRegenerationFailureClass,
    /// Human-readable description of what went wrong.
    detail: String,
}
88
89impl VectorRegenerationFailure {
90 pub(crate) fn new(class: VectorRegenerationFailureClass, detail: impl Into<String>) -> Self {
91 Self {
92 class,
93 detail: detail.into(),
94 }
95 }
96
97 pub(super) fn to_engine_error(&self) -> EngineError {
98 let retry_suffix = if self.class.retryable() {
99 " [retryable]"
100 } else {
101 ""
102 };
103 EngineError::Bridge(format!(
104 "vector regeneration {}: {}{}",
105 self.class.label(),
106 self.detail,
107 retry_suffix
108 ))
109 }
110
111 pub(super) fn failure_class_label(&self) -> &'static str {
112 self.class.label()
113 }
114}
115
/// Metadata attached to every vector-regeneration audit event
/// (`vector_regeneration_requested`, `_apply`, `_failed`).
///
/// Serialized with serde, so field order determines the JSON key order.
#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
pub(super) struct VectorRegenerationAuditMetadata {
    pub(super) profile: String,
    pub(super) model_identity: String,
    pub(super) model_version: String,
    /// Number of chunks in the snapshot taken before embedding.
    pub(super) chunk_count: usize,
    /// Hash of the regeneration input snapshot.
    pub(super) snapshot_hash: String,
    /// Failure class label; omitted from the JSON when `None` (non-failure events).
    #[serde(skip_serializing_if = "Option::is_none")]
    pub(super) failure_class: Option<String>,
}
126
/// Where the text for a vec kind's embeddings comes from.
///
/// `#[non_exhaustive]`: downstream crates must handle future variants.
#[non_exhaustive]
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum VectorSource {
    /// Embed the kind's chunks (stored as `source_mode = 'chunks'`).
    Chunks,
}
134
/// Result of enabling vector indexing for one kind.
#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
pub struct ConfigureVecOutcome {
    /// The kind that was configured.
    pub kind: String,
    /// Number of backfill work rows newly enqueued (chunks that already had a
    /// pending row for the active profile are not counted).
    pub enqueued_backfill_rows: usize,
    /// Whether the kind's vector index was already enabled beforehand.
    pub was_already_enabled: bool,
}
146
/// Snapshot of a kind's vector index state and pending work.
#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
pub struct VecIndexStatus {
    pub kind: String,
    /// Whether the kind's vector index is enabled.
    pub enabled: bool,
    /// Index state string (e.g. `fresh`, `stale`, or `unconfigured` when no
    /// schema row exists for the kind).
    pub state: String,
    /// Pending work rows with priority >= 1000.
    pub pending_incremental: u64,
    /// Pending work rows with priority < 1000.
    pub pending_backfill: u64,
    pub last_error: Option<String>,
    pub last_completed_at: Option<i64>,
    /// Model identity of the currently active embedding profile, if any.
    pub embedding_identity: Option<String>,
}
170
171impl AdminService {
172 pub fn get_vec_profile(&self, kind: &str) -> Result<Option<VecProfile>, EngineError> {
180 let conn = self.connect()?;
181 let result = conn
182 .query_row(
183 "SELECT \
184 json_extract(config_json, '$.model_identity'), \
185 json_extract(config_json, '$.model_version'), \
186 CAST(json_extract(config_json, '$.dimensions') AS INTEGER), \
187 active_at, \
188 created_at \
189 FROM projection_profiles WHERE kind = ?1 AND facet = 'vec'",
190 rusqlite::params![kind],
191 |row| {
192 Ok(VecProfile {
193 model_identity: row.get::<_, Option<String>>(0)?.unwrap_or_default(),
194 model_version: row.get(1)?,
195 dimensions: {
196 let d: i64 = row.get::<_, Option<i64>>(2)?.unwrap_or(0);
197 u32::try_from(d).unwrap_or(0)
198 },
199 active_at: row.get(3)?,
200 created_at: row.get(4)?,
201 })
202 },
203 )
204 .optional()?;
205 Ok(result)
206 }
207
208 #[allow(dead_code)]
213 fn set_vec_profile_inner(
214 conn: &rusqlite::Connection,
215 identity_json: &str,
216 ) -> Result<VecProfile, rusqlite::Error> {
217 conn.execute(
218 r"INSERT INTO projection_profiles (kind, facet, config_json, active_at, created_at)
219 VALUES ('*', 'vec', ?1, unixepoch(), unixepoch())
220 ON CONFLICT(kind, facet) DO UPDATE SET
221 config_json = ?1,
222 active_at = unixepoch()",
223 rusqlite::params![identity_json],
224 )?;
225 conn.query_row(
226 "SELECT \
227 json_extract(config_json, '$.model_identity'), \
228 json_extract(config_json, '$.model_version'), \
229 CAST(json_extract(config_json, '$.dimensions') AS INTEGER), \
230 active_at, \
231 created_at \
232 FROM projection_profiles WHERE kind = '*' AND facet = 'vec'",
233 [],
234 |row| {
235 Ok(VecProfile {
236 model_identity: row.get(0)?,
237 model_version: row.get(1)?,
238 dimensions: {
239 let d: i64 = row.get(2)?;
240 u32::try_from(d).unwrap_or(0)
241 },
242 active_at: row.get(3)?,
243 created_at: row.get(4)?,
244 })
245 },
246 )
247 }
248
249 pub fn set_vec_profile(&self, config_json: &str) -> Result<VecProfile, EngineError> {
258 let conn = self.connect()?;
259 Self::set_vec_profile_inner(&conn, config_json).map_err(EngineError::Sqlite)
260 }
261
262 pub fn preview_projection_impact(
270 &self,
271 kind: &str,
272 facet: &str,
273 ) -> Result<super::ProjectionImpact, EngineError> {
274 let conn = self.connect()?;
275 match facet {
276 "fts" => {
277 let rows: u64 = conn
278 .query_row(
279 "SELECT count(*) FROM nodes WHERE kind = ?1 AND superseded_at IS NULL",
280 rusqlite::params![kind],
281 |row| row.get::<_, i64>(0),
282 )
283 .map(i64::cast_unsigned)?;
284 let current_tokenizer = self.get_fts_profile(kind)?.map(|p| p.tokenizer);
285 Ok(super::ProjectionImpact {
286 rows_to_rebuild: rows,
287 estimated_seconds: rows / 5000,
288 temp_db_size_bytes: rows * 200,
289 current_tokenizer,
290 target_tokenizer: None,
291 })
292 }
293 "vec" => {
294 let rows: u64 = conn
295 .query_row("SELECT count(*) FROM chunks", [], |row| {
296 row.get::<_, i64>(0)
297 })
298 .map(i64::cast_unsigned)?;
299 Ok(super::ProjectionImpact {
300 rows_to_rebuild: rows,
301 estimated_seconds: rows / 100,
302 temp_db_size_bytes: rows * 1536,
303 current_tokenizer: None,
304 target_tokenizer: None,
305 })
306 }
307 other => Err(EngineError::Bridge(format!(
308 "unknown projection facet: {other:?}"
309 ))),
310 }
311 }
312
313 pub fn restore_vector_profiles(&self) -> Result<ProjectionRepairReport, EngineError> {
319 let conn = self.connect()?;
320 let profiles: Vec<(String, String, i64)> = {
321 let mut stmt = conn.prepare(
322 "SELECT profile, table_name, dimension \
323 FROM vector_profiles WHERE enabled = 1 ORDER BY profile",
324 )?;
325 stmt.query_map([], |row| {
326 Ok((
327 row.get::<_, String>(0)?,
328 row.get::<_, String>(1)?,
329 row.get::<_, i64>(2)?,
330 ))
331 })?
332 .collect::<Result<Vec<_>, _>>()?
333 };
334
335 for (profile, table_name, dimension) in &profiles {
336 let dimension = usize::try_from(*dimension).map_err(|_| {
337 EngineError::Bridge(format!("invalid vector profile dimension: {dimension}"))
338 })?;
339 self.schema_manager
340 .ensure_vector_profile(&conn, profile, table_name, dimension)?;
341 }
342
343 Ok(ProjectionRepairReport {
344 targets: vec![ProjectionTarget::Vec],
345 rebuilt_rows: profiles.len(),
346 notes: vec![],
347 })
348 }
349
    /// Rebuilds every vector of the configured vec kind (`config.kind`) with a
    /// one-chunk-at-a-time [`QueryEmbedder`], then persists the vector contract.
    ///
    /// The flow is order-sensitive:
    ///
    /// 1. Validate `config` against the embedder identity. A rejected config is
    ///    returned as an error before any audit event is written.
    /// 2. Collect the chunks, hash the regeneration payload into a snapshot
    ///    hash, and record a `vector_regeneration_requested` audit event.
    /// 3. Embed every chunk outside any transaction.
    ///    - Each vector must have exactly `identity.dimension` values, all finite.
    ///    - An accepted vector is stored as the concatenation of each value's
    ///      little-endian bytes.
    /// 4. Inside one `IMMEDIATE` transaction:
    ///    - ensure the vec kind table exists;
    ///    - re-collect the chunks and recompute the hash, where a mismatch is a
    ///      retryable "snapshot drift" failure;
    ///    - persist the contract and delete every existing row of the table;
    ///    - insert the new embeddings;
    ///    - record `vector_regeneration_apply`, then commit.
    ///
    /// Classified failures after step 2 are also recorded as a
    /// `vector_regeneration_failed` audit event via
    /// [`Self::persist_vector_regeneration_failure_best_effort`].
    ///
    /// # Errors
    ///
    /// - [`EngineError::Bridge`] for classified regeneration failures.
    /// - [`EngineError::Schema`] for schema errors other than a missing capability.
    /// - Connection and SQLite errors are propagated.
    #[allow(clippy::too_many_lines)]
    pub fn regenerate_vector_embeddings(
        &self,
        embedder: &dyn QueryEmbedder,
        config: &VectorRegenerationConfig,
    ) -> Result<VectorRegenerationReport, EngineError> {
        let conn = self.connect()?;
        let identity = embedder.identity();
        let config = validate_vector_regeneration_config(&conn, config, &identity)
            .map_err(|failure| failure.to_engine_error())?;
        // Snapshot taken before embedding; re-checked inside the apply transaction.
        let chunks = collect_regeneration_chunks(&conn)?;
        let payload = build_regeneration_input(&config, &identity, chunks.clone());
        let snapshot_hash = compute_snapshot_hash(&payload)?;
        let audit_metadata = VectorRegenerationAuditMetadata {
            profile: config.profile.clone(),
            model_identity: identity.model_identity.clone(),
            model_version: identity.model_version.clone(),
            chunk_count: chunks.len(),
            snapshot_hash: snapshot_hash.clone(),
            failure_class: None,
        };
        persist_vector_regeneration_event(
            &conn,
            "vector_regeneration_requested",
            &config.profile,
            &audit_metadata,
        )?;
        let notes = vec!["vector embeddings regenerated via configured embedder".to_owned()];

        // Embed outside any transaction so no write lock is held while the
        // embedder runs.
        let mut embedding_map: std::collections::HashMap<String, Vec<u8>> =
            std::collections::HashMap::with_capacity(chunks.len());
        for chunk in &chunks {
            let vector = match embedder.embed_query(&chunk.text_content) {
                Ok(vector) => vector,
                Err(error) => {
                    let failure = VectorRegenerationFailure::new(
                        VectorRegenerationFailureClass::EmbedderFailure,
                        format!("embedder failed for chunk '{}': {error}", chunk.chunk_id),
                    );
                    self.persist_vector_regeneration_failure_best_effort(
                        &config.profile,
                        &audit_metadata,
                        &failure,
                    );
                    return Err(failure.to_engine_error());
                }
            };
            if vector.len() != identity.dimension {
                let failure = VectorRegenerationFailure::new(
                    VectorRegenerationFailureClass::InvalidEmbedderOutput,
                    format!(
                        "embedder produced {} values for chunk '{}', expected {}",
                        vector.len(),
                        chunk.chunk_id,
                        identity.dimension
                    ),
                );
                self.persist_vector_regeneration_failure_best_effort(
                    &config.profile,
                    &audit_metadata,
                    &failure,
                );
                return Err(failure.to_engine_error());
            }
            if vector.iter().any(|value| !value.is_finite()) {
                let failure = VectorRegenerationFailure::new(
                    VectorRegenerationFailureClass::InvalidEmbedderOutput,
                    format!(
                        "embedder returned non-finite values for chunk '{}'",
                        chunk.chunk_id
                    ),
                );
                self.persist_vector_regeneration_failure_best_effort(
                    &config.profile,
                    &audit_metadata,
                    &failure,
                );
                return Err(failure.to_engine_error());
            }
            // Blob encoding: each value's little-endian bytes, concatenated.
            // (Element type presumably f32 — TODO confirm against QueryEmbedder.)
            let bytes: Vec<u8> = vector
                .iter()
                .flat_map(|value| value.to_le_bytes())
                .collect();
            embedding_map.insert(chunk.chunk_id.clone(), bytes);
        }

        let table_name = fathomdb_schema::vec_kind_table_name(&config.kind);
        let mut conn = conn;
        // IMMEDIATE takes the write lock up front so the drift check and the
        // table rewrite observe the same chunk set.
        let tx = conn.transaction_with_behavior(TransactionBehavior::Immediate)?;
        match self
            .schema_manager
            .ensure_vec_kind_profile(&tx, &config.kind, identity.dimension)
        {
            Ok(()) => {}
            Err(SchemaError::MissingCapability(message)) => {
                let failure = VectorRegenerationFailure::new(
                    VectorRegenerationFailureClass::UnsupportedVecCapability,
                    message,
                );
                // Roll back and release the write lock before the best-effort
                // audit write, which opens its own connection.
                drop(tx);
                self.persist_vector_regeneration_failure_best_effort(
                    &config.profile,
                    &audit_metadata,
                    &failure,
                );
                return Err(failure.to_engine_error());
            }
            Err(error) => return Err(EngineError::Schema(error)),
        }
        // Re-snapshot under the write lock; any change since embedding aborts.
        let apply_chunks = collect_regeneration_chunks(&tx)?;
        let apply_payload = build_regeneration_input(&config, &identity, apply_chunks.clone());
        let apply_hash = compute_snapshot_hash(&apply_payload)?;
        if apply_hash != snapshot_hash {
            let failure = VectorRegenerationFailure::new(
                VectorRegenerationFailureClass::SnapshotDrift,
                "chunk snapshot changed during generation; retry".to_owned(),
            );
            drop(tx);
            self.persist_vector_regeneration_failure_best_effort(
                &config.profile,
                &audit_metadata,
                &failure,
            );
            return Err(failure.to_engine_error());
        }
        persist_vector_contract(&tx, &config, &table_name, &identity, &snapshot_hash)?;
        // Full replacement: every existing row is removed before re-inserting.
        tx.execute(&format!("DELETE FROM {table_name}"), [])?;
        let mut stmt = tx.prepare_cached(&format!(
            "INSERT INTO {table_name} (chunk_id, embedding) VALUES (?1, ?2)"
        ))?;
        let mut regenerated_rows = 0usize;
        for chunk in &apply_chunks {
            let Some(embedding) = embedding_map.remove(&chunk.chunk_id) else {
                // The statement borrows the transaction, so it must be dropped
                // first; dropping the transaction rolls back the DELETE above.
                drop(stmt);
                drop(tx);
                let failure = VectorRegenerationFailure::new(
                    VectorRegenerationFailureClass::InvalidEmbedderOutput,
                    format!(
                        "embedder did not produce a vector for chunk '{}'",
                        chunk.chunk_id
                    ),
                );
                self.persist_vector_regeneration_failure_best_effort(
                    &config.profile,
                    &audit_metadata,
                    &failure,
                );
                return Err(failure.to_engine_error());
            };
            stmt.execute(rusqlite::params![chunk.chunk_id.as_str(), embedding])?;
            regenerated_rows += 1;
        }
        drop(stmt);
        // The apply event is written inside the transaction so it commits
        // atomically with the new vectors.
        persist_vector_regeneration_event(
            &tx,
            "vector_regeneration_apply",
            &config.profile,
            &audit_metadata,
        )?;
        tx.commit()?;

        Ok(VectorRegenerationReport {
            profile: config.profile.clone(),
            table_name,
            dimension: identity.dimension,
            total_chunks: chunks.len(),
            regenerated_rows,
            contract_persisted: true,
            notes,
        })
    }
536
    /// Rebuilds every vector of the configured vec kind (`config.kind`) with a
    /// single call to an in-process [`BatchEmbedder`], then persists the
    /// vector contract.
    ///
    /// The flow matches `regenerate_vector_embeddings` except for the
    /// embedding step:
    /// - all chunk texts are sent in one `batch_embed` call;
    /// - the batch must return exactly one vector per chunk, in chunk order.
    ///
    /// Each vector must then have `identity.dimension` finite values. Applying
    /// happens in one `IMMEDIATE` transaction:
    /// - a snapshot-drift check against the pre-embedding hash;
    /// - a full delete-and-reinsert of the vec table;
    /// - a `vector_regeneration_apply` audit event.
    ///
    /// Classified failures are also recorded as `vector_regeneration_failed`
    /// on a best-effort basis.
    ///
    /// # Errors
    ///
    /// - [`EngineError::Bridge`] for classified regeneration failures.
    /// - [`EngineError::Schema`] for schema errors other than a missing capability.
    /// - Connection and SQLite errors are propagated.
    #[allow(clippy::too_many_lines)]
    pub fn regenerate_vector_embeddings_in_process(
        &self,
        embedder: &dyn BatchEmbedder,
        config: &VectorRegenerationConfig,
    ) -> Result<VectorRegenerationReport, EngineError> {
        let conn = self.connect()?;
        let identity = embedder.identity();
        let config = validate_vector_regeneration_config(&conn, config, &identity)
            .map_err(|failure| failure.to_engine_error())?;
        // Snapshot taken before embedding; re-checked inside the apply transaction.
        let chunks = collect_regeneration_chunks(&conn)?;
        let payload = build_regeneration_input(&config, &identity, chunks.clone());
        let snapshot_hash = compute_snapshot_hash(&payload)?;
        let audit_metadata = VectorRegenerationAuditMetadata {
            profile: config.profile.clone(),
            model_identity: identity.model_identity.clone(),
            model_version: identity.model_version.clone(),
            chunk_count: chunks.len(),
            snapshot_hash: snapshot_hash.clone(),
            failure_class: None,
        };
        persist_vector_regeneration_event(
            &conn,
            "vector_regeneration_requested",
            &config.profile,
            &audit_metadata,
        )?;
        let notes = vec!["vector embeddings regenerated via in-process batch embedder".to_owned()];

        // One batch call for all chunks, made outside any transaction.
        let chunk_texts: Vec<String> = chunks.iter().map(|c| c.text_content.clone()).collect();
        let batch_vectors = match embedder.batch_embed(&chunk_texts) {
            Ok(vecs) => vecs,
            Err(error) => {
                let failure = VectorRegenerationFailure::new(
                    VectorRegenerationFailureClass::EmbedderFailure,
                    format!("batch embedder failed: {error}"),
                );
                self.persist_vector_regeneration_failure_best_effort(
                    &config.profile,
                    &audit_metadata,
                    &failure,
                );
                return Err(failure.to_engine_error());
            }
        };
        // Vectors are matched to chunks positionally below, so the counts must agree.
        if batch_vectors.len() != chunks.len() {
            let failure = VectorRegenerationFailure::new(
                VectorRegenerationFailureClass::InvalidEmbedderOutput,
                format!(
                    "batch embedder returned {} vectors for {} chunks",
                    batch_vectors.len(),
                    chunks.len()
                ),
            );
            self.persist_vector_regeneration_failure_best_effort(
                &config.profile,
                &audit_metadata,
                &failure,
            );
            return Err(failure.to_engine_error());
        }

        let mut embedding_map: std::collections::HashMap<String, Vec<u8>> =
            std::collections::HashMap::with_capacity(chunks.len());
        for (chunk, vector) in chunks.iter().zip(batch_vectors) {
            if vector.len() != identity.dimension {
                let failure = VectorRegenerationFailure::new(
                    VectorRegenerationFailureClass::InvalidEmbedderOutput,
                    format!(
                        "embedder produced {} values for chunk '{}', expected {}",
                        vector.len(),
                        chunk.chunk_id,
                        identity.dimension
                    ),
                );
                self.persist_vector_regeneration_failure_best_effort(
                    &config.profile,
                    &audit_metadata,
                    &failure,
                );
                return Err(failure.to_engine_error());
            }
            if vector.iter().any(|value| !value.is_finite()) {
                let failure = VectorRegenerationFailure::new(
                    VectorRegenerationFailureClass::InvalidEmbedderOutput,
                    format!(
                        "embedder returned non-finite values for chunk '{}'",
                        chunk.chunk_id
                    ),
                );
                self.persist_vector_regeneration_failure_best_effort(
                    &config.profile,
                    &audit_metadata,
                    &failure,
                );
                return Err(failure.to_engine_error());
            }
            // Blob encoding: each value's little-endian bytes, concatenated.
            let bytes: Vec<u8> = vector
                .iter()
                .flat_map(|value| value.to_le_bytes())
                .collect();
            embedding_map.insert(chunk.chunk_id.clone(), bytes);
        }

        let mut conn = conn;
        // IMMEDIATE takes the write lock up front so the drift check and the
        // table rewrite observe the same chunk set.
        let tx = conn.transaction_with_behavior(TransactionBehavior::Immediate)?;
        let table_name = fathomdb_schema::vec_kind_table_name(&config.kind);
        match self
            .schema_manager
            .ensure_vec_kind_profile(&tx, &config.kind, identity.dimension)
        {
            Ok(()) => {}
            Err(SchemaError::MissingCapability(message)) => {
                let failure = VectorRegenerationFailure::new(
                    VectorRegenerationFailureClass::UnsupportedVecCapability,
                    message,
                );
                // Roll back and release the write lock before the best-effort
                // audit write, which opens its own connection.
                drop(tx);
                self.persist_vector_regeneration_failure_best_effort(
                    &config.profile,
                    &audit_metadata,
                    &failure,
                );
                return Err(failure.to_engine_error());
            }
            Err(error) => return Err(EngineError::Schema(error)),
        }
        // Re-snapshot under the write lock; any change since embedding aborts.
        let apply_chunks = collect_regeneration_chunks(&tx)?;
        let apply_payload = build_regeneration_input(&config, &identity, apply_chunks.clone());
        let apply_hash = compute_snapshot_hash(&apply_payload)?;
        if apply_hash != snapshot_hash {
            let failure = VectorRegenerationFailure::new(
                VectorRegenerationFailureClass::SnapshotDrift,
                "chunk snapshot changed during generation; retry".to_owned(),
            );
            drop(tx);
            self.persist_vector_regeneration_failure_best_effort(
                &config.profile,
                &audit_metadata,
                &failure,
            );
            return Err(failure.to_engine_error());
        }
        persist_vector_contract(&tx, &config, &table_name, &identity, &snapshot_hash)?;
        // Full replacement: every existing row is removed before re-inserting.
        tx.execute(&format!("DELETE FROM {table_name}"), [])?;
        let mut stmt = tx.prepare_cached(&format!(
            "INSERT INTO {table_name} (chunk_id, embedding) VALUES (?1, ?2)"
        ))?;
        let mut regenerated_rows = 0usize;
        for chunk in &apply_chunks {
            let Some(embedding) = embedding_map.remove(&chunk.chunk_id) else {
                // The statement borrows the transaction, so it must be dropped
                // first; dropping the transaction rolls back the DELETE above.
                drop(stmt);
                drop(tx);
                let failure = VectorRegenerationFailure::new(
                    VectorRegenerationFailureClass::InvalidEmbedderOutput,
                    format!(
                        "embedder did not produce a vector for chunk '{}'",
                        chunk.chunk_id
                    ),
                );
                self.persist_vector_regeneration_failure_best_effort(
                    &config.profile,
                    &audit_metadata,
                    &failure,
                );
                return Err(failure.to_engine_error());
            };
            stmt.execute(rusqlite::params![chunk.chunk_id.as_str(), embedding])?;
            regenerated_rows += 1;
        }
        drop(stmt);
        // Written inside the transaction so it commits atomically with the vectors.
        persist_vector_regeneration_event(
            &tx,
            "vector_regeneration_apply",
            &config.profile,
            &audit_metadata,
        )?;
        tx.commit()?;

        Ok(VectorRegenerationReport {
            profile: config.profile.clone(),
            table_name,
            dimension: identity.dimension,
            total_chunks: chunks.len(),
            regenerated_rows,
            contract_persisted: true,
            notes,
        })
    }
740
741 pub(super) fn persist_vector_regeneration_failure_best_effort(
742 &self,
743 profile: &str,
744 metadata: &VectorRegenerationAuditMetadata,
745 failure: &VectorRegenerationFailure,
746 ) {
747 let Ok(conn) = self.connect() else {
748 return;
749 };
750 let failure_metadata = VectorRegenerationAuditMetadata {
751 profile: metadata.profile.clone(),
752 model_identity: metadata.model_identity.clone(),
753 model_version: metadata.model_version.clone(),
754 chunk_count: metadata.chunk_count,
755 snapshot_hash: metadata.snapshot_hash.clone(),
756 failure_class: Some(failure.failure_class_label().to_owned()),
757 };
758 let _ = persist_vector_regeneration_event(
759 &conn,
760 "vector_regeneration_failed",
761 profile,
762 &failure_metadata,
763 );
764 }
765
766 pub fn configure_vec_kind(
780 &self,
781 kind: &str,
782 source: VectorSource,
783 ) -> Result<ConfigureVecOutcome, EngineError> {
784 match source {
785 VectorSource::Chunks => {}
786 }
787 let mut conn = self.connect()?;
788
789 let profile: Option<(i64, i64)> = conn
790 .query_row(
791 "SELECT profile_id, dimensions FROM vector_embedding_profiles WHERE active = 1",
792 [],
793 |row| Ok((row.get::<_, i64>(0)?, row.get::<_, i64>(1)?)),
794 )
795 .optional()?;
796 let (profile_id, dimensions) = profile.ok_or_else(|| {
797 EngineError::InvalidConfig(
798 "no active embedding profile configured; call configure_embedding first".to_owned(),
799 )
800 })?;
801 let dimensions = usize::try_from(dimensions).map_err(|_| {
802 EngineError::Bridge(format!(
803 "invalid embedding profile dimensions: {dimensions}"
804 ))
805 })?;
806
807 let tx = conn.transaction_with_behavior(TransactionBehavior::Immediate)?;
808
809 let was_already_enabled: bool = tx
810 .query_row(
811 "SELECT enabled FROM vector_index_schemas WHERE kind = ?1",
812 rusqlite::params![kind],
813 |row| row.get::<_, i64>(0).map(|v| v == 1),
814 )
815 .optional()?
816 .unwrap_or(false);
817
818 tx.execute(
819 "INSERT INTO vector_index_schemas \
820 (kind, enabled, source_mode, source_config_json, state, created_at, updated_at) \
821 VALUES (?1, 1, 'chunks', NULL, 'fresh', unixepoch(), unixepoch()) \
822 ON CONFLICT(kind) DO UPDATE SET \
823 enabled = 1, \
824 source_mode = 'chunks', \
825 source_config_json = NULL, \
826 updated_at = unixepoch()",
827 rusqlite::params![kind],
828 )?;
829
830 self.schema_manager
831 .ensure_vec_kind_profile(&tx, kind, dimensions)?;
832
833 let chunks = collect_kind_chunks(&tx, kind)?;
834 let mut enqueued: usize = 0;
835 {
836 let mut stmt = tx.prepare(
837 "INSERT INTO vector_projection_work \
838 (kind, node_logical_id, chunk_id, canonical_hash, priority, \
839 embedding_profile_id, state, created_at, updated_at) \
840 SELECT ?1, ?2, ?3, ?4, 0, ?5, 'pending', unixepoch(), unixepoch() \
841 WHERE NOT EXISTS ( \
842 SELECT 1 FROM vector_projection_work \
843 WHERE chunk_id = ?3 AND embedding_profile_id = ?5 AND state = 'pending' \
844 )",
845 )?;
846 for chunk in &chunks {
847 let canonical_hash = canonical_chunk_hash(&chunk.chunk_id, &chunk.text_content);
848 let inserted = stmt.execute(rusqlite::params![
849 kind,
850 chunk.node_logical_id.as_str(),
851 chunk.chunk_id.as_str(),
852 canonical_hash,
853 profile_id,
854 ])?;
855 enqueued += inserted;
856 }
857 }
858
859 tx.commit()?;
860
861 Ok(ConfigureVecOutcome {
862 kind: kind.to_owned(),
863 enqueued_backfill_rows: enqueued,
864 was_already_enabled,
865 })
866 }
867
868 pub fn configure_vec_kinds(
879 &self,
880 items: &[(String, VectorSource)],
881 ) -> Result<Vec<ConfigureVecOutcome>, EngineError> {
882 let mut outcomes = Vec::with_capacity(items.len());
883 for (kind, source) in items {
884 outcomes.push(self.configure_vec_kind(kind, *source)?);
885 }
886 Ok(outcomes)
887 }
888
889 pub fn get_vec_index_status(&self, kind: &str) -> Result<VecIndexStatus, EngineError> {
897 let conn = self.connect()?;
898
899 let schema_row: Option<(bool, String, Option<String>, Option<i64>)> = conn
900 .query_row(
901 "SELECT enabled, state, last_error, last_completed_at \
902 FROM vector_index_schemas WHERE kind = ?1",
903 rusqlite::params![kind],
904 |row| {
905 Ok((
906 row.get::<_, i64>(0)? == 1,
907 row.get::<_, String>(1)?,
908 row.get::<_, Option<String>>(2)?,
909 row.get::<_, Option<i64>>(3)?,
910 ))
911 },
912 )
913 .optional()?;
914
915 let Some((enabled, state, last_error, last_completed_at)) = schema_row else {
916 return Ok(VecIndexStatus {
917 kind: kind.to_owned(),
918 enabled: false,
919 state: "unconfigured".to_owned(),
920 pending_incremental: 0,
921 pending_backfill: 0,
922 last_error: None,
923 last_completed_at: None,
924 embedding_identity: None,
925 });
926 };
927
928 let pending_backfill: u64 = conn
929 .query_row(
930 "SELECT count(*) FROM vector_projection_work \
931 WHERE kind = ?1 AND state = 'pending' AND priority < 1000",
932 rusqlite::params![kind],
933 |row| row.get::<_, i64>(0),
934 )
935 .map(i64::cast_unsigned)?;
936
937 let pending_incremental: u64 = conn
938 .query_row(
939 "SELECT count(*) FROM vector_projection_work \
940 WHERE kind = ?1 AND state = 'pending' AND priority >= 1000",
941 rusqlite::params![kind],
942 |row| row.get::<_, i64>(0),
943 )
944 .map(i64::cast_unsigned)?;
945
946 let embedding_identity: Option<String> = conn
947 .query_row(
948 "SELECT model_identity FROM vector_embedding_profiles WHERE active = 1",
949 [],
950 |row| row.get::<_, String>(0),
951 )
952 .optional()?;
953
954 Ok(VecIndexStatus {
955 kind: kind.to_owned(),
956 enabled,
957 state,
958 pending_incremental,
959 pending_backfill,
960 last_error,
961 last_completed_at,
962 embedding_identity,
963 })
964 }
965
966 pub fn rebuild_projections(
969 &self,
970 target: ProjectionTarget,
971 ) -> Result<ProjectionRepairReport, EngineError> {
972 self.projections.rebuild_projections(target)
973 }
974
975 pub fn rebuild_missing_projections(&self) -> Result<ProjectionRepairReport, EngineError> {
978 self.projections.rebuild_missing_projections()
979 }
980
981 #[allow(clippy::too_many_lines)]
1011 pub fn configure_embedding(
1012 &self,
1013 embedder: &dyn QueryEmbedder,
1014 acknowledge_rebuild_impact: bool,
1015 ) -> Result<ConfigureEmbeddingOutcome, EngineError> {
1016 let identity = embedder.identity();
1017 let max_tokens = embedder.max_tokens();
1018 let dimensions = i64::try_from(identity.dimension).map_err(|_| {
1019 EngineError::InvalidConfig(format!(
1020 "embedder dimension {} exceeds i64 range",
1021 identity.dimension
1022 ))
1023 })?;
1024 let max_tokens_i64 = i64::try_from(max_tokens).ok();
1025
1026 let mut conn = self.connect()?;
1027 let tx = conn.transaction_with_behavior(TransactionBehavior::Immediate)?;
1028
1029 let current: Option<(i64, String, String, i64, String)> = tx
1031 .query_row(
1032 "SELECT profile_id, model_identity, COALESCE(model_version, ''), dimensions, \
1033 COALESCE(normalization_policy, '') \
1034 FROM vector_embedding_profiles WHERE active = 1",
1035 [],
1036 |row| {
1037 Ok((
1038 row.get::<_, i64>(0)?,
1039 row.get::<_, String>(1)?,
1040 row.get::<_, String>(2)?,
1041 row.get::<_, i64>(3)?,
1042 row.get::<_, String>(4)?,
1043 ))
1044 },
1045 )
1046 .optional()?;
1047
1048 let incoming_version = identity.model_version.clone();
1049 if let Some((profile_id, current_identity, current_version, current_dim, current_norm)) =
1050 current.clone()
1051 {
1052 if current_identity == identity.model_identity
1053 && current_version == incoming_version
1054 && current_dim == dimensions
1055 && current_norm == identity.normalization_policy
1056 {
1057 tx.commit()?;
1059 return Ok(ConfigureEmbeddingOutcome::Unchanged { profile_id });
1060 }
1061
1062 let affected_kinds: i64 = tx.query_row(
1064 "SELECT COUNT(*) FROM vector_index_schemas WHERE enabled = 1",
1065 [],
1066 |row| row.get::<_, i64>(0),
1067 )?;
1068 let affected = usize::try_from(affected_kinds).unwrap_or(0);
1069 if affected > 0 && !acknowledge_rebuild_impact {
1070 drop(tx);
1072 return Err(EngineError::EmbeddingChangeRequiresAck {
1073 affected_kinds: affected,
1074 });
1075 }
1076
1077 let identity_triple_changed = current_identity != identity.model_identity
1078 || current_version != incoming_version
1079 || current_dim != dimensions;
1080
1081 let new_profile_id = if identity_triple_changed {
1082 tx.execute(
1084 "UPDATE vector_embedding_profiles SET active = 0 WHERE active = 1",
1085 [],
1086 )?;
1087
1088 insert_new_active_profile(
1090 &tx,
1091 &identity.model_identity,
1092 &incoming_version,
1093 dimensions,
1094 &identity.normalization_policy,
1095 max_tokens_i64,
1096 )?
1097 } else {
1098 let normalization_opt: Option<&str> = if identity.normalization_policy.is_empty() {
1104 None
1105 } else {
1106 Some(identity.normalization_policy.as_str())
1107 };
1108 tx.execute(
1109 "UPDATE vector_embedding_profiles \
1110 SET normalization_policy = ?1, max_tokens = ?2 \
1111 WHERE profile_id = ?3",
1112 rusqlite::params![normalization_opt, max_tokens_i64, profile_id],
1113 )?;
1114 profile_id
1115 };
1116
1117 let stale_kinds = if affected > 0 {
1119 tx.execute(
1120 "UPDATE vector_index_schemas \
1121 SET state = 'stale', updated_at = unixepoch() \
1122 WHERE enabled = 1",
1123 [],
1124 )?
1125 } else {
1126 0
1127 };
1128
1129 tx.commit()?;
1130 return Ok(ConfigureEmbeddingOutcome::Replaced {
1131 old_profile_id: profile_id,
1132 new_profile_id,
1133 stale_kinds,
1134 });
1135 }
1136
1137 let new_profile_id = insert_new_active_profile(
1139 &tx,
1140 &identity.model_identity,
1141 &incoming_version,
1142 dimensions,
1143 &identity.normalization_policy,
1144 max_tokens_i64,
1145 )?;
1146 tx.commit()?;
1147 Ok(ConfigureEmbeddingOutcome::Activated {
1148 profile_id: new_profile_id,
1149 })
1150 }
1151
1152 pub fn active_embedding_profile_id(&self) -> Result<Option<i64>, EngineError> {
1158 let conn = self.connect()?;
1159 let id = conn
1160 .query_row(
1161 "SELECT profile_id FROM vector_embedding_profiles WHERE active = 1",
1162 [],
1163 |row| row.get::<_, i64>(0),
1164 )
1165 .optional()?;
1166 Ok(id)
1167 }
1168
1169 pub fn drain_vector_projection(
1178 &self,
1179 embedder: &dyn BatchEmbedder,
1180 timeout: std::time::Duration,
1181 ) -> Result<crate::vector_projection_actor::DrainReport, EngineError> {
1182 let deadline = std::time::Instant::now() + timeout;
1183 let mut report = crate::vector_projection_actor::DrainReport::default();
1184 let writer = self.require_writer()?;
1185 loop {
1186 if std::time::Instant::now() >= deadline {
1187 break;
1188 }
1189 let tick = crate::vector_projection_actor::run_tick(self, &writer, embedder)?;
1190 if tick.embedder_unavailable {
1191 report.embedder_unavailable_ticks += 1;
1192 break;
1193 }
1194 report.incremental_processed += tick.processed_incremental;
1195 report.backfill_processed += tick.processed_backfill;
1196 report.failed += tick.failed;
1197 report.discarded_stale += tick.discarded_stale;
1198 if tick.idle {
1199 break;
1200 }
1201 }
1202 Ok(report)
1203 }
1204
1205 pub fn drain_vector_projection_single_tick(
1211 &self,
1212 embedder: &dyn BatchEmbedder,
1213 ) -> Result<crate::vector_projection_actor::DrainReport, EngineError> {
1214 let writer = self.require_writer()?;
1215 let tick = crate::vector_projection_actor::run_tick(self, &writer, embedder)?;
1216 let mut report = crate::vector_projection_actor::DrainReport::default();
1217 if tick.embedder_unavailable {
1218 report.embedder_unavailable_ticks = 1;
1219 }
1220 report.incremental_processed = tick.processed_incremental;
1221 report.backfill_processed = tick.processed_backfill;
1222 report.failed = tick.failed;
1223 report.discarded_stale = tick.discarded_stale;
1224 Ok(report)
1225 }
1226
1227 fn require_writer(&self) -> Result<std::sync::Arc<crate::WriterActor>, EngineError> {
1228 self.writer.clone().ok_or_else(|| {
1229 EngineError::InvalidConfig(
1230 "drain_vector_projection requires an engine-wired AdminService".to_owned(),
1231 )
1232 })
1233 }
1234
1235 pub fn check_embedding(&self, embedder: &dyn QueryEmbedder) -> Result<(), EngineError> {
1244 match embedder.embed_query("fathomdb embedder health probe") {
1245 Ok(_) => Ok(()),
1246 Err(err) => Err(EngineError::CapabilityMissing(format!(
1247 "embedder probe failed: {err}"
1248 ))),
1249 }
1250 }
1251}
1252
/// Result of configuring the engine's embedding profile.
///
/// Serialized as an internally tagged object. The `"outcome"` field holds the
/// snake_case variant name, and the variant's fields sit alongside it, e.g.
/// `{"outcome": "activated", "profile_id": 7}`.
#[derive(Clone, Debug, PartialEq, Eq, Serialize)]
#[serde(tag = "outcome", rename_all = "snake_case")]
pub enum ConfigureEmbeddingOutcome {
    /// A profile was made active.
    /// NOTE(review): presumably used when no profile was previously active;
    /// confirm against the configure path.
    Activated {
        /// Id of the now-active profile.
        profile_id: i64,
    },
    /// The requested configuration presumably matched the already-active
    /// profile, so nothing was changed (verify against the caller).
    Unchanged {
        /// Id of the profile that remains active.
        profile_id: i64,
    },
    /// The previously active profile was superseded by a new one.
    Replaced {
        /// Id of the profile that was active before.
        old_profile_id: i64,
        /// Id of the profile that is active now.
        new_profile_id: i64,
        /// Number of kinds reported as stale by the replacement. Presumably
        /// these are kinds whose vectors need regeneration (TODO confirm).
        stale_kinds: usize,
    },
}
1279
1280fn insert_new_active_profile(
1281 tx: &rusqlite::Transaction<'_>,
1282 model_identity: &str,
1283 model_version: &str,
1284 dimensions: i64,
1285 normalization_policy: &str,
1286 max_tokens: Option<i64>,
1287) -> Result<i64, rusqlite::Error> {
1288 let profile_name = format!("{model_identity}@{model_version}");
1291 let model_version_opt: Option<&str> = if model_version.is_empty() {
1292 None
1293 } else {
1294 Some(model_version)
1295 };
1296 let normalization_opt: Option<&str> = if normalization_policy.is_empty() {
1297 None
1298 } else {
1299 Some(normalization_policy)
1300 };
1301 tx.execute(
1302 "INSERT INTO vector_embedding_profiles \
1303 (profile_name, model_identity, model_version, dimensions, normalization_policy, \
1304 max_tokens, active, activated_at, created_at) \
1305 VALUES (?1, ?2, ?3, ?4, ?5, ?6, 1, unixepoch(), unixepoch())",
1306 rusqlite::params![
1307 profile_name,
1308 model_identity,
1309 model_version_opt,
1310 dimensions,
1311 normalization_opt,
1312 max_tokens,
1313 ],
1314 )?;
1315 Ok(tx.last_insert_rowid())
1316}
1317
1318pub fn load_vector_regeneration_config(
1321 path: impl AsRef<Path>,
1322) -> Result<VectorRegenerationConfig, EngineError> {
1323 let path = path.as_ref();
1324 let raw = std::fs::read_to_string(path)?;
1325 match path.extension().and_then(|ext| ext.to_str()) {
1326 Some("toml") => {
1327 toml::from_str(&raw).map_err(|error| EngineError::Bridge(error.to_string()))
1328 }
1329 Some("json") | None => {
1330 serde_json::from_str(&raw).map_err(|error| EngineError::Bridge(error.to_string()))
1331 }
1332 Some(other) => Err(EngineError::Bridge(format!(
1333 "unsupported vector regeneration config extension: {other}"
1334 ))),
1335 }
1336}
1337
1338fn validate_vector_regeneration_config(
1339 conn: &rusqlite::Connection,
1340 config: &VectorRegenerationConfig,
1341 identity: &QueryEmbedderIdentity,
1342) -> Result<VectorRegenerationConfig, VectorRegenerationFailure> {
1343 let kind = validate_bounded_text("kind", &config.kind, MAX_PROFILE_LEN)?;
1344 let profile = validate_bounded_text("profile", &config.profile, MAX_PROFILE_LEN)?;
1345 if identity.dimension == 0 {
1346 return Err(VectorRegenerationFailure::new(
1347 VectorRegenerationFailureClass::InvalidContract,
1348 "embedder reports dimension 0".to_owned(),
1349 ));
1350 }
1351 let chunking_policy =
1352 validate_bounded_text("chunking_policy", &config.chunking_policy, MAX_POLICY_LEN)?;
1353 let preprocessing_policy = validate_bounded_text(
1354 "preprocessing_policy",
1355 &config.preprocessing_policy,
1356 MAX_POLICY_LEN,
1357 )?;
1358
1359 if let Some(existing_dimension) = current_vector_profile_dimension(conn, &profile)?
1360 && existing_dimension != identity.dimension
1361 {
1362 return Err(VectorRegenerationFailure::new(
1363 VectorRegenerationFailureClass::InvalidContract,
1364 format!(
1365 "embedder dimension {} does not match existing vector profile dimension {}",
1366 identity.dimension, existing_dimension
1367 ),
1368 ));
1369 }
1370
1371 validate_existing_contract_version(conn, &profile)?;
1372
1373 let normalized = VectorRegenerationConfig {
1374 kind,
1375 profile,
1376 chunking_policy,
1377 preprocessing_policy,
1378 };
1379 let serialized = serde_json::to_vec(&normalized).map_err(|error| {
1380 VectorRegenerationFailure::new(
1381 VectorRegenerationFailureClass::InvalidContract,
1382 error.to_string(),
1383 )
1384 })?;
1385 if serialized.len() > MAX_CONTRACT_JSON_BYTES {
1386 return Err(VectorRegenerationFailure::new(
1387 VectorRegenerationFailureClass::InvalidContract,
1388 format!("serialized contract exceeds {MAX_CONTRACT_JSON_BYTES} bytes"),
1389 ));
1390 }
1391
1392 Ok(normalized)
1393}
1394
1395#[allow(clippy::cast_possible_wrap)]
1396fn persist_vector_contract(
1397 conn: &rusqlite::Connection,
1398 config: &VectorRegenerationConfig,
1399 table_name: &str,
1400 identity: &QueryEmbedderIdentity,
1401 snapshot_hash: &str,
1402) -> Result<(), EngineError> {
1403 conn.execute(
1404 r"
1405 INSERT OR REPLACE INTO vector_embedding_contracts (
1406 profile,
1407 table_name,
1408 model_identity,
1409 model_version,
1410 dimension,
1411 normalization_policy,
1412 chunking_policy,
1413 preprocessing_policy,
1414 generator_command_json,
1415 applied_at,
1416 snapshot_hash,
1417 contract_format_version,
1418 updated_at
1419 ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, unixepoch(), ?10, ?11, unixepoch())
1420 ",
1421 rusqlite::params![
1422 config.profile.as_str(),
1423 table_name,
1424 identity.model_identity.as_str(),
1425 identity.model_version.as_str(),
1426 identity.dimension as i64,
1427 identity.normalization_policy.as_str(),
1428 config.chunking_policy.as_str(),
1429 config.preprocessing_policy.as_str(),
1430 "[]",
1431 snapshot_hash,
1432 CURRENT_VECTOR_CONTRACT_FORMAT_VERSION,
1433 ],
1434 )?;
1435 Ok(())
1436}
1437
1438fn persist_vector_regeneration_event(
1439 conn: &rusqlite::Connection,
1440 event_type: &str,
1441 subject: &str,
1442 metadata: &VectorRegenerationAuditMetadata,
1443) -> Result<(), EngineError> {
1444 let metadata_json = serialize_audit_metadata(metadata)?;
1445 conn.execute(
1446 "INSERT INTO provenance_events (id, event_type, subject, metadata_json) VALUES (?1, ?2, ?3, ?4)",
1447 rusqlite::params![new_id(), event_type, subject, metadata_json],
1448 )?;
1449 Ok(())
1450}
1451
1452fn validate_bounded_text(
1453 field: &str,
1454 value: &str,
1455 max_len: usize,
1456) -> Result<String, VectorRegenerationFailure> {
1457 let trimmed = value.trim();
1458 if trimmed.is_empty() {
1459 return Err(VectorRegenerationFailure::new(
1460 VectorRegenerationFailureClass::InvalidContract,
1461 format!("{field} must not be empty"),
1462 ));
1463 }
1464 if trimmed.len() > max_len {
1465 return Err(VectorRegenerationFailure::new(
1466 VectorRegenerationFailureClass::InvalidContract,
1467 format!("{field} exceeds max length {max_len}"),
1468 ));
1469 }
1470 Ok(trimmed.to_owned())
1471}
1472
1473fn current_vector_profile_dimension(
1474 conn: &rusqlite::Connection,
1475 profile: &str,
1476) -> Result<Option<usize>, VectorRegenerationFailure> {
1477 let dimension: Option<i64> = conn
1478 .query_row(
1479 "SELECT dimension FROM vector_profiles WHERE profile = ?1 AND enabled = 1",
1480 [profile],
1481 |row| row.get(0),
1482 )
1483 .optional()
1484 .map_err(|error| {
1485 VectorRegenerationFailure::new(
1486 VectorRegenerationFailureClass::InvalidContract,
1487 error.to_string(),
1488 )
1489 })?;
1490 dimension
1491 .map(|value| {
1492 usize::try_from(value).map_err(|_| {
1493 VectorRegenerationFailure::new(
1494 VectorRegenerationFailureClass::InvalidContract,
1495 format!("stored vector profile dimension is invalid: {value}"),
1496 )
1497 })
1498 })
1499 .transpose()
1500}
1501
1502fn validate_existing_contract_version(
1503 conn: &rusqlite::Connection,
1504 profile: &str,
1505) -> Result<(), VectorRegenerationFailure> {
1506 let version: Option<i64> = conn
1507 .query_row(
1508 "SELECT contract_format_version FROM vector_embedding_contracts WHERE profile = ?1",
1509 [profile],
1510 |row| row.get(0),
1511 )
1512 .optional()
1513 .map_err(|error| {
1514 VectorRegenerationFailure::new(
1515 VectorRegenerationFailureClass::InvalidContract,
1516 error.to_string(),
1517 )
1518 })?;
1519 if let Some(version) = version
1520 && version > CURRENT_VECTOR_CONTRACT_FORMAT_VERSION
1521 {
1522 return Err(VectorRegenerationFailure::new(
1523 VectorRegenerationFailureClass::InvalidContract,
1524 format!(
1525 "persisted contract format version {version} is unsupported; supported version is {CURRENT_VECTOR_CONTRACT_FORMAT_VERSION}"
1526 ),
1527 ));
1528 }
1529 Ok(())
1530}
1531
1532fn serialize_audit_metadata(
1533 metadata: &VectorRegenerationAuditMetadata,
1534) -> Result<String, EngineError> {
1535 let json =
1536 serde_json::to_string(metadata).map_err(|error| EngineError::Bridge(error.to_string()))?;
1537 if json.len() > MAX_AUDIT_METADATA_BYTES {
1538 return Err(VectorRegenerationFailure::new(
1539 VectorRegenerationFailureClass::InvalidContract,
1540 format!("audit metadata exceeds {MAX_AUDIT_METADATA_BYTES} bytes"),
1541 )
1542 .to_engine_error());
1543 }
1544 Ok(json)
1545}
1546
1547pub(super) fn build_regeneration_input(
1548 config: &VectorRegenerationConfig,
1549 identity: &QueryEmbedderIdentity,
1550 chunks: Vec<VectorRegenerationInputChunk>,
1551) -> VectorRegenerationInput {
1552 VectorRegenerationInput {
1553 profile: config.profile.clone(),
1554 table_name: fathomdb_schema::vec_kind_table_name(&config.kind),
1555 model_identity: identity.model_identity.clone(),
1556 model_version: identity.model_version.clone(),
1557 dimension: identity.dimension,
1558 normalization_policy: identity.normalization_policy.clone(),
1559 chunking_policy: config.chunking_policy.clone(),
1560 preprocessing_policy: config.preprocessing_policy.clone(),
1561 chunks,
1562 }
1563}
1564
1565pub(super) fn compute_snapshot_hash(
1566 payload: &VectorRegenerationInput,
1567) -> Result<String, EngineError> {
1568 let bytes =
1569 serde_json::to_vec(payload).map_err(|error| EngineError::Bridge(error.to_string()))?;
1570 let mut hasher = sha2::Sha256::new();
1571 hasher.update(bytes);
1572 Ok(format!("{:x}", hasher.finalize()))
1573}
1574
1575pub(super) fn collect_kind_chunks(
1579 conn: &rusqlite::Connection,
1580 kind: &str,
1581) -> Result<Vec<VectorRegenerationInputChunk>, EngineError> {
1582 let mut stmt = conn.prepare(
1583 r"
1584 SELECT c.id, c.node_logical_id, n.kind, c.text_content, c.byte_start, c.byte_end, n.source_ref, c.created_at
1585 FROM chunks c
1586 JOIN nodes n
1587 ON n.logical_id = c.node_logical_id
1588 AND n.superseded_at IS NULL
1589 WHERE n.kind = ?1
1590 ORDER BY c.created_at, c.id
1591 ",
1592 )?;
1593 let chunks = stmt
1594 .query_map(rusqlite::params![kind], |row| {
1595 Ok(VectorRegenerationInputChunk {
1596 chunk_id: row.get(0)?,
1597 node_logical_id: row.get(1)?,
1598 kind: row.get(2)?,
1599 text_content: row.get(3)?,
1600 byte_start: row.get(4)?,
1601 byte_end: row.get(5)?,
1602 source_ref: row.get(6)?,
1603 created_at: row.get(7)?,
1604 })
1605 })?
1606 .collect::<Result<Vec<_>, _>>()?;
1607 Ok(chunks)
1608}
1609
1610#[must_use]
1612pub(crate) fn canonical_chunk_hash(chunk_id: &str, text: &str) -> String {
1613 let mut hasher = sha2::Sha256::new();
1614 hasher.update(chunk_id.as_bytes());
1615 hasher.update([0u8]);
1616 hasher.update(text.as_bytes());
1617 format!("{:x}", hasher.finalize())
1618}
1619
1620pub(super) fn collect_regeneration_chunks(
1621 conn: &rusqlite::Connection,
1622) -> Result<Vec<VectorRegenerationInputChunk>, EngineError> {
1623 let mut stmt = conn.prepare(
1624 r"
1625 SELECT c.id, c.node_logical_id, n.kind, c.text_content, c.byte_start, c.byte_end, n.source_ref, c.created_at
1626 FROM chunks c
1627 JOIN nodes n
1628 ON n.logical_id = c.node_logical_id
1629 AND n.superseded_at IS NULL
1630 ORDER BY c.created_at, c.id
1631 ",
1632 )?;
1633 let chunks = stmt
1634 .query_map([], |row| {
1635 Ok(VectorRegenerationInputChunk {
1636 chunk_id: row.get(0)?,
1637 node_logical_id: row.get(1)?,
1638 kind: row.get(2)?,
1639 text_content: row.get(3)?,
1640 byte_start: row.get(4)?,
1641 byte_end: row.get(5)?,
1642 source_ref: row.get(6)?,
1643 created_at: row.get(7)?,
1644 })
1645 })?
1646 .collect::<Result<Vec<_>, _>>()?;
1647 Ok(chunks)
1648}