1use chrono::{DateTime, Utc};
10use cortex_ledger::{event_hash as ledger_event_hash, hash::canonical_payload_bytes, payload_hash};
11use rusqlite::{params, OptionalExtension};
12use serde_json::{json, Value};
13
14use crate::verify::{
15 verify_schema_v2_default_persistence_readiness, verify_schema_v2_expand_shape,
16 verify_schema_version, SchemaVersionFailure,
17};
18use crate::{Pool, StoreError, StoreResult};
19
/// Raw SQL for migration 003 (the schema v2 "expand" step), embedded at
/// compile time so the store needs no filesystem access to the migrations dir.
pub const SCHEMA_V2_EXPAND_SQL: &str = include_str!("../migrations/003_schema_v2_expand.sql");

/// Tables whose row counts are captured in a `V1FixtureReport`.
const FIXTURE_COUNT_TABLES: &[&str] = &[
    "events",
    "traces",
    "episodes",
    "memories",
    "context_packs",
    "audit_records",
];

/// Ordered, purely descriptive step names reported by a v2 dry run.
/// The dry run itself never mutates the store (see `dry_run_plan`).
const DRY_RUN_STEPS: &[&str] = &[
    "preflight_schema_v1",
    "inspect_v1_hash_chain_head",
    "plan_expand_nullable_v2_columns",
    "plan_legacy_unattested_backfill",
    "plan_schema_migration_boundary_event",
    "leave_schema_version_unchanged",
];

// Stage names surfaced by `staged_execution_plan`, in execution order.
const STAGE_BACKUP_PREFLIGHT_READY: &str = "backup-preflight-ready";
const STAGE_EXPAND_BACKFILL: &str = "expand/backfill";
const STAGE_BOUNDARY_APPEND_PENDING: &str = "boundary-append-pending";
const STAGE_POST_MIGRATION_AUDIT_PENDING: &str = "post-migration-audit-pending";

