1use chrono::{DateTime, Utc};
4use cortex_core::{
5 validate_summary_spans, ConsumerAdvisory, CrossSessionSalience, OutcomeMemoryRelation,
6 SourceAttestation, SourceAuthority, SummarySpan,
7};
8use rusqlite::{params, OptionalExtension};
9
10use crate::{Pool, StoreError, StoreResult};
11
12const SCHEMA_VERSION_TABLES: &[&str] = &["events", "traces"];
13
14const V2_EXPAND_COLUMNS: &[(&str, &str)] = &[
15 ("events", "source_attestation_json"),
16 ("episodes", "summary_spans_json"),
17 ("memories", "summary_spans_json"),
18 ("memories", "cross_session_use_count"),
19 ("memories", "first_used_at"),
20 ("memories", "last_cross_session_use_at"),
21 ("memories", "last_validation_at"),
22 ("memories", "validation_epoch"),
23 ("memories", "blessed_until"),
24 ("context_packs", "consumer_advisory_json"),
25];
26
27const V2_EXPAND_TABLES: &[&str] = &["memory_session_uses", "outcome_memory_relations"];
28
29const V2_BACKFILL_REQUIRED_COLUMNS: &[(&str, &str)] = &[
30 ("events", "source_attestation_json"),
31 ("episodes", "summary_spans_json"),
32 ("memories", "summary_spans_json"),
33 ("memories", "cross_session_use_count"),
34 ("memories", "validation_epoch"),
35 ("context_packs", "consumer_advisory_json"),
36];
37
38const DEFAULT_SCHEMA_V2_TARGET: u16 = 2;
39
40#[derive(Debug, Clone, Copy)]
41struct RequiredTable {
42 name: &'static str,
43 columns: &'static [&'static str],
44}
45
46const REQUIRED_TABLES: &[RequiredTable] = &[
47 RequiredTable {
48 name: "_migrations",
49 columns: &["name", "applied_at"],
50 },
51 RequiredTable {
52 name: "audit_records",
53 columns: &[
54 "id",
55 "operation",
56 "target_ref",
57 "before_hash",
58 "after_hash",
59 "reason",
60 "actor_json",
61 "source_refs_json",
62 "created_at",
63 ],
64 },
65 RequiredTable {
66 name: "authority_key_timeline",
67 columns: &[
68 "key_id",
69 "principal_id",
70 "state",
71 "effective_at",
72 "reason",
73 "audit_ref",
74 ],
75 },
76 RequiredTable {
77 name: "authority_principal_timeline",
78 columns: &[
79 "principal_id",
80 "trust_tier",
81 "effective_at",
82 "trust_review_due_at",
83 "removed_at",
84 "audit_ref",
85 ],
86 },
87 RequiredTable {
88 name: "context_packs",
89 columns: &[
90 "id",
91 "task",
92 "pack_json",
93 "selection_audit",
94 "created_at",
95 "consumer_advisory_json",
98 ],
99 },
100 RequiredTable {
101 name: "contradictions",
102 columns: &[
103 "id",
104 "left_ref",
105 "right_ref",
106 "contradiction_type",
107 "status",
108 "interpretation",
109 "created_at",
110 "updated_at",
111 ],
112 },
113 RequiredTable {
114 name: "doctrine",
115 columns: &[
116 "id",
117 "source_principle",
118 "rule",
119 "force",
120 "promotion_reason",
121 "promoted_by_json",
122 "created_at",
123 ],
124 },
125 RequiredTable {
126 name: "episodes",
127 columns: &[
128 "id",
129 "trace_id",
130 "source_events_json",
131 "summary",
132 "domains_json",
133 "entities_json",
134 "candidate_meaning",
135 "extracted_by_json",
136 "confidence",
137 "status",
138 "summary_spans_json",
140 ],
141 },
142 RequiredTable {
143 name: "events",
144 columns: &[
145 "id",
146 "schema_version",
147 "observed_at",
148 "recorded_at",
149 "source_json",
150 "event_type",
151 "trace_id",
152 "session_id",
153 "domain_tags_json",
154 "payload_json",
155 "payload_hash",
156 "prev_event_hash",
157 "event_hash",
158 "source_attestation_json",
161 ],
162 },
163 RequiredTable {
164 name: "memories",
165 columns: &[
166 "id",
167 "memory_type",
168 "status",
169 "claim",
170 "source_episodes_json",
171 "source_events_json",
172 "domains_json",
173 "salience_json",
174 "confidence",
175 "authority",
176 "applies_when_json",
177 "does_not_apply_when_json",
178 "created_at",
179 "updated_at",
180 "summary_spans_json",
183 "cross_session_use_count",
184 "first_used_at",
185 "last_cross_session_use_at",
186 "last_validation_at",
187 "validation_epoch",
188 "blessed_until",
189 ],
190 },
191 RequiredTable {
192 name: "memory_session_uses",
193 columns: &[
195 "memory_id",
196 "session_id",
197 "first_used_at",
198 "last_used_at",
199 "use_count",
200 ],
201 },
202 RequiredTable {
203 name: "outcome_memory_relations",
204 columns: &[
209 "outcome_ref",
210 "memory_id",
211 "relation",
212 "recorded_at",
213 "source_event_id",
214 "validation_scope",
215 "validating_principal_id",
216 "evidence_ref",
217 ],
218 },
219 RequiredTable {
220 name: "principles",
221 columns: &[
222 "id",
223 "statement",
224 "status",
225 "supporting_memories_json",
226 "contradicting_memories_json",
227 "domains_observed_json",
228 "applies_when_json",
229 "does_not_apply_when_json",
230 "confidence",
231 "validation",
232 "brightness",
233 "created_by_json",
234 "created_at",
235 "updated_at",
236 ],
237 },
238 RequiredTable {
239 name: "trace_events",
240 columns: &["trace_id", "event_id", "ordinal"],
241 },
242 RequiredTable {
243 name: "traces",
244 columns: &[
245 "id",
246 "schema_version",
247 "opened_at",
248 "closed_at",
249 "trace_type",
250 "status",
251 ],
252 },
253];
254
255#[derive(Debug, Clone, PartialEq, Eq)]
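/// Result of checking the on-disk schema shape and per-row `schema_version`
/// stamps against what this binary expects.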
257pub struct SchemaVersionReport {
258 pub expected: u16,
260 pub checked_tables: Vec<&'static str>,
262 pub failures: Vec<SchemaVersionFailure>,
264}
265
266impl SchemaVersionReport {
267 #[must_use]
269 pub fn is_ok(&self) -> bool {
270 self.failures.is_empty()
271 }
272}
273
274#[allow(missing_docs)]
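/// A single way the store can fail schema verification.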
276#[derive(Debug, Clone, PartialEq, Eq)]
277pub enum SchemaVersionFailure {
278 MissingTable { table: &'static str },
280 MissingColumn {
282 table: &'static str,
283 column: &'static str,
284 },
285 UnknownMigration { name: String },
287 Mismatch {
289 table: &'static str,
290 row_id: String,
291 expected: u16,
292 actual: u16,
293 },
294 IllegalIntermediateShape {
296 invariant: &'static str,
297 detail: String,
298 },
299}
300
301#[derive(Debug, Clone, Default, PartialEq, Eq)]
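/// The pieces of store state inspected when deciding whether activating a
/// candidate store would roll back the schema version or cross a fork marker.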
304pub struct RollbackForkState {
305 pub schema_version: Option<u16>,
307 pub fork_marker: Option<ForkMarker>,
309}
310
311#[derive(Debug, Clone, PartialEq, Eq)]
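/// Opaque marker recorded on a store state whose history has forked; its
/// presence in either state causes the rollback/fork check to refuse.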
313pub struct ForkMarker {
314 pub marker: String,
316}
317
318impl ForkMarker {
319 #[must_use]
321 pub fn new(marker: impl Into<String>) -> Self {
322 Self {
323 marker: marker.into(),
324 }
325 }
326}
327
328#[derive(Debug, Clone, PartialEq, Eq)]
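/// Result of [`verify_rollback_fork_refusal`].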
330pub struct RollbackForkReport {
331 pub failures: Vec<RollbackForkFailure>,
333}
334
335impl RollbackForkReport {
336 #[must_use]
338 pub fn is_ok(&self) -> bool {
339 self.failures.is_empty()
340 }
341}
342
343#[allow(missing_docs)]
345#[derive(Debug, Clone, PartialEq, Eq)]
346pub enum RollbackForkFailure {
347 SchemaVersionRollback { current: u16, candidate: u16 },
349 ForkMarkerPresent {
351 location: RollbackForkStateLocation,
352 marker: String,
353 },
354}
355
356impl RollbackForkFailure {
357 #[must_use]
359 pub fn invariant(&self) -> &'static str {
360 match self {
361 Self::SchemaVersionRollback { .. } => "rollback.schema_version.not_decreased",
362 Self::ForkMarkerPresent { .. } => "fork.marker.absent",
363 }
364 }
365
366 #[must_use]
368 pub fn detail(&self) -> String {
369 match self {
370 Self::SchemaVersionRollback { current, candidate } => format!(
371 "candidate schema_version {candidate} is lower than current schema_version {current}"
372 ),
373 Self::ForkMarkerPresent { location, marker } => {
374 format!("{location} state contains fork marker {marker}")
375 }
376 }
377 }
378}
379
380#[derive(Debug, Clone, Copy, PartialEq, Eq)]
382pub enum RollbackForkStateLocation {
383 Current,
385 Candidate,
387}
388
389impl std::fmt::Display for RollbackForkStateLocation {
390 fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
391 match self {
392 Self::Current => formatter.write_str("current"),
393 Self::Candidate => formatter.write_str("candidate"),
394 }
395 }
396}
397
398impl SchemaVersionFailure {
399 #[must_use]
401 pub fn invariant(&self) -> String {
402 match self {
403 Self::MissingTable { table } => format!("schema_shape.{table}.exists"),
404 Self::MissingColumn { table, column } => {
405 format!("schema_shape.{table}.{column}.exists")
406 }
407 Self::UnknownMigration { .. } => "schema_migration.known_to_code".into(),
408 Self::Mismatch { table, .. } => {
409 format!("schema_version.{table}.matches_code")
410 }
411 Self::IllegalIntermediateShape { invariant, .. } => (*invariant).into(),
412 }
413 }
414
415 #[must_use]
417 pub fn detail(&self) -> String {
418 match self {
419 Self::MissingTable { table } => {
420 format!("required table {table} is missing")
421 }
422 Self::MissingColumn { table, column } => {
423 format!("table {table} is missing required column {column}")
424 }
425 Self::UnknownMigration { name } => {
426 format!("migration {name} is unknown to this binary")
427 }
428 Self::Mismatch {
429 table,
430 row_id,
431 expected,
432 actual,
433 } => format!(
434 "table {table} row {row_id} has schema_version {actual}; expected {expected}"
435 ),
436 Self::IllegalIntermediateShape { detail, .. } => detail.clone(),
437 }
438 }
439}
440
441#[derive(Debug, Clone, Copy, PartialEq, Eq)]
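/// Row counts recorded in the pre-migration backup manifest for the tables
/// whose sizes are re-checked after the v1 -> v2 migration.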
452pub struct PreV2BackupRowCounts {
453 pub events: u64,
455 pub traces: u64,
457 pub episodes: u64,
459 pub memories: u64,
461}
462
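/// The v1 -> v2 migration appends exactly one boundary event, so the `events`
/// table is allowed to grow by this amount and no more.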
463pub const SCHEMA_V1_TO_V2_EVENT_BOUNDARY_DELTA: u64 = 1;
468
469#[derive(Debug, Clone, PartialEq, Eq)]
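/// A table whose post-migration row count does not match what the backup
/// manifest allows.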
471pub struct PostMigrateCountMismatchFailure {
472 pub table: &'static str,
474 pub expected: u64,
476 pub actual: u64,
478 pub invariant: &'static str,
480}
481
482impl PostMigrateCountMismatchFailure {
483 #[must_use]
485 pub fn invariant(&self) -> &'static str {
486 self.invariant
487 }
488
489 #[must_use]
491 pub fn detail(&self) -> String {
492 format!(
493 "table {table} has {actual} rows after migrate; backup manifest recorded {expected} rows before migrate",
494 table = self.table,
495 actual = self.actual,
496 expected = self.expected,
497 )
498 }
499}
500
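/// Compares current row counts against the counts captured in the pre-migration
/// backup manifest: `events` must have grown by exactly
/// [`SCHEMA_V1_TO_V2_EVENT_BOUNDARY_DELTA`], while `traces`, `episodes`, and
/// `memories` must be unchanged.
///
/// # Errors
///
/// Returns a [`StoreError`] if a count query fails or if adding the boundary
/// delta to the recorded `events` count would overflow `u64`.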
501pub fn verify_post_migrate_row_counts(
509 pool: &Pool,
510 pre: &PreV2BackupRowCounts,
511) -> StoreResult<Vec<PostMigrateCountMismatchFailure>> {
512 let observed = PreV2BackupRowCounts {
513 events: read_table_count(pool, "events")?,
514 traces: read_table_count(pool, "traces")?,
515 episodes: read_table_count(pool, "episodes")?,
516 memories: read_table_count(pool, "memories")?,
517 };
518
519 let mut failures = Vec::new();
520 let expected_events_after = match pre.events.checked_add(SCHEMA_V1_TO_V2_EVENT_BOUNDARY_DELTA) {
527 Some(value) => value,
528 None => {
529 return Err(StoreError::RowCountCheckedAddOverflow {
530 table: "events",
531 pre: pre.events,
532 delta: SCHEMA_V1_TO_V2_EVENT_BOUNDARY_DELTA,
533 });
534 }
535 };
536 if observed.events != expected_events_after {
537 failures.push(PostMigrateCountMismatchFailure {
538 table: "events",
539 expected: expected_events_after,
540 actual: observed.events,
541 invariant: "schema_v2_post_migrate.row_count.events.matches_pre_plus_boundary",
542 });
543 }
544 for (table, expected, actual, invariant) in [
545 (
546 "traces",
547 pre.traces,
548 observed.traces,
549 "schema_v2_post_migrate.row_count.traces.unchanged",
550 ),
551 (
552 "episodes",
553 pre.episodes,
554 observed.episodes,
555 "schema_v2_post_migrate.row_count.episodes.unchanged",
556 ),
557 (
558 "memories",
559 pre.memories,
560 observed.memories,
561 "schema_v2_post_migrate.row_count.memories.unchanged",
562 ),
563 ] {
564 if expected != actual {
565 failures.push(PostMigrateCountMismatchFailure {
566 table,
567 expected,
568 actual,
569 invariant,
570 });
571 }
572 }
573 Ok(failures)
574}
575
576fn read_table_count(pool: &Pool, table: &'static str) -> StoreResult<u64> {
577 let sql = format!("SELECT COUNT(*) FROM {table};");
578 pool.query_row(&sql, [], |row| row.get(0))
579 .map_err(Into::into)
580}
581
582#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
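/// Counts of rows already stamped with a post-v2 `schema_version`, excluding
/// the v1 -> v2 migration boundary event.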
591pub struct PostV2RowPopulation {
592 pub events_post_v2: u64,
596 pub traces_post_v2: u64,
598}
599
600impl PostV2RowPopulation {
601 #[must_use]
604 pub fn is_empty(&self) -> bool {
605 self.events_post_v2 == 0 && self.traces_post_v2 == 0
606 }
607
608 #[must_use]
611 pub fn total(&self) -> u64 {
612 self.events_post_v2.saturating_add(self.traces_post_v2)
613 }
614}
615
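/// Counts `events` and `traces` rows whose `schema_version` is already 2 or
/// higher, ignoring the single v1 -> v2 boundary event (identified by a
/// `payload_json` kind of `schema_migration.v1_to_v2`).
///
/// # Errors
///
/// Returns a [`StoreError`] if either count query fails.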
616pub fn count_post_v2_rows_outside_boundary(pool: &Pool) -> StoreResult<PostV2RowPopulation> {
634 const BOUNDARY_PAYLOAD_KIND: &str = "schema_migration.v1_to_v2";
635 let events_post_v2: u64 = pool.query_row(
636 "SELECT COUNT(*) FROM events \
637 WHERE schema_version >= 2 \
638 AND (json_extract(payload_json, '$.kind') IS NULL \
639 OR json_extract(payload_json, '$.kind') != ?1);",
640 params![BOUNDARY_PAYLOAD_KIND],
641 |row| row.get(0),
642 )?;
643 let traces_post_v2: u64 = pool.query_row(
644 "SELECT COUNT(*) FROM traces WHERE schema_version >= 2;",
645 [],
646 |row| row.get(0),
647 )?;
648 Ok(PostV2RowPopulation {
649 events_post_v2,
650 traces_post_v2,
651 })
652}
653
654#[must_use]
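/// Pure comparison of `current` and `candidate` store state: refuses when the
/// candidate would lower `schema_version` or when either side carries a fork
/// marker.
///
/// A minimal usage sketch (not compiled as a doc-test):
///
/// ```ignore
/// let current = RollbackForkState { schema_version: Some(2), fork_marker: None };
/// let candidate = RollbackForkState { schema_version: Some(1), fork_marker: None };
/// assert!(!verify_rollback_fork_refusal(&current, &candidate).is_ok());
/// ```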
661pub fn verify_rollback_fork_refusal(
662 current: &RollbackForkState,
663 candidate: &RollbackForkState,
664) -> RollbackForkReport {
665 let mut failures = Vec::new();
666
667 if let (Some(current), Some(candidate)) = (current.schema_version, candidate.schema_version) {
668 if candidate < current {
669 failures.push(RollbackForkFailure::SchemaVersionRollback { current, candidate });
670 }
671 }
672
    if let Some(marker) = &current.fork_marker {
674 failures.push(RollbackForkFailure::ForkMarkerPresent {
675 location: RollbackForkStateLocation::Current,
676 marker: marker.marker.clone(),
677 });
678 }
679
680 if let Some(marker) = &candidate.fork_marker {
681 failures.push(RollbackForkFailure::ForkMarkerPresent {
682 location: RollbackForkStateLocation::Candidate,
683 marker: marker.marker.clone(),
684 });
685 }
686
687 RollbackForkReport { failures }
688}
689
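/// Verifies that the store matches the schema this binary was built for: every
/// required table and column exists, every applied migration is known to this
/// binary, and no row in a versioned table carries a `schema_version` greater
/// than `expected`.
///
/// # Errors
///
/// Returns a [`StoreError`] if any of the underlying SQL queries fail.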
690pub fn verify_schema_version(pool: &Pool, expected: u16) -> StoreResult<SchemaVersionReport> {
692 let mut checked_tables = Vec::new();
693 let mut failures = Vec::new();
694
695 for table in REQUIRED_TABLES {
696 checked_tables.push(table.name);
697 if !has_table(pool, table.name)? {
698 failures.push(SchemaVersionFailure::MissingTable { table: table.name });
699 continue;
700 }
701
702 for column in table.columns {
703 if !has_column(pool, table.name, column)? {
704 failures.push(SchemaVersionFailure::MissingColumn {
705 table: table.name,
706 column,
707 });
708 }
709 }
710 }
711
712 if has_table(pool, "_migrations")? && has_column(pool, "_migrations", "name")? {
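        // A migration row this binary does not recognise means a newer binary
        // has already touched the store; surface it so callers can fail closed
        // instead of writing with a stale schema.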
724 let known = crate::migrate::known_migration_names();
725 let mut stmt = pool.prepare("SELECT name FROM _migrations ORDER BY name;")?;
726 let rows = stmt.query_map([], |row| row.get::<_, String>(0))?;
727 for row in rows {
728 let name = row?;
729 if !known.iter().any(|known_name| *known_name == name) {
730 failures.push(SchemaVersionFailure::UnknownMigration { name });
731 }
732 }
733 }
734
735 for table in SCHEMA_VERSION_TABLES {
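        // Rows written by older binaries (lower schema_version) are tolerated;
        // only rows stamped with a version newer than `expected` are mismatches.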
742 if !has_column(pool, table, "schema_version")? {
743 continue;
744 }
745
746 let sql = format!(
747 "SELECT id, schema_version FROM {table} \
748 WHERE schema_version > ?1 \
749 ORDER BY id;"
750 );
751 let mut stmt = pool.prepare(&sql)?;
752 let mismatches = stmt.query_map(params![expected], |row| {
753 Ok(SchemaVersionFailure::Mismatch {
754 table,
755 row_id: row.get(0)?,
756 expected,
757 actual: row.get(1)?,
758 })
759 })?;
760
761 for mismatch in mismatches {
762 failures.push(mismatch?);
763 }
764 }
765
766 Ok(SchemaVersionReport {
767 expected,
768 checked_tables,
769 failures,
770 })
771}
772
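/// Checks that the schema v2 "expand" step is all-or-nothing: if any v2 column
/// or table is present, every expected artifact must be present and the
/// required backfill columns must contain no NULL rows. A store with no v2
/// artifacts at all passes trivially.
///
/// # Errors
///
/// Returns a [`StoreError`] if any of the underlying SQL queries fail.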
773pub fn verify_schema_v2_expand_shape(pool: &Pool) -> StoreResult<Vec<SchemaVersionFailure>> {
781 let mut failures = Vec::new();
782 let mut present_artifacts = 0;
783
784 for (table, column) in V2_EXPAND_COLUMNS {
785 if has_column(pool, table, column)? {
786 present_artifacts += 1;
787 }
788 }
789 for table in V2_EXPAND_TABLES {
790 if has_table(pool, table)? {
791 present_artifacts += 1;
792 }
793 }
794
795 if present_artifacts == 0 {
796 return Ok(failures);
797 }
798
799 for (table, column) in V2_EXPAND_COLUMNS {
800 if !has_column(pool, table, column)? {
801 failures.push(SchemaVersionFailure::IllegalIntermediateShape {
802 invariant: "schema_v2_expand_shape.complete",
803 detail: format!(
804 "partial schema v2 expand shape is missing column {table}.{column}"
805 ),
806 });
807 }
808 }
809 for table in V2_EXPAND_TABLES {
810 if !has_table(pool, table)? {
811 failures.push(SchemaVersionFailure::IllegalIntermediateShape {
812 invariant: "schema_v2_expand_shape.complete",
813 detail: format!("partial schema v2 expand shape is missing table {table}"),
814 });
815 }
816 }
817
818 if !failures.is_empty() {
819 return Ok(failures);
820 }
821
822 for (table, column) in V2_BACKFILL_REQUIRED_COLUMNS {
823 let null_rows = count_null_column_rows(pool, table, column)?;
824 if null_rows > 0 {
825 failures.push(SchemaVersionFailure::IllegalIntermediateShape {
826 invariant: "schema_v2_expand_backfill.complete",
827 detail: format!(
828 "schema v2 expand backfill left {null_rows} {table}.{column} rows unset"
829 ),
830 });
831 }
832 }
833
834 Ok(failures)
835}
836
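/// Runs the full readiness check for making schema v2 the default persistence
/// target: base schema verification at version 2, completeness of the expand
/// shape, and per-row validation of the v2 payloads (source attestations,
/// summary spans, consumer advisories, cross-session salience, session uses,
/// and outcome-memory relations).
///
/// # Errors
///
/// Returns a [`StoreError`] if any of the underlying SQL queries fail.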
837pub fn verify_schema_v2_default_persistence_readiness(
843 pool: &Pool,
844) -> StoreResult<Vec<SchemaVersionFailure>> {
845 let mut failures = verify_schema_version(pool, DEFAULT_SCHEMA_V2_TARGET)?.failures;
846 let mut shape_complete = true;
847
848 for (table, column) in V2_EXPAND_COLUMNS {
849 if !has_column(pool, table, column)? {
850 shape_complete = false;
851 failures.push(SchemaVersionFailure::IllegalIntermediateShape {
852 invariant: "schema_v2_default_persistence.s2_9_shape.complete",
853 detail: format!("default schema v2 persistence is missing column {table}.{column}"),
854 });
855 }
856 }
857 for table in V2_EXPAND_TABLES {
858 if !has_table(pool, table)? {
859 shape_complete = false;
860 failures.push(SchemaVersionFailure::IllegalIntermediateShape {
861 invariant: "schema_v2_default_persistence.s2_9_shape.complete",
862 detail: format!("default schema v2 persistence is missing table {table}"),
863 });
864 }
865 }
866
867 if !shape_complete {
868 return Ok(failures);
869 }
870
871 failures.extend(verify_schema_v2_expand_shape(pool)?);
872 failures.extend(verify_v2_source_attestations(pool)?);
873 failures.extend(verify_v2_episode_summary_spans(pool)?);
874 failures.extend(verify_v2_memory_summary_spans(pool)?);
875 failures.extend(verify_v2_context_pack_advisories(pool)?);
876 failures.extend(verify_v2_cross_session_salience(pool)?);
877 failures.extend(verify_v2_memory_session_uses(pool)?);
878 failures.extend(verify_v2_outcome_memory_relations(pool)?);
879
880 Ok(failures)
881}
882
883fn verify_v2_source_attestations(pool: &Pool) -> StoreResult<Vec<SchemaVersionFailure>> {
884 let mut failures = Vec::new();
885 let mut stmt = pool.prepare("SELECT id, source_attestation_json FROM events ORDER BY id;")?;
886 let rows = stmt.query_map([], |row| {
887 Ok((row.get::<_, String>(0)?, row.get::<_, Option<String>>(1)?))
888 })?;
889
890 for row in rows {
891 let (id, raw) = row?;
892 let Some(raw) = raw else {
893 failures.push(SchemaVersionFailure::IllegalIntermediateShape {
894 invariant: "schema_v2_default_persistence.source_attestation.present",
895 detail: format!("events row {id} has unset source_attestation_json"),
896 });
897 continue;
898 };
899 if let Err(err) = serde_json::from_str::<SourceAttestation>(&raw) {
900 failures.push(SchemaVersionFailure::IllegalIntermediateShape {
901 invariant: "schema_v2_default_persistence.source_attestation.valid",
902 detail: format!(
903 "events row {id} source_attestation_json is not a valid SourceAttestation: {err}"
904 ),
905 });
906 }
907 }
908
909 Ok(failures)
910}
911
912fn verify_v2_episode_summary_spans(pool: &Pool) -> StoreResult<Vec<SchemaVersionFailure>> {
913 let mut failures = Vec::new();
914 let mut stmt =
915 pool.prepare("SELECT id, summary, summary_spans_json FROM episodes ORDER BY id;")?;
916 let rows = stmt.query_map([], |row| {
917 Ok((
918 row.get::<_, String>(0)?,
919 row.get::<_, String>(1)?,
920 row.get::<_, Option<String>>(2)?,
921 ))
922 })?;
923
924 for row in rows {
925 let (id, summary, raw) = row?;
926 let Some(raw) = raw else {
927 continue;
933 };
934 match serde_json::from_str::<Vec<SummarySpan>>(&raw) {
935 Ok(spans) => {
936 if let Err(err) =
937 validate_summary_spans(&summary, &spans, |_| SourceAuthority::Derived)
938 {
939 failures.push(SchemaVersionFailure::IllegalIntermediateShape {
940 invariant: "schema_v2_default_persistence.summary_spans.valid",
941 detail: format!(
942 "episodes row {id} summary_spans_json failed {}",
943 err.invariant()
944 ),
945 });
946 }
947 }
948 Err(err) => failures.push(SchemaVersionFailure::IllegalIntermediateShape {
949 invariant: "schema_v2_default_persistence.summary_spans.valid",
950 detail: format!(
951 "episodes row {id} summary_spans_json is not valid SummarySpan JSON: {err}"
952 ),
953 }),
954 }
955 }
956
957 Ok(failures)
958}
959
960fn verify_v2_memory_summary_spans(pool: &Pool) -> StoreResult<Vec<SchemaVersionFailure>> {
961 let mut failures = Vec::new();
962 let mut stmt = pool.prepare("SELECT id, summary_spans_json FROM memories ORDER BY id;")?;
963 let rows = stmt.query_map([], |row| {
964 Ok((row.get::<_, String>(0)?, row.get::<_, Option<String>>(1)?))
965 })?;
966
967 for row in rows {
968 let (id, raw) = row?;
969 let Some(raw) = raw else {
972 continue;
973 };
974 if let Err(err) = serde_json::from_str::<Vec<SummarySpan>>(&raw) {
975 failures.push(SchemaVersionFailure::IllegalIntermediateShape {
976 invariant: "schema_v2_default_persistence.summary_spans.valid",
977 detail: format!(
978 "memories row {id} summary_spans_json is not valid SummarySpan JSON: {err}"
979 ),
980 });
981 }
982 }
983
984 Ok(failures)
985}
986
987fn verify_v2_context_pack_advisories(pool: &Pool) -> StoreResult<Vec<SchemaVersionFailure>> {
988 let mut failures = Vec::new();
989 let mut stmt =
990 pool.prepare("SELECT id, consumer_advisory_json FROM context_packs ORDER BY id;")?;
991 let rows = stmt.query_map([], |row| {
992 Ok((row.get::<_, String>(0)?, row.get::<_, Option<String>>(1)?))
993 })?;
994
995 for row in rows {
996 let (id, raw) = row?;
997 let Some(raw) = raw else {
999 continue;
1000 };
1001 if let Err(err) = serde_json::from_str::<ConsumerAdvisory>(&raw) {
1002 failures.push(SchemaVersionFailure::IllegalIntermediateShape {
1003 invariant: "schema_v2_default_persistence.context_pack_advisory.valid",
1004 detail: format!(
1005 "context_packs row {id} consumer_advisory_json is not a valid ConsumerAdvisory: {err}"
1006 ),
1007 });
1008 }
1009 }
1010
1011 Ok(failures)
1012}
1013
1014fn verify_v2_cross_session_salience(pool: &Pool) -> StoreResult<Vec<SchemaVersionFailure>> {
1015 let mut failures = Vec::new();
1016 let mut stmt = pool.prepare(
1017 "SELECT id, cross_session_use_count, first_used_at, last_cross_session_use_at,
1018 last_validation_at, validation_epoch, blessed_until
1019 FROM memories
1020 ORDER BY id;",
1021 )?;
1022 let rows = stmt.query_map([], |row| {
1023 Ok((
1024 row.get::<_, String>(0)?,
1025 row.get::<_, Option<u32>>(1)?,
1026 row.get::<_, Option<String>>(2)?,
1027 row.get::<_, Option<String>>(3)?,
1028 row.get::<_, Option<String>>(4)?,
1029 row.get::<_, Option<u32>>(5)?,
1030 row.get::<_, Option<String>>(6)?,
1031 ))
1032 })?;
1033
1034 for row in rows {
1035 let (
1036 id,
1037 cross_session_use_count,
1038 first_used_at,
1039 last_cross_session_use_at,
1040 last_validation_at,
1041 validation_epoch,
1042 blessed_until,
1043 ) = row?;
1044
1045 let Some(cross_session_use_count) = cross_session_use_count else {
1046 failures.push(v2_salience_failure(
1047 id,
1048 "cross_session_use_count is missing",
1049 ));
1050 continue;
1051 };
1052 let Some(validation_epoch) = validation_epoch else {
1053 failures.push(v2_salience_failure(id, "validation_epoch is missing"));
1054 continue;
1055 };
1056
1057 let first_used_at =
1058 parse_optional_v2_time(&id, "first_used_at", first_used_at, &mut failures);
1059 let last_cross_session_use_at = parse_optional_v2_time(
1060 &id,
1061 "last_cross_session_use_at",
1062 last_cross_session_use_at,
1063 &mut failures,
1064 );
1065 let last_validation_at =
1066 parse_optional_v2_time(&id, "last_validation_at", last_validation_at, &mut failures);
1067 let blessed_until =
1068 parse_optional_v2_time(&id, "blessed_until", blessed_until, &mut failures);
1069
1070 if failures.iter().any(|failure| match failure {
1071 SchemaVersionFailure::IllegalIntermediateShape { detail, .. } => {
1072 detail.starts_with(&format!("memories row {id} "))
1073 }
1074 _ => false,
1075 }) {
1076 continue;
1077 }
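        // Reconstruct the typed value to prove the row round-trips into
        // `CrossSessionSalience`; the constructed value itself is unused.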
1078
1079 let _state = CrossSessionSalience {
1080 cross_session_use_count,
1081 first_used_at,
1082 last_cross_session_use_at,
1083 last_validation_at,
1084 validation_epoch,
1085 blessed_until,
1086 };
1087 }
1088
1089 Ok(failures)
1090}
1091
1092fn parse_optional_v2_time(
1093 row_id: &str,
1094 column: &'static str,
1095 raw: Option<String>,
1096 failures: &mut Vec<SchemaVersionFailure>,
1097) -> Option<DateTime<Utc>> {
1098 raw.and_then(|value| match value.parse::<DateTime<Utc>>() {
1099 Ok(parsed) => Some(parsed),
1100 Err(err) => {
1101 failures.push(v2_salience_failure(
1102 row_id.to_string(),
1103 format!("{column} is not RFC3339 UTC datetime: {err}"),
1104 ));
1105 None
1106 }
1107 })
1108}
1109
1110fn v2_salience_failure(
1111 row_id: impl Into<String>,
1112 detail: impl Into<String>,
1113) -> SchemaVersionFailure {
1114 SchemaVersionFailure::IllegalIntermediateShape {
1115 invariant: "schema_v2_default_persistence.cross_session_salience.valid",
1116 detail: format!("memories row {} {}", row_id.into(), detail.into()),
1117 }
1118}
1119
1120fn verify_v2_memory_session_uses(pool: &Pool) -> StoreResult<Vec<SchemaVersionFailure>> {
1121 let mut failures = Vec::new();
1122 let mut stmt = pool.prepare(
1123 "SELECT memory_id, session_id, first_used_at, last_used_at, use_count
1124 FROM memory_session_uses
1125 ORDER BY memory_id, session_id;",
1126 )?;
1127 let rows = stmt.query_map([], |row| {
1128 Ok((
1129 row.get::<_, String>(0)?,
1130 row.get::<_, String>(1)?,
1131 row.get::<_, String>(2)?,
1132 row.get::<_, String>(3)?,
1133 row.get::<_, u32>(4)?,
1134 ))
1135 })?;
1136
1137 for row in rows {
1138 let (memory_id, session_id, first_used_at, last_used_at, use_count) = row?;
1139 let row_id = format!("{memory_id}/{session_id}");
1140 if memory_id.trim().is_empty() {
1141 failures.push(v2_memory_session_use_failure(&row_id, "memory_id is empty"));
1142 } else if !row_exists(pool, "memories", &memory_id)? {
1143 failures.push(v2_memory_session_use_failure(
1144 &row_id,
1145 "memory_id does not reference an existing memory",
1146 ));
1147 }
1148 if session_id.trim().is_empty() {
1149 failures.push(v2_memory_session_use_failure(
1150 &row_id,
1151 "session_id is empty",
1152 ));
1153 }
1154 if use_count == 0 {
1155 failures.push(v2_memory_session_use_failure(&row_id, "use_count is zero"));
1156 }
1157
1158 let first_used_at = parse_required_v2_time(
1159 "schema_v2_default_persistence.memory_session_uses.valid",
1160 &row_id,
1161 "first_used_at",
1162 &first_used_at,
1163 &mut failures,
1164 );
1165 let last_used_at = parse_required_v2_time(
1166 "schema_v2_default_persistence.memory_session_uses.valid",
1167 &row_id,
1168 "last_used_at",
1169 &last_used_at,
1170 &mut failures,
1171 );
1172 if let (Some(first), Some(last)) = (first_used_at, last_used_at) {
1173 if last < first {
1174 failures.push(v2_memory_session_use_failure(
1175 &row_id,
1176 "last_used_at is earlier than first_used_at",
1177 ));
1178 }
1179 }
1180 }
1181
1182 Ok(failures)
1183}
1184
1185fn verify_v2_outcome_memory_relations(pool: &Pool) -> StoreResult<Vec<SchemaVersionFailure>> {
1186 let mut failures = Vec::new();
1187 let mut stmt = pool.prepare(
1188 "SELECT outcome_ref, memory_id, relation, recorded_at, source_event_id,
1189 validation_scope, validating_principal_id, evidence_ref
1190 FROM outcome_memory_relations
1191 ORDER BY outcome_ref, memory_id, relation;",
1192 )?;
1193 let rows = stmt.query_map([], |row| {
1194 Ok((
1195 row.get::<_, String>(0)?,
1196 row.get::<_, String>(1)?,
1197 row.get::<_, String>(2)?,
1198 row.get::<_, String>(3)?,
1199 row.get::<_, Option<String>>(4)?,
1200 row.get::<_, Option<String>>(5)?,
1201 row.get::<_, Option<String>>(6)?,
1202 row.get::<_, Option<String>>(7)?,
1203 ))
1204 })?;
1205
1206 for row in rows {
1207 let (
1208 outcome_ref,
1209 memory_id,
1210 relation,
1211 recorded_at,
1212 source_event_id,
1213 validation_scope,
1214 validating_principal_id,
1215 evidence_ref,
1216 ) = row?;
1217 let row_id = format!("{outcome_ref}/{memory_id}/{relation}");
1218 if outcome_ref.trim().is_empty() {
1219 failures.push(v2_outcome_memory_relation_failure(
1220 &row_id,
1221 "outcome_ref is empty",
1222 ));
1223 }
1224 if memory_id.trim().is_empty() {
1225 failures.push(v2_outcome_memory_relation_failure(
1226 &row_id,
1227 "memory_id is empty",
1228 ));
1229 } else if !row_exists(pool, "memories", &memory_id)? {
1230 failures.push(v2_outcome_memory_relation_failure(
1231 &row_id,
1232 "memory_id does not reference an existing memory",
1233 ));
1234 }
1235 let parsed_relation =
1236 serde_json::from_value::<OutcomeMemoryRelation>(serde_json::json!(relation));
1237 if let Err(err) = &parsed_relation {
1238 failures.push(v2_outcome_memory_relation_failure(
1239 &row_id,
1240 format!("relation is not a valid OutcomeMemoryRelation: {err}"),
1241 ));
1242 }
1243 parse_required_v2_time(
1244 "schema_v2_default_persistence.outcome_memory_relations.valid",
1245 &row_id,
1246 "recorded_at",
1247 &recorded_at,
1248 &mut failures,
1249 );
1250 if let Some(source_event_id) = source_event_id {
1251 if source_event_id.trim().is_empty() {
1252 failures.push(v2_outcome_memory_relation_failure(
1253 &row_id,
1254 "source_event_id is empty",
1255 ));
1256 } else if !row_exists(pool, "events", &source_event_id)? {
1257 failures.push(v2_outcome_memory_relation_failure(
1258 &row_id,
1259 "source_event_id does not reference an existing event",
1260 ));
1261 }
1262 }
1263 if let Ok(parsed) = parsed_relation {
1269 if parsed.advances_validation() {
1270 if validation_scope.is_none() {
1271 failures.push(v2_outcome_memory_relation_failure(
1272 &row_id,
1273 "validated relation missing validation_scope",
1274 ));
1275 }
1276 if validating_principal_id.is_none() {
1277 failures.push(v2_outcome_memory_relation_failure(
1278 &row_id,
1279 "validated relation missing validating_principal_id",
1280 ));
1281 }
1282 if evidence_ref.is_none() {
1283 failures.push(v2_outcome_memory_relation_failure(
1284 &row_id,
1285 "validated relation missing evidence_ref",
1286 ));
1287 }
1288 } else {
1289 if validation_scope.is_some() {
1290 failures.push(v2_outcome_memory_relation_failure(
1291 &row_id,
1292 "non-validation relation must not carry validation_scope",
1293 ));
1294 }
1295 if validating_principal_id.is_some() {
1296 failures.push(v2_outcome_memory_relation_failure(
1297 &row_id,
1298 "non-validation relation must not carry validating_principal_id",
1299 ));
1300 }
1301 if evidence_ref.is_some() {
1302 failures.push(v2_outcome_memory_relation_failure(
1303 &row_id,
1304 "non-validation relation must not carry evidence_ref",
1305 ));
1306 }
1307 }
1308 }
1309 }
1310
1311 Ok(failures)
1312}
1313
1314fn parse_required_v2_time(
1315 invariant: &'static str,
1316 row_id: &str,
1317 column: &'static str,
1318 raw: &str,
1319 failures: &mut Vec<SchemaVersionFailure>,
1320) -> Option<DateTime<Utc>> {
1321 match raw.parse::<DateTime<Utc>>() {
1322 Ok(parsed) => Some(parsed),
1323 Err(err) => {
1324 failures.push(SchemaVersionFailure::IllegalIntermediateShape {
1325 invariant,
1326 detail: format!("{row_id} {column} is not RFC3339 UTC datetime: {err}"),
1327 });
1328 None
1329 }
1330 }
1331}
1332
1333fn v2_memory_session_use_failure(row_id: &str, detail: impl Into<String>) -> SchemaVersionFailure {
1334 SchemaVersionFailure::IllegalIntermediateShape {
1335 invariant: "schema_v2_default_persistence.memory_session_uses.valid",
1336 detail: format!("memory_session_uses row {row_id} {}", detail.into()),
1337 }
1338}
1339
1340fn v2_outcome_memory_relation_failure(
1341 row_id: &str,
1342 detail: impl Into<String>,
1343) -> SchemaVersionFailure {
1344 SchemaVersionFailure::IllegalIntermediateShape {
1345 invariant: "schema_v2_default_persistence.outcome_memory_relations.valid",
1346 detail: format!("outcome_memory_relations row {row_id} {}", detail.into()),
1347 }
1348}
1349
1350fn count_null_column_rows(pool: &Pool, table: &str, column: &str) -> StoreResult<u64> {
1351 let sql = format!("SELECT COUNT(*) FROM {table} WHERE {column} IS NULL;");
1352 pool.query_row(&sql, [], |row| row.get(0))
1353 .map_err(Into::into)
1354}
1355
1356fn row_exists(pool: &Pool, table: &'static str, id: &str) -> StoreResult<bool> {
1357 let sql = format!("SELECT 1 FROM {table} WHERE id = ?1 LIMIT 1;");
1358 let found = pool.query_row(&sql, params![id], |_| Ok(())).optional()?;
1359 Ok(found.is_some())
1360}
1361
1362fn has_table(pool: &Pool, table: &str) -> StoreResult<bool> {
1363 let existing = pool
1364 .query_row(
1365 "SELECT name FROM sqlite_master WHERE type = 'table' AND name = ?1;",
1366 params![table],
1367 |row| row.get::<_, String>(0),
1368 )
1369 .optional()?;
1370 Ok(existing.is_some())
1371}
1372
1373fn has_column(pool: &Pool, table: &str, column: &str) -> StoreResult<bool> {
1374 let sql = format!("PRAGMA table_info({table});");
1375 let mut stmt = pool.prepare(&sql)?;
1376 let columns = stmt.query_map([], |row| row.get::<_, String>(1))?;
1377 for found in columns {
1378 if found? == column {
1379 return Ok(true);
1380 }
1381 }
1382 Ok(false)
1383}
1384
1385#[cfg(test)]
1386mod tests {
1387 use rusqlite::Connection;
1388
1389 use super::*;
1390
1391 fn migrated_pool() -> Connection {
1392 let pool = Connection::open_in_memory().expect("open sqlite");
1393 crate::migrate::apply_pending(&pool).expect("apply migrations");
1394 pool
1395 }
1396
1397 #[test]
1398 fn verify_schema_version_passes_for_empty_v1_schema() {
1399 let pool = migrated_pool();
1400
1401 let report = verify_schema_version(&pool, cortex_core::SCHEMA_VERSION)
1402 .expect("verify schema version");
1403
1404 assert!(report.is_ok(), "unexpected failures: {report:?}");
1405 assert_eq!(
1406 report.checked_tables,
1407 REQUIRED_TABLES
1408 .iter()
1409 .map(|table| table.name)
1410 .collect::<Vec<_>>()
1411 );
1412 }
1413
1414 #[test]
1415 fn verify_schema_version_names_mismatched_event_row() {
1416 let pool = migrated_pool();
1417 pool.execute(
1418 "INSERT INTO events (
1419 id, schema_version, observed_at, recorded_at, source_json, event_type,
1420 trace_id, session_id, domain_tags_json, payload_json, payload_hash,
1421 prev_event_hash, event_hash
1422 ) VALUES (
1423 'evt_future', 2, '2026-01-01T00:00:00Z', '2026-01-01T00:00:00Z',
1424 '{\"kind\":\"test\"}', 'test.event', NULL, NULL, '[]', '{}',
1425 'payload-hash', NULL, 'event-hash'
1426 );",
1427 [],
1428 )
1429 .expect("insert mismatched event");
1430
1431 let report = verify_schema_version(&pool, 1).expect("verify schema version");
1432
1433 assert_eq!(
1434 report.failures,
1435 vec![SchemaVersionFailure::Mismatch {
1436 table: "events",
1437 row_id: "evt_future".into(),
1438 expected: 1,
1439 actual: 2,
1440 }]
1441 );
1442 assert_eq!(
1443 report.failures[0].invariant(),
1444 "schema_version.events.matches_code"
1445 );
1446 }
1447
1448 #[test]
1449 fn verify_schema_version_reports_missing_required_table() {
1450 let pool = migrated_pool();
1451 pool.execute("DROP TABLE audit_records;", [])
1452 .expect("drop audit_records");
1453
1454 let report = verify_schema_version(&pool, cortex_core::SCHEMA_VERSION)
1455 .expect("verify schema version");
1456
1457 assert!(
1458 report
1459 .failures
1460 .contains(&SchemaVersionFailure::MissingTable {
1461 table: "audit_records"
1462 }),
1463 "failures: {report:?}"
1464 );
1465 }
1466
1467 #[test]
1468 fn verify_schema_version_reports_missing_required_column() {
1469 let pool = migrated_pool();
1470 pool.execute("ALTER TABLE events DROP COLUMN payload_hash;", [])
1471 .expect("drop payload_hash");
1472
1473 let report = verify_schema_version(&pool, cortex_core::SCHEMA_VERSION)
1474 .expect("verify schema version");
1475
1476 assert!(
1477 report
1478 .failures
1479 .contains(&SchemaVersionFailure::MissingColumn {
1480 table: "events",
1481 column: "payload_hash",
1482 }),
1483 "failures: {report:?}"
1484 );
1485 }
1486
1487 #[test]
1488 fn verify_schema_version_reports_unknown_migration() {
1489 let pool = migrated_pool();
1490 pool.execute("INSERT INTO _migrations (name) VALUES ('999_future');", [])
1491 .expect("insert future migration row");
1492
1493 let report = verify_schema_version(&pool, cortex_core::SCHEMA_VERSION)
1494 .expect("verify schema version");
1495
1496 assert_eq!(
1497 report.failures,
1498 vec![SchemaVersionFailure::UnknownMigration {
1499 name: "999_future".into(),
1500 }]
1501 );
1502 assert_eq!(
1503 report.failures[0].invariant(),
1504 "schema_migration.known_to_code"
1505 );
1506 }
1507
1508 #[test]
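    // Simulates a v1 binary (which only knows the first two migrations)
    // encountering a store that a v2 binary has already migrated: the v2
    // cutover migration row must surface as an unknown migration.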
1514 fn verify_schema_version_v1_binary_refuses_on_v2_migration_marker() {
1515 let pool = migrated_pool();
1516 pool.execute(
1522 "DELETE FROM _migrations WHERE name NOT IN ('001_init', '002_authority_timeline');",
1523 [],
1524 )
1525 .expect("trim post-v1 migration rows to simulate v1-binary baseline");
1526 pool.execute(
1527 "INSERT INTO _migrations (name) VALUES ('003_schema_v2_expand');",
1528 [],
1529 )
1530 .expect("insert v2 cutover migration row");
1531
1532 let simulated_v1_known: &[&str] = &["001_init", "002_authority_timeline"];
1538 let mut stmt = pool
1539 .prepare("SELECT name FROM _migrations ORDER BY name;")
1540 .expect("prepare migrations scan");
1541 let observed_names: Vec<String> = stmt
1542 .query_map([], |row| row.get::<_, String>(0))
1543 .expect("query migrations")
1544 .collect::<rusqlite::Result<Vec<_>>>()
1545 .expect("collect migration names");
1546 let unknown_to_v1: Vec<String> = observed_names
1547 .into_iter()
1548 .filter(|name| !simulated_v1_known.iter().any(|known| *known == name))
1549 .collect();
1550
1551 assert_eq!(
1552 unknown_to_v1,
1553 vec!["003_schema_v2_expand".to_string()],
1554 "v1 binary must see 003_schema_v2_expand as unknown"
1555 );
1556
1557 pool.execute(
1563 "INSERT INTO _migrations (name) VALUES ('003_schema_v2_expand_unknown_to_v1');",
1564 [],
1565 )
1566 .expect("insert simulated-v1-unknown migration row");
1567
1568 let report = verify_schema_version(&pool, 1).expect("verify schema version");
1569
1570 let unknown_failures: Vec<_> = report
1571 .failures
1572 .iter()
1573 .filter_map(|failure| match failure {
1574 SchemaVersionFailure::UnknownMigration { name } => Some(name.clone()),
1575 _ => None,
1576 })
1577 .collect();
1578 assert_eq!(
1579 unknown_failures,
1580 vec!["003_schema_v2_expand_unknown_to_v1".to_string()]
1581 );
1582 assert!(report.failures.iter().any(|failure| matches!(
1583 failure,
1584 SchemaVersionFailure::UnknownMigration { name } if name == "003_schema_v2_expand_unknown_to_v1"
1585 )));
1586 let invariant = report
1587 .failures
1588 .iter()
1589 .find_map(|failure| match failure {
1590 SchemaVersionFailure::UnknownMigration { .. } => Some(failure.invariant()),
1591 _ => None,
1592 })
1593 .expect("unknown-migration failure present");
1594 assert_eq!(invariant, "schema_migration.known_to_code");
1595 }
1596
1597 #[test]
1601 fn verify_schema_version_v2_binary_on_v2_store_emits_no_unknown_migration_failure() {
1602 let pool = migrated_pool();
1603
1604 let report = verify_schema_version(&pool, cortex_core::SCHEMA_VERSION)
1605 .expect("verify schema version");
1606
1607 assert!(
1608 !report
1609 .failures
1610 .iter()
1611 .any(|failure| matches!(failure, SchemaVersionFailure::UnknownMigration { .. })),
1612 "no unknown-migration failure expected: {report:?}"
1613 );
1614 assert!(report.is_ok(), "report must be clean: {report:?}");
1615 }
1616
1617 #[test]
1622 fn verify_schema_version_v2_binary_on_future_v3_store_fails_closed() {
1623 let pool = migrated_pool();
1624 pool.execute(
1625 "INSERT INTO _migrations (name) VALUES ('999_v3_future_unknown_to_v2');",
1626 [],
1627 )
1628 .expect("insert v3 future migration row");
1629
1630 let report = verify_schema_version(&pool, cortex_core::SCHEMA_VERSION)
1631 .expect("verify schema version");
1632
1633 let unknown = report
1634 .failures
1635 .iter()
1636 .find_map(|failure| match failure {
1637 SchemaVersionFailure::UnknownMigration { name } => Some(name.clone()),
1638 _ => None,
1639 })
1640 .expect("unknown-migration failure present");
1641 assert_eq!(unknown, "999_v3_future_unknown_to_v2");
1642 assert!(!report.is_ok(), "v3 marker must fail closed: {report:?}");
1643 }
1644
1645 fn insert_event_row(pool: &Connection, id: &str) {
1646 pool.execute(
1647 "INSERT INTO events (
1648 id, schema_version, observed_at, recorded_at, source_json, event_type,
1649 trace_id, session_id, domain_tags_json, payload_json, payload_hash,
1650 prev_event_hash, event_hash
1651 ) VALUES (
1652 ?1, 1, '2026-01-01T00:00:00Z', '2026-01-01T00:00:00Z',
1653 '{\"kind\":\"test\"}', 'test.event', NULL, NULL, '[]', '{}',
1654 'payload-hash', NULL, ?2
1655 );",
1656 params![id, format!("event-hash-{id}")],
1657 )
1658 .expect("insert event");
1659 }
1660
1661 #[test]
1662 fn verify_post_migrate_row_counts_passes_with_only_boundary_delta() {
1663 let pool = migrated_pool();
1664 insert_event_row(&pool, "evt_one");
1665 insert_event_row(&pool, "evt_boundary");
1667
1668 let pre = PreV2BackupRowCounts {
1669 events: 1,
1670 traces: 0,
1671 episodes: 0,
1672 memories: 0,
1673 };
1674
1675 let failures = verify_post_migrate_row_counts(&pool, &pre).expect("count compare");
1676
1677 assert!(
1678 failures.is_empty(),
1679 "clean +1 events delta must not fail: {failures:?}"
1680 );
1681 }
1682
1683 #[test]
1684 fn verify_post_migrate_row_counts_rejects_events_growing_beyond_boundary_delta() {
1685 let pool = migrated_pool();
1686 insert_event_row(&pool, "evt_one");
1687 insert_event_row(&pool, "evt_two_boundary");
1688 insert_event_row(&pool, "evt_three_extra");
1689
1690 let pre = PreV2BackupRowCounts {
1691 events: 1,
1692 traces: 0,
1693 episodes: 0,
1694 memories: 0,
1695 };
1696
1697 let failures = verify_post_migrate_row_counts(&pool, &pre).expect("count compare");
1698
1699 assert_eq!(failures.len(), 1, "unexpected failures: {failures:?}");
1700 assert_eq!(failures[0].table, "events");
1701 assert_eq!(failures[0].expected, 2);
1702 assert_eq!(failures[0].actual, 3);
1703 assert_eq!(
1704 failures[0].invariant(),
1705 "schema_v2_post_migrate.row_count.events.matches_pre_plus_boundary"
1706 );
1707 assert!(
1708 failures[0]
1709 .detail()
1710 .contains("table events has 3 rows after migrate"),
1711 "detail: {}",
1712 failures[0].detail()
1713 );
1714 }
1715
1716 #[test]
1717 fn verify_post_migrate_row_counts_rejects_events_count_shrinking() {
1718 let pool = migrated_pool();
1719 let pre = PreV2BackupRowCounts {
1721 events: 5,
1722 traces: 0,
1723 episodes: 0,
1724 memories: 0,
1725 };
1726
1727 let failures = verify_post_migrate_row_counts(&pool, &pre).expect("count compare");
1728
1729 assert_eq!(failures.len(), 1, "unexpected failures: {failures:?}");
1730 assert_eq!(failures[0].expected, 6);
1731 assert_eq!(failures[0].actual, 0);
1732 }
1733
1734 #[test]
1735 fn verify_post_migrate_row_counts_rejects_unchanged_tables_that_drifted() {
1736 let pool = migrated_pool();
1737 insert_event_row(&pool, "evt_one");
1738 insert_event_row(&pool, "evt_boundary");
1739 pool.execute(
1740 "INSERT INTO traces (id, schema_version, opened_at, closed_at, trace_type, status)
1741 VALUES ('trc_new', 1, '2026-01-01T00:00:00Z', NULL, 'fixture', 'open');",
1742 [],
1743 )
1744 .expect("insert trace");
1745
1746 let pre = PreV2BackupRowCounts {
1747 events: 1,
1748 traces: 0,
1749 episodes: 0,
1750 memories: 0,
1751 };
1752
1753 let failures = verify_post_migrate_row_counts(&pool, &pre).expect("count compare");
1754
1755 assert_eq!(failures.len(), 1, "unexpected failures: {failures:?}");
1756 assert_eq!(failures[0].table, "traces");
1757 assert_eq!(failures[0].expected, 0);
1758 assert_eq!(failures[0].actual, 1);
1759 assert_eq!(
1760 failures[0].invariant(),
1761 "schema_v2_post_migrate.row_count.traces.unchanged"
1762 );
1763 }
1764
1765 #[test]
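    // With pre.events at u64::MAX, adding the boundary delta would overflow,
    // so the check must refuse with the typed overflow error instead of
    // wrapping or panicking.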
1772 fn verify_post_migrate_row_counts_refuses_pre_events_at_u64_max_overflow() {
1773 let pool = migrated_pool();
1774
1775 let pre = PreV2BackupRowCounts {
1776 events: u64::MAX,
1777 traces: 0,
1778 episodes: 0,
1779 memories: 0,
1780 };
1781
1782 let err = verify_post_migrate_row_counts(&pool, &pre)
1783 .expect_err("overflow must refuse via typed error");
1784
1785 match err {
1786 crate::StoreError::RowCountCheckedAddOverflow { table, pre, delta } => {
1787 assert_eq!(table, "events");
1788 assert_eq!(pre, u64::MAX);
1789 assert_eq!(delta, SCHEMA_V1_TO_V2_EVENT_BOUNDARY_DELTA);
1790 }
1791 other => panic!("unexpected error variant: {other:?}"),
1792 }
1793
1794 let invariant_err = verify_post_migrate_row_counts(&pool, &pre)
1796 .expect_err("overflow must refuse via typed error");
1797 assert_eq!(
1798 invariant_err.invariant(),
1799 Some(crate::VERIFY_ROW_COUNTS_CHECKED_ADD_OVERFLOW_INVARIANT),
1800 );
1801 assert_eq!(
1802 invariant_err.invariant(),
1803 Some("verify.row_counts.checked_add_overflow"),
1804 );
1805 }
1806
1807 #[test]
1808 fn verify_schema_v2_expand_shape_passes_after_default_bundle() {
1809 let pool = migrated_pool();
1814
1815 let failures = verify_schema_v2_expand_shape(&pool).expect("verify v2 expand shape");
1816
1817 assert!(failures.is_empty(), "unexpected failures: {failures:?}");
1818 }
1819
1820 #[test]
1821 fn verify_schema_v2_expand_shape_reports_partial_expand_when_artifacts_removed() {
1822 let pool = migrated_pool();
1827 pool.execute("ALTER TABLE episodes DROP COLUMN summary_spans_json;", [])
1828 .expect("drop episodes.summary_spans_json to simulate partial expand");
1829 pool.execute("DROP TABLE memory_session_uses;", [])
1830 .expect("drop memory_session_uses to simulate partial expand");
1831
1832 let failures = verify_schema_v2_expand_shape(&pool).expect("verify v2 expand shape");
1833
1834 assert!(
1835 failures.iter().any(|failure| matches!(
1836 failure,
1837 SchemaVersionFailure::IllegalIntermediateShape {
1838 invariant: "schema_v2_expand_shape.complete",
1839 detail,
1840 } if detail == "partial schema v2 expand shape is missing column episodes.summary_spans_json"
1841 )),
1842 "failures: {failures:?}"
1843 );
1844 assert!(
1845 failures.iter().any(|failure| matches!(
1846 failure,
1847 SchemaVersionFailure::IllegalIntermediateShape {
1848 invariant: "schema_v2_expand_shape.complete",
1849 detail,
1850 } if detail == "partial schema v2 expand shape is missing table memory_session_uses"
1851 )),
1852 "failures: {failures:?}"
1853 );
1854 }
1855
1856 #[test]
1857 fn verify_schema_v2_expand_shape_reports_unset_required_backfill() {
1858 let pool = migrated_pool();
1859 crate::migrate_v2::apply_expand_backfill_skeleton(
1860 &pool,
1861 "2026-05-04T13:00:00Z".parse().unwrap(),
1862 )
1863 .expect("apply complete v2 expand shape");
1864 pool.execute(
1865 "INSERT INTO events (
1866 id, schema_version, observed_at, recorded_at, source_json, event_type,
1867 trace_id, session_id, domain_tags_json, payload_json, payload_hash,
1868 prev_event_hash, event_hash, source_attestation_json
1869 ) VALUES (
1870 'evt_s2_unset_backfill', 1, '2026-05-04T12:00:00Z',
1871 '2026-05-04T12:00:01Z', '{\"kind\":\"test\"}', 'test.event',
1872 NULL, NULL, '[]', '{}', 'payload-hash', NULL, 'event-hash', NULL
1873 );",
1874 [],
1875 )
1876 .expect("insert row with unset v2 backfill metadata");
1877
1878 let failures = verify_schema_v2_expand_shape(&pool).expect("verify v2 expand shape");
1879
1880 assert_eq!(
1881 failures,
1882 vec![SchemaVersionFailure::IllegalIntermediateShape {
1883 invariant: "schema_v2_expand_backfill.complete",
1884 detail:
1885 "schema v2 expand backfill left 1 events.source_attestation_json rows unset"
1886 .into(),
1887 }]
1888 );
1889 }
1890
1891 #[test]
1892 fn count_post_v2_rows_outside_boundary_is_zero_for_empty_store() {
1893 let pool = migrated_pool();
1894 let counts =
1895 count_post_v2_rows_outside_boundary(&pool).expect("count post-v2 rows on empty store");
1896 assert!(counts.is_empty());
1897 assert_eq!(counts.events_post_v2, 0);
1898 assert_eq!(counts.traces_post_v2, 0);
1899 assert_eq!(counts.total(), 0);
1900 }
1901
1902 #[test]
1903 fn count_post_v2_rows_outside_boundary_is_zero_for_pre_v2_rows() {
1904 let pool = migrated_pool();
1905 pool.execute(
1906 "INSERT INTO events (
1907 id, schema_version, observed_at, recorded_at, source_json, event_type,
1908 trace_id, session_id, domain_tags_json, payload_json, payload_hash,
1909 prev_event_hash, event_hash
1910 ) VALUES (
1911 'evt_pre_v2_one', 1, '2026-01-01T00:00:00Z', '2026-01-01T00:00:00Z',
1912 '{\"kind\":\"test\"}', 'test.event', NULL, NULL, '[]', '{}',
1913 'payload-hash', NULL, 'event-hash-one'
1914 );",
1915 [],
1916 )
1917 .expect("insert pre-v2 event row");
1918 pool.execute(
1919 "INSERT INTO traces (id, schema_version, opened_at, closed_at, trace_type, status)
1920 VALUES ('trc_pre_v2_one', 1, '2026-01-01T00:00:00Z', NULL, 'fixture', 'open');",
1921 [],
1922 )
1923 .expect("insert pre-v2 trace row");
1924
1925 let counts =
1926 count_post_v2_rows_outside_boundary(&pool).expect("count post-v2 rows on v1 fixture");
1927 assert!(counts.is_empty());
1928 }
1929
1930 #[test]
1931 fn count_post_v2_rows_outside_boundary_skips_the_schema_migration_boundary_row() {
1932 let pool = migrated_pool();
1933 pool.execute(
1934 "INSERT INTO events (
1935 id, schema_version, observed_at, recorded_at, source_json, event_type,
1936 trace_id, session_id, domain_tags_json, payload_json, payload_hash,
1937 prev_event_hash, event_hash
1938 ) VALUES (
1939 'evt_boundary', 2, '2026-01-01T00:00:00Z', '2026-01-01T00:00:00Z',
1940 '{\"kind\":\"runtime\"}', 'system.note', NULL, NULL, '[]',
1941 '{\"kind\":\"schema_migration.v1_to_v2\",\"previous_v1_head_hash\":\"prev\",\"migration_script_digest\":\"blake3:digest\",\"fixture_verification_result_hash\":\"blake3:fixture\"}',
1942 'payload-hash', NULL, 'event-hash-boundary'
1943 );",
1944 [],
1945 )
1946 .expect("insert boundary-shape row");
1947
1948 let counts = count_post_v2_rows_outside_boundary(&pool)
1949 .expect("count post-v2 rows after boundary insert");
1950 assert_eq!(counts.events_post_v2, 0);
1951 assert_eq!(counts.traces_post_v2, 0);
1952 }
1953
1954 #[test]
1955 fn count_post_v2_rows_outside_boundary_surfaces_fresh_v2_events_row() {
1956 let pool = migrated_pool();
1957 pool.execute(
1958 "INSERT INTO events (
1959 id, schema_version, observed_at, recorded_at, source_json, event_type,
1960 trace_id, session_id, domain_tags_json, payload_json, payload_hash,
1961 prev_event_hash, event_hash
1962 ) VALUES (
1963 'evt_fresh_v2', 2, '2026-01-01T00:00:00Z', '2026-01-01T00:00:00Z',
1964 '{\"kind\":\"runtime\"}', 'tool.result', NULL, NULL, '[]',
1965 '{\"step\":1}',
1966 'payload-hash', NULL, 'event-hash-fresh-v2'
1967 );",
1968 [],
1969 )
1970 .expect("insert fresh-v2 events row");
1971
1972 let counts = count_post_v2_rows_outside_boundary(&pool)
1973 .expect("count post-v2 rows on fresh-v2 fixture");
1974 assert!(!counts.is_empty());
1975 assert_eq!(counts.events_post_v2, 1);
1976 assert_eq!(counts.traces_post_v2, 0);
1977 assert_eq!(counts.total(), 1);
1978 }
1979
1980 #[test]
1981 fn count_post_v2_rows_outside_boundary_surfaces_fresh_v2_traces_row() {
1982 let pool = migrated_pool();
1983 pool.execute(
1984 "INSERT INTO traces (id, schema_version, opened_at, closed_at, trace_type, status)
1985 VALUES ('trc_fresh_v2', 2, '2026-01-01T00:00:00Z', NULL, 'fixture', 'open');",
1986 [],
1987 )
1988 .expect("insert fresh-v2 trace row");
1989
1990 let counts = count_post_v2_rows_outside_boundary(&pool)
1991 .expect("count post-v2 rows on fresh-v2 trace fixture");
1992 assert!(!counts.is_empty());
1993 assert_eq!(counts.events_post_v2, 0);
1994 assert_eq!(counts.traces_post_v2, 1);
1995 }
1996}