/// Version of the fixture-verification transcript format itself (not the
/// store schema); bump when the transcript JSON shape changes.
const FIXTURE_VERIFICATION_TRANSCRIPT_SCHEMA_VERSION: u16 = 1;
/// Stable identifier embedded in every fixture-verification transcript.
const FIXTURE_VERIFICATION_MIGRATION_ID: &str = "schema_v2_dry_run_fixture_verification";
/// Row `schema_version` value rows are marked with at the v2 cutover.
const DEFAULT_SCHEMA_V2_TARGET: u16 = 2;
49
/// Kind of durable schema artifact introduced by the S2.9 expand migration.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum V2DurableS29ArtifactKind {
    /// A nullable column added to an existing v1 table.
    Column,
    /// A new side table created alongside the v1 schema.
    Table,
}
58
/// One durable artifact (column or table) that the S2.9 expand step adds.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct V2DurableS29Artifact {
    /// Whether this artifact is a column or a whole table.
    pub kind: V2DurableS29ArtifactKind,
    /// Table the artifact lives in (or the table itself for `Table`).
    pub table: &'static str,
    /// Column name for `Column` artifacts; `None` for `Table` artifacts.
    pub column: Option<&'static str>,
}
69
/// Canonical inventory of every column and table the S2.9 expand step adds.
/// Kept as data so audits can diff it against `apply_expand_backfill_skeleton`.
pub const DURABLE_S2_9_ARTIFACTS: &[V2DurableS29Artifact] = &[
    V2DurableS29Artifact {
        kind: V2DurableS29ArtifactKind::Column,
        table: "events",
        column: Some("source_attestation_json"),
    },
    V2DurableS29Artifact {
        kind: V2DurableS29ArtifactKind::Column,
        table: "episodes",
        column: Some("summary_spans_json"),
    },
    V2DurableS29Artifact {
        kind: V2DurableS29ArtifactKind::Column,
        table: "memories",
        column: Some("summary_spans_json"),
    },
    V2DurableS29Artifact {
        kind: V2DurableS29ArtifactKind::Column,
        table: "memories",
        column: Some("cross_session_use_count"),
    },
    V2DurableS29Artifact {
        kind: V2DurableS29ArtifactKind::Column,
        table: "memories",
        column: Some("first_used_at"),
    },
    V2DurableS29Artifact {
        kind: V2DurableS29ArtifactKind::Column,
        table: "memories",
        column: Some("last_cross_session_use_at"),
    },
    V2DurableS29Artifact {
        kind: V2DurableS29ArtifactKind::Column,
        table: "memories",
        column: Some("last_validation_at"),
    },
    V2DurableS29Artifact {
        kind: V2DurableS29ArtifactKind::Column,
        table: "memories",
        column: Some("validation_epoch"),
    },
    V2DurableS29Artifact {
        kind: V2DurableS29ArtifactKind::Column,
        table: "memories",
        column: Some("blessed_until"),
    },
    V2DurableS29Artifact {
        kind: V2DurableS29ArtifactKind::Column,
        table: "context_packs",
        column: Some("consumer_advisory_json"),
    },
    V2DurableS29Artifact {
        kind: V2DurableS29ArtifactKind::Table,
        table: "memory_session_uses",
        column: None,
    },
    V2DurableS29Artifact {
        kind: V2DurableS29ArtifactKind::Table,
        table: "outcome_memory_relations",
        column: None,
    },
];
133
/// Row count for a single table, as observed in a v1 fixture inspection.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FixtureTableCount {
    /// Table name (one of `FIXTURE_COUNT_TABLES`).
    pub table: &'static str,
    /// `COUNT(*)` for that table at inspection time.
    pub rows: u64,
}
142
/// Read-only snapshot of a v1 store taken before any v2 migration work.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct V1FixtureReport {
    /// Schema version the store is expected to be at (`cortex_core::SCHEMA_VERSION`).
    pub expected_schema_version: u16,
    /// Sorted, distinct `schema_version` values seen on events and traces rows.
    pub observed_row_schema_versions: Vec<u16>,
    /// Row counts for the fixture tables.
    pub table_counts: Vec<FixtureTableCount>,
    /// Hash of the event whose hash no other event references as `prev_event_hash`,
    /// i.e. the current head of the v1 hash chain; `None` when there are no events.
    pub event_chain_head: Option<String>,
    /// Migration names recorded in `_migrations`, sorted by name.
    pub applied_migrations: Vec<String>,
}
157
/// Result of a non-mutating v2 migration dry run.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct V2DryRunPlan {
    /// Snapshot of the v1 store the plan was computed against.
    pub fixture: V1FixtureReport,
    /// Descriptive planned steps (`DRY_RUN_STEPS`); nothing here executes.
    pub steps: Vec<&'static str>,
    /// Blocking failures; empty means the plan is ready (see `is_ready`).
    pub failures: Vec<SchemaVersionFailure>,
}
168
/// Status of one stage in the staged v2 migration plan.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum V2MigrationStageStatus {
    /// Stage can run against the current store.
    Ready,
    /// Stage is planned but waits on work outside this crate.
    Pending,
    /// Stage cannot run because the dry run surfaced failures.
    Blocked,
}
179
/// One stage of the staged v2 migration plan.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct V2MigrationStage {
    /// Stable stage name (one of the `STAGE_*` constants).
    pub name: &'static str,
    /// Whether the stage is ready, pending external work, or blocked.
    pub status: V2MigrationStageStatus,
    /// True when running this stage writes to the store.
    pub mutates_store: bool,
    /// True when completing this stage unlocks the v2 cutover.
    pub enables_cutover: bool,
    /// Human-readable rationale for the status.
    pub reason: &'static str,
}
194
/// Full staged plan: the dry run it was derived from plus ordered stages.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct V2MigrationStagePlan {
    /// The dry run whose readiness determined stage statuses.
    pub dry_run: V2DryRunPlan,
    /// Stages in execution order.
    pub stages: Vec<V2MigrationStage>,
}
203
/// Flattened (invariant, detail) form of a failure for the verification
/// transcript, so the transcript stays stable across enum refactors.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct V2FixtureVerificationFailure {
    /// Invariant identifier that failed.
    pub invariant: String,
    /// Human-readable failure detail.
    pub detail: String,
}
212
/// Deterministic, hashable record of a fixture verification run, suitable for
/// anchoring into the ledger at the migration boundary.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct V2FixtureVerificationTranscript {
    /// Format version of this transcript (`FIXTURE_VERIFICATION_TRANSCRIPT_SCHEMA_VERSION`).
    pub transcript_schema_version: u16,
    /// Stable migration identifier (`FIXTURE_VERIFICATION_MIGRATION_ID`).
    pub migration_id: &'static str,
    /// v1 hash-chain head the boundary event will chain from.
    pub boundary_previous_v1_head_hash: String,
    /// The fixture snapshot being attested.
    pub fixture: V1FixtureReport,
    /// Planned dry-run steps at verification time.
    pub steps: Vec<&'static str>,
    /// Failures observed, sorted by (invariant, detail) for determinism.
    pub failures: Vec<V2FixtureVerificationFailure>,
}
229
/// What one pass of `apply_expand_backfill_skeleton` actually changed.
/// A second pass on the same store reports empty/zero everywhere.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct V2ExpandBackfillReport {
    /// Newly added columns, as `"table.column"` strings.
    pub added_columns: Vec<String>,
    /// Newly created side tables.
    pub created_tables: Vec<&'static str>,
    /// Events rows given a `legacy_unattested` source attestation.
    pub legacy_event_attestations_backfilled: u64,
    /// Episodes rows given backfilled summary spans.
    pub episode_summary_spans_backfilled: u64,
    /// Memories rows given backfilled summary spans.
    pub memory_summary_spans_backfilled: u64,
    /// Context packs given the legacy consumer advisory.
    pub context_pack_advisories_backfilled: u64,
    /// Memories rows given zeroed salience counters/epochs.
    pub memory_salience_defaults_backfilled: u64,
}
248
/// Outcome of preparing the store for default-v2 writes: the expand/backfill
/// report, the rows re-marked to v2, and the readiness probe results.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct V2DefaultWriteShapeReadinessReport {
    /// What the expand/backfill pass changed.
    pub expand_backfill: V2ExpandBackfillReport,
    /// Events rows whose `schema_version` was bumped from v1 to v2.
    pub event_rows_marked_v2: u64,
    /// Traces rows whose `schema_version` was bumped from v1 to v2.
    pub trace_rows_marked_v2: u64,
    /// Persistence-shape failures; empty means `is_ready()`.
    pub readiness_failures: Vec<SchemaVersionFailure>,
    /// Event hash-framing failures; must also be empty for cutover.
    pub event_framing_failures: Vec<SchemaVersionFailure>,
}
263
264impl V2DryRunPlan {
265 #[must_use]
267 pub fn is_ready(&self) -> bool {
268 self.failures.is_empty()
269 }
270
271 #[must_use]
273 pub fn fixture_verification_transcript(
274 &self,
275 boundary_previous_v1_head_hash: impl Into<String>,
276 ) -> V2FixtureVerificationTranscript {
277 fixture_verification_transcript(self, boundary_previous_v1_head_hash)
278 }
279
280 #[must_use]
282 pub fn fixture_verification_result_hash(
283 &self,
284 boundary_previous_v1_head_hash: impl Into<String>,
285 ) -> String {
286 self.fixture_verification_transcript(boundary_previous_v1_head_hash)
287 .digest()
288 }
289}
290
291impl V2MigrationStagePlan {
292 #[must_use]
294 pub fn cutover_enabled(&self) -> bool {
295 self.stages.iter().any(|stage| stage.enables_cutover)
296 }
297
298 #[must_use]
300 pub fn stage_names(&self) -> Vec<&'static str> {
301 self.stages.iter().map(|stage| stage.name).collect()
302 }
303}
304
305impl V2DefaultWriteShapeReadinessReport {
306 #[must_use]
308 pub fn is_ready(&self) -> bool {
309 self.readiness_failures.is_empty()
310 }
311
312 #[must_use]
314 pub fn is_cutover_ready(&self) -> bool {
315 self.is_ready() && self.event_framing_failures.is_empty()
316 }
317}
318
impl V2FixtureVerificationTranscript {
    /// Renders the transcript as JSON with deterministic content.
    ///
    /// Table counts and applied migrations are sorted on clones (never on
    /// `self`) so two transcripts of the same fixture serialize to the same
    /// canonical bytes and therefore the same digest.
    #[must_use]
    pub fn to_json_value(&self) -> Value {
        let mut table_counts = self.fixture.table_counts.clone();
        table_counts.sort_by(|left, right| left.table.cmp(right.table));

        let mut applied_migrations = self.fixture.applied_migrations.clone();
        applied_migrations.sort();

        // Keys are spelled in alphabetical order to mirror the canonical
        // encoding produced by `canonical_payload_bytes`.
        json!({
            "boundary_previous_v1_head_hash": &self.boundary_previous_v1_head_hash,
            "failures": self.failures.iter().map(|failure| {
                json!({
                    "detail": &failure.detail,
                    "invariant": &failure.invariant,
                })
            }).collect::<Vec<_>>(),
            "fixture": {
                "applied_migrations": applied_migrations,
                "event_chain_head": &self.fixture.event_chain_head,
                "expected_schema_version": self.fixture.expected_schema_version,
                "observed_row_schema_versions": &self.fixture.observed_row_schema_versions,
                "table_counts": table_counts.iter().map(|count| {
                    json!({
                        "rows": count.rows,
                        "table": count.table,
                    })
                }).collect::<Vec<_>>(),
            },
            "migration_id": self.migration_id,
            "steps": &self.steps,
            "transcript_schema_version": self.transcript_schema_version,
        })
    }

    /// Canonical byte encoding of `to_json_value`, as hashed by `digest`.
    #[must_use]
    pub fn canonical_json_bytes(&self) -> Vec<u8> {
        canonical_payload_bytes(&self.to_json_value())
    }

    /// Content digest of the transcript, prefixed with the hash algorithm.
    #[must_use]
    pub fn digest(&self) -> String {
        format!("blake3:{}", payload_hash(&self.to_json_value()))
    }
}
367
368pub fn inspect_v1_fixture(pool: &Pool) -> StoreResult<V1FixtureReport> {
370 Ok(V1FixtureReport {
371 expected_schema_version: cortex_core::SCHEMA_VERSION,
372 observed_row_schema_versions: observed_schema_versions(pool)?,
373 table_counts: table_counts(pool)?,
374 event_chain_head: event_chain_head(pool)?,
375 applied_migrations: applied_migrations(pool)?,
376 })
377}
378
379pub fn dry_run_plan(pool: &Pool) -> StoreResult<V2DryRunPlan> {
390 let fixture = inspect_v1_fixture(pool)?;
391 let mut failures = verify_schema_version(pool, cortex_core::SCHEMA_VERSION)?.failures;
392 failures.extend(dry_run_shape_failures(pool)?);
393
394 Ok(V2DryRunPlan {
395 fixture,
396 steps: DRY_RUN_STEPS.to_vec(),
397 failures,
398 })
399}
400
401fn dry_run_shape_failures(pool: &Pool) -> StoreResult<Vec<SchemaVersionFailure>> {
406 let all = verify_schema_v2_expand_shape(pool)?;
407 Ok(all
408 .into_iter()
409 .filter(|failure| match failure {
410 SchemaVersionFailure::IllegalIntermediateShape { invariant, .. } => {
411 *invariant != "schema_v2_expand_backfill.complete"
413 }
414 _ => true,
415 })
416 .collect())
417}
418
/// Builds the staged v2 migration plan from a fresh dry run.
///
/// Only the first two stages depend on the store's state (Ready vs Blocked);
/// the boundary-append and post-migration-audit stages are always Pending
/// because they are executed outside cortex-store. No stage here sets
/// `enables_cutover`, so `cutover_enabled()` is false for plans built by this
/// function.
///
/// # Errors
///
/// Propagates any error from computing the dry run.
pub fn staged_execution_plan(pool: &Pool) -> StoreResult<V2MigrationStagePlan> {
    let dry_run = dry_run_plan(pool)?;
    // A single readiness verdict gates both store-facing stages.
    let store_ready_status = if dry_run.is_ready() {
        V2MigrationStageStatus::Ready
    } else {
        V2MigrationStageStatus::Blocked
    };

    Ok(V2MigrationStagePlan {
        dry_run,
        stages: vec![
            V2MigrationStage {
                name: STAGE_BACKUP_PREFLIGHT_READY,
                status: store_ready_status,
                mutates_store: false,
                enables_cutover: false,
                reason: "schema v1 preflight passed; backup manifest remains an external CLI/operator gate",
            },
            V2MigrationStage {
                name: STAGE_EXPAND_BACKFILL,
                status: store_ready_status,
                mutates_store: true,
                enables_cutover: false,
                reason: "store may apply nullable v2 expand/backfill skeleton while row schema versions stay v1",
            },
            V2MigrationStage {
                name: STAGE_BOUNDARY_APPEND_PENDING,
                status: V2MigrationStageStatus::Pending,
                mutates_store: false,
                enables_cutover: false,
                reason: "boundary append is ledger/CLI cutover work and is not executable from cortex-store",
            },
            V2MigrationStage {
                name: STAGE_POST_MIGRATION_AUDIT_PENDING,
                status: V2MigrationStageStatus::Pending,
                mutates_store: false,
                enables_cutover: false,
                reason: "post-migration audit is pending until full migrate and boundary append exist",
            },
        ],
    })
}
466
467#[must_use]
469pub fn fixture_verification_transcript(
470 plan: &V2DryRunPlan,
471 boundary_previous_v1_head_hash: impl Into<String>,
472) -> V2FixtureVerificationTranscript {
473 let mut failures = plan
474 .failures
475 .iter()
476 .map(|failure| V2FixtureVerificationFailure {
477 invariant: failure.invariant(),
478 detail: failure.detail(),
479 })
480 .collect::<Vec<_>>();
481 failures.sort_by(|left, right| {
482 left.invariant
483 .cmp(&right.invariant)
484 .then_with(|| left.detail.cmp(&right.detail))
485 });
486
487 V2FixtureVerificationTranscript {
488 transcript_schema_version: FIXTURE_VERIFICATION_TRANSCRIPT_SCHEMA_VERSION,
489 migration_id: FIXTURE_VERIFICATION_MIGRATION_ID,
490 boundary_previous_v1_head_hash: boundary_previous_v1_head_hash.into(),
491 fixture: plan.fixture.clone(),
492 steps: plan.steps.clone(),
493 failures,
494 }
495}
496
497#[must_use]
499pub fn fixture_verification_result_hash(
500 plan: &V2DryRunPlan,
501 boundary_previous_v1_head_hash: impl Into<String>,
502) -> String {
503 fixture_verification_transcript(plan, boundary_previous_v1_head_hash).digest()
504}
505
/// Accessor for the canonical S2.9 artifact inventory (`DURABLE_S2_9_ARTIFACTS`).
#[must_use]
pub fn durable_s2_9_artifacts() -> &'static [V2DurableS29Artifact] {
    DURABLE_S2_9_ARTIFACTS
}
511
/// Persistence-shape failures that would block defaulting writes to v2.
/// Thin public wrapper over the verifier in `crate::verify`.
///
/// # Errors
///
/// Propagates any underlying query error.
pub fn default_v2_persistence_readiness_failures(
    pool: &Pool,
) -> StoreResult<Vec<SchemaVersionFailure>> {
    verify_schema_v2_default_persistence_readiness(pool)
}
518
/// Recomputes payload and event hashes for every v2-marked events row and
/// reports mismatches against the stored values.
///
/// Rows still at other schema versions are skipped. For each v2 row the check
/// verifies, in order: payload_json parses as JSON, `payload_hash` matches
/// the recomputed hash, and `event_hash` matches
/// `ledger_event_hash(prev_event_hash, stored_payload_hash)`.
///
/// NOTE(review): the event-hash check deliberately uses the *stored* payload
/// hash, so a payload mismatch produces one failure, not a cascading second
/// one — confirm this is the intended framing contract.
///
/// # Errors
///
/// Propagates any underlying query error.
pub fn default_v2_event_framing_readiness_failures(
    pool: &Pool,
) -> StoreResult<Vec<SchemaVersionFailure>> {
    let mut failures = Vec::new();
    let mut stmt = pool.prepare(
        "SELECT id, schema_version, payload_json, payload_hash, prev_event_hash, event_hash
         FROM events
         ORDER BY id;",
    )?;
    let rows = stmt.query_map([], |row| {
        Ok((
            row.get::<_, String>(0)?,
            row.get::<_, u16>(1)?,
            row.get::<_, String>(2)?,
            row.get::<_, String>(3)?,
            row.get::<_, Option<String>>(4)?,
            row.get::<_, String>(5)?,
        ))
    })?;

    for row in rows {
        let (
            id,
            schema_version,
            payload_json,
            stored_payload_hash,
            prev_event_hash,
            stored_event_hash,
        ) = row?;
        // Only rows already marked v2 are subject to v2 framing rules.
        if schema_version != DEFAULT_SCHEMA_V2_TARGET {
            continue;
        }

        let payload = match serde_json::from_str::<Value>(&payload_json) {
            Ok(payload) => payload,
            Err(err) => {
                failures.push(v2_event_framing_failure(
                    id,
                    format!("payload_json is not valid JSON: {err}"),
                ));
                // Cannot hash an unparseable payload; move to the next row.
                continue;
            }
        };
        let expected_payload_hash = payload_hash(&payload);
        if stored_payload_hash != expected_payload_hash {
            failures.push(v2_event_framing_failure(
                &id,
                format!(
                    "payload_hash mismatch under future v2 framing: expected {expected_payload_hash}, found {stored_payload_hash}"
                ),
            ));
        }

        // Chain hash is recomputed from the stored payload hash (see NOTE above).
        let expected_event_hash =
            ledger_event_hash(prev_event_hash.as_deref(), &stored_payload_hash);
        if stored_event_hash != expected_event_hash {
            failures.push(v2_event_framing_failure(
                id,
                format!(
                    "event_hash mismatch under future v2 framing: expected {expected_event_hash}, found {stored_event_hash}"
                ),
            ));
        }
    }

    Ok(failures)
}
590
591pub fn default_v2_cutover_readiness_failures(
593 pool: &Pool,
594) -> StoreResult<Vec<SchemaVersionFailure>> {
595 let mut failures = default_v2_persistence_readiness_failures(pool)?;
596 failures.extend(default_v2_event_framing_readiness_failures(pool)?);
597 Ok(failures)
598}
599
600pub fn require_default_v2_persistence_readiness(pool: &Pool) -> StoreResult<()> {
602 let failures = default_v2_persistence_readiness_failures(pool)?;
603 if failures.is_empty() {
604 Ok(())
605 } else {
606 Err(StoreError::Validation(format!(
607 "schema v2 default persistence readiness failed: {failures:?}"
608 )))
609 }
610}
611
612pub fn require_default_v2_cutover_readiness(pool: &Pool) -> StoreResult<()> {
614 let failures = default_v2_cutover_readiness_failures(pool)?;
615 if failures.is_empty() {
616 Ok(())
617 } else {
618 Err(StoreError::Validation(format!(
619 "schema v2 default cutover readiness failed: {failures:?}"
620 )))
621 }
622}
623
624pub fn apply_expand_backfill_skeleton(
630 pool: &Pool,
631 imported_at: DateTime<Utc>,
632) -> StoreResult<V2ExpandBackfillReport> {
633 let schema_version_report = verify_schema_version(pool, cortex_core::SCHEMA_VERSION)?;
641 if !schema_version_report.is_ok() {
642 return Err(StoreError::Validation(format!(
643 "schema v2 expand/backfill preflight refused future-schema rows: {:?}",
644 schema_version_report.failures
645 )));
646 }
647
648 let mut report = V2ExpandBackfillReport {
649 added_columns: Vec::new(),
650 created_tables: Vec::new(),
651 legacy_event_attestations_backfilled: 0,
652 episode_summary_spans_backfilled: 0,
653 memory_summary_spans_backfilled: 0,
654 context_pack_advisories_backfilled: 0,
655 memory_salience_defaults_backfilled: 0,
656 };
657
658 add_column_if_missing(
659 pool,
660 &mut report,
661 "events",
662 "source_attestation_json",
663 "ALTER TABLE events ADD COLUMN source_attestation_json TEXT NULL \
664 CHECK (source_attestation_json IS NULL OR json_valid(source_attestation_json));",
665 )?;
666 add_column_if_missing(
667 pool,
668 &mut report,
669 "episodes",
670 "summary_spans_json",
671 "ALTER TABLE episodes ADD COLUMN summary_spans_json TEXT NULL \
672 CHECK (summary_spans_json IS NULL OR json_valid(summary_spans_json));",
673 )?;
674 add_column_if_missing(
675 pool,
676 &mut report,
677 "memories",
678 "summary_spans_json",
679 "ALTER TABLE memories ADD COLUMN summary_spans_json TEXT NULL \
680 CHECK (summary_spans_json IS NULL OR json_valid(summary_spans_json));",
681 )?;
682 add_column_if_missing(
683 pool,
684 &mut report,
685 "memories",
686 "cross_session_use_count",
687 "ALTER TABLE memories ADD COLUMN cross_session_use_count INTEGER NULL \
688 CHECK (cross_session_use_count IS NULL OR cross_session_use_count >= 0);",
689 )?;
690 add_column_if_missing(
691 pool,
692 &mut report,
693 "memories",
694 "first_used_at",
695 "ALTER TABLE memories ADD COLUMN first_used_at TEXT NULL;",
696 )?;
697 add_column_if_missing(
698 pool,
699 &mut report,
700 "memories",
701 "last_cross_session_use_at",
702 "ALTER TABLE memories ADD COLUMN last_cross_session_use_at TEXT NULL;",
703 )?;
704 add_column_if_missing(
705 pool,
706 &mut report,
707 "memories",
708 "last_validation_at",
709 "ALTER TABLE memories ADD COLUMN last_validation_at TEXT NULL;",
710 )?;
711 add_column_if_missing(
712 pool,
713 &mut report,
714 "memories",
715 "validation_epoch",
716 "ALTER TABLE memories ADD COLUMN validation_epoch INTEGER NULL \
717 CHECK (validation_epoch IS NULL OR validation_epoch >= 0);",
718 )?;
719 add_column_if_missing(
720 pool,
721 &mut report,
722 "memories",
723 "blessed_until",
724 "ALTER TABLE memories ADD COLUMN blessed_until TEXT NULL;",
725 )?;
726 add_column_if_missing(
727 pool,
728 &mut report,
729 "context_packs",
730 "consumer_advisory_json",
731 "ALTER TABLE context_packs ADD COLUMN consumer_advisory_json TEXT NULL \
732 CHECK (consumer_advisory_json IS NULL OR json_valid(consumer_advisory_json));",
733 )?;
734
735 create_table_if_missing(
736 pool,
737 &mut report,
738 "memory_session_uses",
739 "CREATE TABLE memory_session_uses (
740 memory_id TEXT NOT NULL REFERENCES memories(id),
741 session_id TEXT NOT NULL,
742 first_used_at TEXT NOT NULL,
743 last_used_at TEXT NOT NULL,
744 use_count INTEGER NOT NULL CHECK (use_count >= 0),
745 PRIMARY KEY (memory_id, session_id)
746 );",
747 )?;
748 create_table_if_missing(
749 pool,
750 &mut report,
751 "outcome_memory_relations",
752 "CREATE TABLE outcome_memory_relations (
753 outcome_ref TEXT NOT NULL,
754 memory_id TEXT NOT NULL REFERENCES memories(id),
755 relation TEXT NOT NULL,
756 recorded_at TEXT NOT NULL,
757 source_event_id TEXT NULL REFERENCES events(id),
758 PRIMARY KEY (outcome_ref, memory_id, relation)
759 );",
760 )?;
761
762 report.legacy_event_attestations_backfilled =
763 backfill_legacy_event_attestations(pool, imported_at)?;
764 report.episode_summary_spans_backfilled = backfill_episode_summary_spans(pool)?;
765 report.memory_summary_spans_backfilled = backfill_memory_summary_spans(pool)?;
766 report.context_pack_advisories_backfilled = backfill_context_pack_advisories(pool)?;
767 report.memory_salience_defaults_backfilled = backfill_memory_salience_defaults(pool)?;
768
769 Ok(report)
770}
771
772pub fn prepare_default_v2_write_shape_for_readiness(
780 pool: &Pool,
781 imported_at: DateTime<Utc>,
782) -> StoreResult<V2DefaultWriteShapeReadinessReport> {
783 let expand_backfill = apply_expand_backfill_skeleton(pool, imported_at)?;
784 let event_rows_marked_v2 = mark_schema_version(pool, "events", DEFAULT_SCHEMA_V2_TARGET)?;
785 let trace_rows_marked_v2 = mark_schema_version(pool, "traces", DEFAULT_SCHEMA_V2_TARGET)?;
786 let readiness_failures = default_v2_persistence_readiness_failures(pool)?;
787 let event_framing_failures = default_v2_event_framing_readiness_failures(pool)?;
788
789 Ok(V2DefaultWriteShapeReadinessReport {
790 expand_backfill,
791 event_rows_marked_v2,
792 trace_rows_marked_v2,
793 readiness_failures,
794 event_framing_failures,
795 })
796}
797
798fn v2_event_framing_failure(
799 row_id: impl Into<String>,
800 detail: impl Into<String>,
801) -> SchemaVersionFailure {
802 SchemaVersionFailure::IllegalIntermediateShape {
803 invariant: "schema_v2_default_persistence.event_framing.valid",
804 detail: format!("events row {} {}", row_id.into(), detail.into()),
805 }
806}
807
808fn observed_schema_versions(pool: &Pool) -> StoreResult<Vec<u16>> {
809 let mut versions = Vec::new();
810 for table in ["events", "traces"] {
811 let sql = format!("SELECT DISTINCT schema_version FROM {table} ORDER BY schema_version;");
812 let mut stmt = pool.prepare(&sql)?;
813 let rows = stmt.query_map([], |row| row.get::<_, u16>(0))?;
814 for row in rows {
815 let version = row?;
816 if !versions.contains(&version) {
817 versions.push(version);
818 }
819 }
820 }
821 versions.sort_unstable();
822 Ok(versions)
823}
824
825fn table_counts(pool: &Pool) -> StoreResult<Vec<FixtureTableCount>> {
826 FIXTURE_COUNT_TABLES
827 .iter()
828 .map(|table| {
829 let sql = format!("SELECT COUNT(*) FROM {table};");
830 let rows = pool.query_row(&sql, [], |row| row.get::<_, u64>(0))?;
831 Ok(FixtureTableCount { table, rows })
832 })
833 .collect()
834}
835
/// Finds the head of the v1 event hash chain: the event whose hash no other
/// event references as `prev_event_hash`. Ties (which would indicate a forked
/// chain) are broken by latest `recorded_at`, then id. `None` on an empty
/// events table.
fn event_chain_head(pool: &Pool) -> StoreResult<Option<String>> {
    let head = pool
        .query_row(
            "SELECT e.event_hash
             FROM events e
             WHERE NOT EXISTS (
                 SELECT 1 FROM events child WHERE child.prev_event_hash = e.event_hash
             )
             ORDER BY e.recorded_at DESC, e.id DESC
             LIMIT 1;",
            [],
            |row| row.get::<_, String>(0),
        )
        .optional()?;
    Ok(head)
}
852
853fn applied_migrations(pool: &Pool) -> StoreResult<Vec<String>> {
854 let mut stmt = pool.prepare("SELECT name FROM _migrations ORDER BY name;")?;
855 let rows = stmt.query_map(params![], |row| row.get::<_, String>(0))?;
856 rows.collect::<Result<_, _>>().map_err(Into::into)
857}
858
859fn add_column_if_missing(
860 pool: &Pool,
861 report: &mut V2ExpandBackfillReport,
862 table: &'static str,
863 column: &'static str,
864 ddl: &str,
865) -> StoreResult<()> {
866 if has_column(pool, table, column)? {
867 return Ok(());
868 }
869
870 pool.execute_batch(ddl)?;
871 report.added_columns.push(format!("{table}.{column}"));
872 Ok(())
873}
874
875fn create_table_if_missing(
876 pool: &Pool,
877 report: &mut V2ExpandBackfillReport,
878 table: &'static str,
879 ddl: &str,
880) -> StoreResult<()> {
881 if has_table(pool, table)? {
882 return Ok(());
883 }
884
885 pool.execute_batch(ddl)?;
886 report.created_tables.push(table);
887 Ok(())
888}
889
890fn mark_schema_version(pool: &Pool, table: &'static str, target: u16) -> StoreResult<u64> {
898 const LEGACY_V1_ROW_SCHEMA_VERSION: u16 = 1;
899 let sql = format!(
900 "UPDATE {table}
901 SET schema_version = ?1
902 WHERE schema_version = ?2;"
903 );
904 let changed = pool.execute(&sql, params![target, LEGACY_V1_ROW_SCHEMA_VERSION])?;
905 Ok(changed as u64)
906}
907
908fn has_table(pool: &Pool, table: &str) -> StoreResult<bool> {
909 let existing = pool
910 .query_row(
911 "SELECT name FROM sqlite_master WHERE type = 'table' AND name = ?1;",
912 params![table],
913 |row| row.get::<_, String>(0),
914 )
915 .optional()?;
916 Ok(existing.is_some())
917}
918
919fn has_column(pool: &Pool, table: &str, column: &str) -> StoreResult<bool> {
920 let sql = format!("PRAGMA table_info({table});");
921 let mut stmt = pool.prepare(&sql)?;
922 let columns = stmt.query_map([], |row| row.get::<_, String>(1))?;
923 for found in columns {
924 if found? == column {
925 return Ok(true);
926 }
927 }
928 Ok(false)
929}
930
/// Stamps every events row lacking a source attestation with an honest
/// `legacy_unattested` marker (recording the import time and the row's
/// original `recorded_at`). Only NULL rows are touched, so this is
/// idempotent. Returns the number of rows updated.
///
/// # Errors
///
/// Propagates any underlying SQLite error.
pub fn backfill_legacy_event_attestations(
    pool: &Pool,
    imported_at: DateTime<Utc>,
) -> StoreResult<u64> {
    let changed = pool.execute(
        "UPDATE events
         SET source_attestation_json = json_object(
             'state', 'legacy_unattested',
             'value', json_object(
                 'imported_at', ?1,
                 'original_recorded_at', recorded_at
             )
         )
         WHERE source_attestation_json IS NULL;",
        params![imported_at.to_rfc3339()],
    )?;
    Ok(changed as u64)
}
994
/// Backfills `summary_spans_json` for episodes that lack it, deriving a
/// single whole-summary span (via `legacy_summary_spans`) from each row's
/// summary and source events. Only NULL rows are touched, so this is
/// idempotent. Returns the number of rows updated.
fn backfill_episode_summary_spans(pool: &Pool) -> StoreResult<u64> {
    let mut stmt = pool.prepare(
        "SELECT id, summary, source_events_json
         FROM episodes
         WHERE summary_spans_json IS NULL
         ORDER BY id;",
    )?;
    let rows = stmt.query_map([], |row| {
        Ok((
            row.get::<_, String>(0)?,
            row.get::<_, String>(1)?,
            row.get::<_, String>(2)?,
        ))
    })?;

    let mut changed = 0;
    for row in rows {
        let (id, summary, source_events_json) = row?;
        // Stored source_events_json is expected to be valid JSON; a parse
        // failure here aborts the backfill with an error.
        let source_events: Value = serde_json::from_str(&source_events_json)?;
        let spans = legacy_summary_spans(&summary, source_events);
        pool.execute(
            "UPDATE episodes SET summary_spans_json = ?1 WHERE id = ?2;",
            params![serde_json::to_string(&spans)?, id],
        )?;
        changed += 1;
    }
    Ok(changed)
}
1023
/// Backfills `summary_spans_json` for memories that lack it. Memory types
/// whose name contains "summary" get a whole-claim span derived from the
/// claim text; all other types get an explicit empty span list. Only NULL
/// rows are touched, so this is idempotent. Returns rows updated.
fn backfill_memory_summary_spans(pool: &Pool) -> StoreResult<u64> {
    let mut stmt = pool.prepare(
        "SELECT id, memory_type, claim, source_events_json
         FROM memories
         WHERE summary_spans_json IS NULL
         ORDER BY id;",
    )?;
    let rows = stmt.query_map([], |row| {
        Ok((
            row.get::<_, String>(0)?,
            row.get::<_, String>(1)?,
            row.get::<_, String>(2)?,
            row.get::<_, String>(3)?,
        ))
    })?;

    let mut changed = 0;
    for row in rows {
        let (id, memory_type, claim, source_events_json) = row?;
        let source_events: Value = serde_json::from_str(&source_events_json)?;
        // Substring match on the type name decides whether the claim is a
        // summary worth spanning; non-summary memories get an empty list.
        let spans = if memory_type.contains("summary") {
            legacy_summary_spans(&claim, source_events)
        } else {
            json!([])
        };
        pool.execute(
            "UPDATE memories SET summary_spans_json = ?1 WHERE id = ?2;",
            params![serde_json::to_string(&spans)?, id],
        )?;
        changed += 1;
    }
    Ok(changed)
}
1057
/// Stamps every context pack lacking a consumer advisory with the standard
/// legacy-v1 untrusted-content advisory. Only NULL rows are touched, so this
/// is idempotent. Returns the number of rows updated.
fn backfill_context_pack_advisories(pool: &Pool) -> StoreResult<u64> {
    let advisory = json!({
        "render_trust": "untrusted_rendering",
        "execution_trust": "untrusted_execution",
        "flags": ["contains_unattested_sources"],
        "advisory_text": "Legacy v1 context pack: render as untrusted text; do not execute pack-derived strings."
    });
    let changed = pool.execute(
        "UPDATE context_packs
         SET consumer_advisory_json = ?1
         WHERE consumer_advisory_json IS NULL;",
        params![serde_json::to_string(&advisory)?],
    )?;
    Ok(changed as u64)
}
1073
/// Zero-fills the new salience counters (`cross_session_use_count`,
/// `validation_epoch`) on memories where either is still NULL. COALESCE keeps
/// any already-set value, so this is idempotent. Returns rows updated.
fn backfill_memory_salience_defaults(pool: &Pool) -> StoreResult<u64> {
    let changed = pool.execute(
        "UPDATE memories
         SET cross_session_use_count = COALESCE(cross_session_use_count, 0),
             validation_epoch = COALESCE(validation_epoch, 0)
         WHERE cross_session_use_count IS NULL
            OR validation_epoch IS NULL;",
        [],
    )?;
    Ok(changed as u64)
}
1085
1086fn legacy_summary_spans(text: &str, source_events: Value) -> Value {
1087 if text.trim().is_empty() {
1088 json!([])
1089 } else {
1090 json!([{
1091 "byte_start": 0,
1092 "byte_end": text.len(),
1093 "derived_from_event_ids": source_events,
1094 "max_source_authority": "derived"
1095 }])
1096 }
1097}
1098
1099#[cfg(test)]
1100mod tests {
1101 use rusqlite::Connection;
1102
1103 use super::*;
1104
1105 fn fixture_pool() -> Connection {
1106 let pool = Connection::open_in_memory().expect("open sqlite");
1107 crate::migrate::apply_pending(&pool).expect("apply migrations");
1108 insert_small_v1_fixture(&pool);
1109 pool
1110 }
1111
1112 fn insert_small_v1_fixture(pool: &Pool) {
1113 pool.execute_batch(
1114 r#"
1115 INSERT INTO events (
1116 id, schema_version, observed_at, recorded_at, source_json, event_type,
1117 trace_id, session_id, domain_tags_json, payload_json, payload_hash,
1118 prev_event_hash, event_hash
1119 ) VALUES
1120 ('evt_s2_001', 1, '2026-05-04T12:00:00Z', '2026-05-04T12:00:01Z',
1121 '{"kind":"tool","name":"fixture"}', 'tool.result', NULL, 's2-fixture',
1122 '["s2"]', '{"step":1}', 'payload-1', NULL, 'event-hash-1'),
1123 ('evt_s2_002', 1, '2026-05-04T12:00:02Z', '2026-05-04T12:00:03Z',
1124 '{"kind":"tool","name":"fixture"}', 'tool.result', NULL, 's2-fixture',
1125 '["s2"]', '{"step":2}', 'payload-2', 'event-hash-1', 'event-hash-2');
1126
1127 INSERT INTO traces (
1128 id, schema_version, opened_at, closed_at, trace_type, status
1129 ) VALUES (
1130 'trc_s2_001', 1, '2026-05-04T12:00:00Z', NULL, 'migration_fixture', 'open'
1131 );
1132
1133 INSERT INTO trace_events (trace_id, event_id, ordinal) VALUES
1134 ('trc_s2_001', 'evt_s2_001', 0),
1135 ('trc_s2_001', 'evt_s2_002', 1);
1136
1137 INSERT INTO episodes (
1138 id, trace_id, source_events_json, summary, domains_json, entities_json,
1139 candidate_meaning, extracted_by_json, confidence, status
1140 ) VALUES (
1141 'epi_s2_001', 'trc_s2_001', '["evt_s2_001","evt_s2_002"]',
1142 'Small v1 fixture episode.', '["s2"]', '["cortex"]', NULL,
1143 '{"kind":"fixture"}', 0.8, 'candidate'
1144 );
1145
1146 INSERT INTO memories (
1147 id, memory_type, status, claim, source_episodes_json, source_events_json,
1148 domains_json, salience_json, confidence, authority, applies_when_json,
1149 does_not_apply_when_json, created_at, updated_at
1150 ) VALUES (
1151 'mem_s2_001', 'semantic', 'candidate', 'S2 fixture exists.',
1152 '["epi_s2_001"]', '["evt_s2_001","evt_s2_002"]', '["s2"]',
1153 '{"score":0.4}', 0.7, 'candidate', '{}', '{}',
1154 '2026-05-04T12:00:04Z', '2026-05-04T12:00:04Z'
1155 );
1156
1157 INSERT INTO context_packs (
1158 id, task, pack_json, selection_audit, created_at
1159 ) VALUES (
1160 'ctx_s2_001', 'schema v2 fixture', '{"refs":[]}', 'fixture',
1161 '2026-05-04T12:00:05Z'
1162 );
1163
1164 INSERT INTO audit_records (
1165 id, operation, target_ref, before_hash, after_hash, reason, actor_json,
1166 source_refs_json, created_at
1167 ) VALUES (
1168 'aud_s2_001', 'fixture.create', 'mem_s2_001', NULL, 'after',
1169 'small v1 fixture', '{"kind":"test"}', '["evt_s2_001"]',
1170 '2026-05-04T12:00:06Z'
1171 );
1172 "#,
1173 )
1174 .expect("insert small v1 fixture");
1175 }
1176
1177 #[test]
1178 fn dry_run_plan_inspects_small_v1_fixture_without_mutation() {
1179 let pool = fixture_pool();
1187 let before = inspect_v1_fixture(&pool).expect("inspect before dry run");
1188
1189 let plan = dry_run_plan(&pool).expect("dry run plan");
1190 let after = inspect_v1_fixture(&pool).expect("inspect after dry run");
1191
1192 assert_eq!(before, after, "dry run must not mutate the v1 fixture");
1193 assert_eq!(
1194 plan.fixture.event_chain_head.as_deref(),
1195 Some("event-hash-2")
1196 );
1197 assert_eq!(plan.fixture.observed_row_schema_versions, vec![1]);
1198 assert!(plan.steps.contains(&"plan_schema_migration_boundary_event"));
1199 assert_eq!(
1200 plan.fixture.applied_migrations,
1201 vec![
1202 "001_init",
1203 "002_authority_timeline",
1204 "003_schema_v2_expand",
1205 "004_principle_promotion_policy_record",
1206 "005_outcome_relation_scope",
1207 "006_fts5_memories",
1208 "007_embeddings",
1209 "008_decay_jobs",
1210 "009_decay_supersessions",
1211 "010_pending_mcp_commit",
1212 ]
1213 );
1214 }
1215
1216 #[test]
1217 fn dry_run_plan_fails_closed_on_future_schema_rows() {
1218 let pool = fixture_pool();
1222 pool.execute(
1223 "UPDATE events SET schema_version = 3 WHERE id = 'evt_s2_002';",
1224 [],
1225 )
1226 .expect("mark one row as future schema");
1227
1228 let plan = dry_run_plan(&pool).expect("dry run plan");
1229
1230 assert!(!plan.is_ready());
1231 assert!(
1232 plan.failures.iter().any(|failure| matches!(
1233 failure,
1234 SchemaVersionFailure::Mismatch {
1235 table: "events",
1236 row_id,
1237 expected: 2,
1238 actual: 3,
1239 } if row_id == "evt_s2_002"
1240 )),
1241 "expected future v3 events row mismatch failure, got: {:?}",
1242 plan.failures
1243 );
1244 }
1245
1246 #[test]
1247 fn expand_backfill_skeleton_is_idempotent_and_keeps_v1_versions() {
1248 let pool = fixture_pool();
1249 let imported_at = "2026-05-04T13:00:00Z".parse().unwrap();
1250
1251 let first =
1252 apply_expand_backfill_skeleton(&pool, imported_at).expect("first expand/backfill pass");
1253 let second = apply_expand_backfill_skeleton(&pool, imported_at)
1254 .expect("second expand/backfill pass");
1255
1256 assert!(first.added_columns.is_empty());
1263 assert!(first.created_tables.is_empty());
1264 assert_eq!(first.legacy_event_attestations_backfilled, 2);
1265 assert_eq!(first.episode_summary_spans_backfilled, 1);
1266 assert_eq!(first.memory_summary_spans_backfilled, 1);
1267 assert_eq!(first.context_pack_advisories_backfilled, 1);
1268 assert_eq!(first.memory_salience_defaults_backfilled, 1);
1269
1270 assert!(second.added_columns.is_empty());
1271 assert!(second.created_tables.is_empty());
1272 assert_eq!(second.legacy_event_attestations_backfilled, 0);
1273 assert_eq!(second.episode_summary_spans_backfilled, 0);
1274 assert_eq!(second.memory_summary_spans_backfilled, 0);
1275 assert_eq!(second.context_pack_advisories_backfilled, 0);
1276 assert_eq!(second.memory_salience_defaults_backfilled, 0);
1277
1278 let plan = dry_run_plan(&pool).expect("expanded v1 fixture still has a dry-run plan");
1279 assert!(plan.is_ready(), "unexpected failures: {:?}", plan.failures);
1280 assert_eq!(plan.fixture.observed_row_schema_versions, vec![1]);
1281 assert_eq!(
1282 plan.fixture.applied_migrations,
1283 vec![
1284 "001_init",
1285 "002_authority_timeline",
1286 "003_schema_v2_expand",
1287 "004_principle_promotion_policy_record",
1288 "005_outcome_relation_scope",
1289 "006_fts5_memories",
1290 "007_embeddings",
1291 "008_decay_jobs",
1292 "009_decay_supersessions",
1293 "010_pending_mcp_commit",
1294 ]
1295 );
1296 }
1297
1298 #[test]
1299 fn expand_backfill_skeleton_writes_honest_legacy_markers() {
1300 let pool = fixture_pool();
1301 let imported_at = "2026-05-04T13:00:00Z".parse().unwrap();
1302
1303 apply_expand_backfill_skeleton(&pool, imported_at).expect("expand/backfill");
1304
1305 let source_attestation: serde_json::Value = json_column(
1306 &pool,
1307 "SELECT source_attestation_json FROM events WHERE id = 'evt_s2_001';",
1308 );
1309 assert_eq!(source_attestation["state"], "legacy_unattested");
1310 assert_eq!(
1311 source_attestation["value"]["imported_at"],
1312 "2026-05-04T13:00:00+00:00"
1313 );
1314 assert_eq!(
1315 source_attestation["value"]["original_recorded_at"],
1316 "2026-05-04T12:00:01Z"
1317 );
1318
1319 let episode_spans: serde_json::Value = json_column(
1320 &pool,
1321 "SELECT summary_spans_json FROM episodes WHERE id = 'epi_s2_001';",
1322 );
1323 assert_eq!(episode_spans[0]["byte_start"], 0);
1324 assert_eq!(
1325 episode_spans[0]["byte_end"],
1326 "Small v1 fixture episode.".len()
1327 );
1328 assert_eq!(episode_spans[0]["max_source_authority"], "derived");
1329 assert_eq!(
1330 episode_spans[0]["derived_from_event_ids"],
1331 serde_json::json!(["evt_s2_001", "evt_s2_002"])
1332 );
1333
1334 let memory_spans: serde_json::Value = json_column(
1335 &pool,
1336 "SELECT summary_spans_json FROM memories WHERE id = 'mem_s2_001';",
1337 );
1338 assert_eq!(memory_spans, serde_json::json!([]));
1339
1340 let advisory: serde_json::Value = json_column(
1341 &pool,
1342 "SELECT consumer_advisory_json FROM context_packs WHERE id = 'ctx_s2_001';",
1343 );
1344 assert_eq!(advisory["render_trust"], "untrusted_rendering");
1345 assert_eq!(advisory["execution_trust"], "untrusted_execution");
1346 assert_eq!(
1347 advisory["flags"],
1348 serde_json::json!(["contains_unattested_sources"])
1349 );
1350
1351 let defaults: (u64, u64) = pool
1352 .query_row(
1353 "SELECT cross_session_use_count, validation_epoch
1354 FROM memories WHERE id = 'mem_s2_001';",
1355 [],
1356 |row| Ok((row.get(0)?, row.get(1)?)),
1357 )
1358 .expect("read salience defaults");
1359 assert_eq!(defaults, (0, 0));
1360 }
1361
1362 #[test]
1363 fn expand_backfill_skeleton_fails_closed_before_mutation_on_future_schema_rows() {
1364 let pool = fixture_pool();
1369 pool.execute(
1370 "UPDATE traces SET schema_version = 3 WHERE id = 'trc_s2_001';",
1371 [],
1372 )
1373 .expect("mark trace as future schema");
1374
1375 let err = apply_expand_backfill_skeleton(&pool, "2026-05-04T13:00:00Z".parse().unwrap())
1376 .expect_err("future schema rows must block expand/backfill");
1377
1378 assert!(
1379 err.to_string()
1380 .contains("schema v2 expand/backfill preflight refused future-schema rows"),
1381 "unexpected error: {err}"
1382 );
1383 assert!(has_column(&pool, "events", "source_attestation_json").unwrap());
1387 }
1388
1389 fn json_column(pool: &Pool, sql: &str) -> serde_json::Value {
1390 let raw: String = pool
1391 .query_row(sql, [], |row| row.get(0))
1392 .expect("read json column");
1393 serde_json::from_str(&raw).expect("json column parses")
1394 }
1395}