use chrono::{DateTime, Utc};
use cortex_core::{
validate_summary_spans, ConsumerAdvisory, CrossSessionSalience, OutcomeMemoryRelation,
SourceAttestation, SourceAuthority, SummarySpan,
};
use rusqlite::{params, OptionalExtension};
use crate::{Pool, StoreError, StoreResult};
// Tables whose rows carry a per-row `schema_version` column that is audited
// against the version this binary was compiled for.
const SCHEMA_VERSION_TABLES: &[&str] = &["events", "traces"];
// `(table, column)` pairs added by the schema-v2 "expand" migration step.
const V2_EXPAND_COLUMNS: &[(&str, &str)] = &[
    ("events", "source_attestation_json"),
    ("episodes", "summary_spans_json"),
    ("memories", "summary_spans_json"),
    ("memories", "cross_session_use_count"),
    ("memories", "first_used_at"),
    ("memories", "last_cross_session_use_at"),
    ("memories", "last_validation_at"),
    ("memories", "validation_epoch"),
    ("memories", "blessed_until"),
    ("context_packs", "consumer_advisory_json"),
];
// Whole tables created by the schema-v2 "expand" migration step.
const V2_EXPAND_TABLES: &[&str] = &["memory_session_uses", "outcome_memory_relations"];
// Subset of the expand columns that the v2 backfill must leave non-NULL on
// every existing row (the remaining expand columns are legitimately nullable).
const V2_BACKFILL_REQUIRED_COLUMNS: &[(&str, &str)] = &[
    ("events", "source_attestation_json"),
    ("episodes", "summary_spans_json"),
    ("memories", "summary_spans_json"),
    ("memories", "cross_session_use_count"),
    ("memories", "validation_epoch"),
    ("context_packs", "consumer_advisory_json"),
];
// Schema version expected once v2 is the default persistence target.
const DEFAULT_SCHEMA_V2_TARGET: u16 = 2;
/// A table the store must contain, together with the columns that must be
/// present for this binary to operate on it.
#[derive(Debug, Clone, Copy)]
struct RequiredTable {
    // Table name as it appears in `sqlite_master`.
    name: &'static str,
    // Column names checked via `PRAGMA table_info`.
    columns: &'static [&'static str],
}
// Full required schema shape, listed alphabetically by table name.
// `verify_schema_version` walks this list and reports any missing table or
// column as a `SchemaVersionFailure`.
const REQUIRED_TABLES: &[RequiredTable] = &[
    RequiredTable {
        name: "_migrations",
        columns: &["name", "applied_at"],
    },
    RequiredTable {
        name: "audit_records",
        columns: &[
            "id",
            "operation",
            "target_ref",
            "before_hash",
            "after_hash",
            "reason",
            "actor_json",
            "source_refs_json",
            "created_at",
        ],
    },
    RequiredTable {
        name: "authority_key_timeline",
        columns: &[
            "key_id",
            "principal_id",
            "state",
            "effective_at",
            "reason",
            "audit_ref",
        ],
    },
    RequiredTable {
        name: "authority_principal_timeline",
        columns: &[
            "principal_id",
            "trust_tier",
            "effective_at",
            "trust_review_due_at",
            "removed_at",
            "audit_ref",
        ],
    },
    RequiredTable {
        name: "context_packs",
        columns: &[
            "id",
            "task",
            "pack_json",
            "selection_audit",
            "created_at",
            // v2 expand column.
            "consumer_advisory_json",
        ],
    },
    RequiredTable {
        name: "contradictions",
        columns: &[
            "id",
            "left_ref",
            "right_ref",
            "contradiction_type",
            "status",
            "interpretation",
            "created_at",
            "updated_at",
        ],
    },
    RequiredTable {
        name: "doctrine",
        columns: &[
            "id",
            "source_principle",
            "rule",
            "force",
            "promotion_reason",
            "promoted_by_json",
            "created_at",
        ],
    },
    RequiredTable {
        name: "episodes",
        columns: &[
            "id",
            "trace_id",
            "source_events_json",
            "summary",
            "domains_json",
            "entities_json",
            "candidate_meaning",
            "extracted_by_json",
            "confidence",
            "status",
            // v2 expand column.
            "summary_spans_json",
        ],
    },
    RequiredTable {
        name: "events",
        columns: &[
            "id",
            "schema_version",
            "observed_at",
            "recorded_at",
            "source_json",
            "event_type",
            "trace_id",
            "session_id",
            "domain_tags_json",
            "payload_json",
            "payload_hash",
            "prev_event_hash",
            "event_hash",
            // v2 expand column.
            "source_attestation_json",
        ],
    },
    RequiredTable {
        name: "memories",
        columns: &[
            "id",
            "memory_type",
            "status",
            "claim",
            "source_episodes_json",
            "source_events_json",
            "domains_json",
            "salience_json",
            "confidence",
            "authority",
            "applies_when_json",
            "does_not_apply_when_json",
            "created_at",
            "updated_at",
            // v2 expand columns (cross-session salience tracking).
            "summary_spans_json",
            "cross_session_use_count",
            "first_used_at",
            "last_cross_session_use_at",
            "last_validation_at",
            "validation_epoch",
            "blessed_until",
        ],
    },
    // Created wholesale by the v2 expand migration.
    RequiredTable {
        name: "memory_session_uses",
        columns: &[
            "memory_id",
            "session_id",
            "first_used_at",
            "last_used_at",
            "use_count",
        ],
    },
    // Created wholesale by the v2 expand migration.
    RequiredTable {
        name: "outcome_memory_relations",
        columns: &[
            "outcome_ref",
            "memory_id",
            "relation",
            "recorded_at",
            "source_event_id",
            "validation_scope",
            "validating_principal_id",
            "evidence_ref",
        ],
    },
    RequiredTable {
        name: "principles",
        columns: &[
            "id",
            "statement",
            "status",
            "supporting_memories_json",
            "contradicting_memories_json",
            "domains_observed_json",
            "applies_when_json",
            "does_not_apply_when_json",
            "confidence",
            "validation",
            "brightness",
            "created_by_json",
            "created_at",
            "updated_at",
        ],
    },
    RequiredTable {
        name: "trace_events",
        columns: &["trace_id", "event_id", "ordinal"],
    },
    RequiredTable {
        name: "traces",
        columns: &[
            "id",
            "schema_version",
            "opened_at",
            "closed_at",
            "trace_type",
            "status",
        ],
    },
];
/// Outcome of a schema verification pass over the whole store.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SchemaVersionReport {
    /// Schema version this binary expects.
    pub expected: u16,
    /// Tables inspected, in `REQUIRED_TABLES` order.
    pub checked_tables: Vec<&'static str>,
    /// Every problem found; empty means the store passed.
    pub failures: Vec<SchemaVersionFailure>,
}
impl SchemaVersionReport {
    /// Returns `true` when no failures were recorded.
    #[must_use]
    pub fn is_ok(&self) -> bool {
        self.failures.is_empty()
    }
}
/// A single schema verification failure.
///
/// `MissingTable`/`MissingColumn` report shape gaps, `UnknownMigration`
/// reports `_migrations` rows this binary does not recognize, `Mismatch`
/// reports rows stamped with a newer `schema_version` than expected, and
/// `IllegalIntermediateShape` reports a partially-applied migration state.
#[allow(missing_docs)]
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SchemaVersionFailure {
    MissingTable { table: &'static str },
    MissingColumn {
        table: &'static str,
        column: &'static str,
    },
    UnknownMigration { name: String },
    Mismatch {
        table: &'static str,
        row_id: String,
        expected: u16,
        actual: u16,
    },
    IllegalIntermediateShape {
        invariant: &'static str,
        detail: String,
    },
}
/// Observable state of one store used when deciding whether switching to a
/// candidate store would be a rollback or a fork.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct RollbackForkState {
    /// Store schema version, if known.
    pub schema_version: Option<u16>,
    /// Fork marker, if one has been written to the store.
    pub fork_marker: Option<ForkMarker>,
}
/// Opaque marker indicating a store has forked from its lineage.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ForkMarker {
    /// Raw marker text as stored.
    pub marker: String,
}
impl ForkMarker {
    /// Builds a marker from any string-convertible value.
    #[must_use]
    pub fn new(marker: impl Into<String>) -> Self {
        let marker = marker.into();
        Self { marker }
    }
}
/// Outcome of a rollback/fork refusal check.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RollbackForkReport {
    /// Every refusal reason found; empty means the switch is allowed.
    pub failures: Vec<RollbackForkFailure>,
}
impl RollbackForkReport {
    /// Returns `true` when no refusal reasons were recorded.
    #[must_use]
    pub fn is_ok(&self) -> bool {
        self.failures.is_empty()
    }
}
/// A single reason to refuse switching to a candidate store: either the
/// candidate would roll the schema version back, or a fork marker is present
/// on one of the stores.
#[allow(missing_docs)]
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RollbackForkFailure {
    SchemaVersionRollback { current: u16, candidate: u16 },
    ForkMarkerPresent {
        location: RollbackForkStateLocation,
        marker: String,
    },
}
impl RollbackForkFailure {
    /// Stable dotted invariant identifier for this failure class.
    #[must_use]
    pub fn invariant(&self) -> &'static str {
        match self {
            Self::SchemaVersionRollback { .. } => "rollback.schema_version.not_decreased",
            Self::ForkMarkerPresent { .. } => "fork.marker.absent",
        }
    }
    /// Human-readable description of the failure.
    #[must_use]
    pub fn detail(&self) -> String {
        match self {
            Self::SchemaVersionRollback { current, candidate } => format!(
                "candidate schema_version {candidate} is lower than current schema_version {current}"
            ),
            Self::ForkMarkerPresent { location, marker } => {
                format!("{location} state contains fork marker {marker}")
            }
        }
    }
}
/// Which side of a rollback/fork comparison a marker was found on.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RollbackForkStateLocation {
    /// The store currently in use.
    Current,
    /// The store being considered for switch-over.
    Candidate,
}
impl std::fmt::Display for RollbackForkStateLocation {
    /// Renders the location as the lowercase word used in failure details.
    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            Self::Current => "current",
            Self::Candidate => "candidate",
        };
        formatter.write_str(label)
    }
}
impl SchemaVersionFailure {
    /// Dotted invariant identifier naming the property that was violated.
    #[must_use]
    pub fn invariant(&self) -> String {
        match self {
            Self::MissingTable { table } => format!("schema_shape.{table}.exists"),
            Self::MissingColumn { table, column } => {
                format!("schema_shape.{table}.{column}.exists")
            }
            Self::UnknownMigration { .. } => String::from("schema_migration.known_to_code"),
            Self::Mismatch { table, .. } => format!("schema_version.{table}.matches_code"),
            Self::IllegalIntermediateShape { invariant, .. } => String::from(*invariant),
        }
    }
    /// Human-readable description of the failure.
    #[must_use]
    pub fn detail(&self) -> String {
        match self {
            Self::MissingTable { table } => format!("required table {table} is missing"),
            Self::MissingColumn { table, column } => {
                format!("table {table} is missing required column {column}")
            }
            Self::UnknownMigration { name } => {
                format!("migration {name} is unknown to this binary")
            }
            Self::Mismatch {
                table,
                row_id,
                expected,
                actual,
            } => {
                format!(
                    "table {table} row {row_id} has schema_version {actual}; expected {expected}"
                )
            }
            Self::IllegalIntermediateShape { detail, .. } => detail.clone(),
        }
    }
}
/// Row counts captured in the pre-migrate backup manifest for the four
/// tables audited by `verify_post_migrate_row_counts`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct PreV2BackupRowCounts {
    pub events: u64,
    pub traces: u64,
    pub episodes: u64,
    pub memories: u64,
}
/// Number of extra `events` rows the v1→v2 migrate is allowed (and expected)
/// to write: exactly one boundary event.
pub const SCHEMA_V1_TO_V2_EVENT_BOUNDARY_DELTA: u64 = 1;
/// One table whose post-migrate row count disagrees with the backup manifest.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PostMigrateCountMismatchFailure {
    /// Table whose count mismatched.
    pub table: &'static str,
    /// Count implied by the pre-migrate manifest.
    pub expected: u64,
    /// Count observed after the migrate.
    pub actual: u64,
    /// Dotted invariant identifier for this check.
    pub invariant: &'static str,
}
impl PostMigrateCountMismatchFailure {
    /// Stable invariant identifier recorded for this mismatch.
    #[must_use]
    pub fn invariant(&self) -> &'static str {
        self.invariant
    }
    /// Human-readable description comparing observed and recorded counts.
    #[must_use]
    pub fn detail(&self) -> String {
        let Self {
            table,
            expected,
            actual,
            ..
        } = self;
        format!(
            "table {table} has {actual} rows after migrate; backup manifest recorded {expected} rows before migrate"
        )
    }
}
/// Compares row counts observed after a v1→v2 migrate against the counts
/// recorded in the pre-migrate backup manifest.
///
/// `events` must grow by exactly [`SCHEMA_V1_TO_V2_EVENT_BOUNDARY_DELTA`]
/// (the migration boundary event); `traces`, `episodes`, and `memories`
/// must be unchanged. Mismatches are returned as failures, not errors.
///
/// # Errors
/// Returns `StoreError::RowCountCheckedAddOverflow` if the expected event
/// count would overflow `u64`, or any error raised while counting rows.
pub fn verify_post_migrate_row_counts(
    pool: &Pool,
    pre: &PreV2BackupRowCounts,
) -> StoreResult<Vec<PostMigrateCountMismatchFailure>> {
    let observed = PreV2BackupRowCounts {
        events: read_table_count(pool, "events")?,
        traces: read_table_count(pool, "traces")?,
        episodes: read_table_count(pool, "episodes")?,
        memories: read_table_count(pool, "memories")?,
    };
    let mut failures = Vec::new();
    // Guard the fixed-delta addition against u64 overflow instead of wrapping.
    let expected_events_after = pre
        .events
        .checked_add(SCHEMA_V1_TO_V2_EVENT_BOUNDARY_DELTA)
        .ok_or_else(|| StoreError::RowCountCheckedAddOverflow {
            table: "events",
            pre: pre.events,
            delta: SCHEMA_V1_TO_V2_EVENT_BOUNDARY_DELTA,
        })?;
    if observed.events != expected_events_after {
        failures.push(PostMigrateCountMismatchFailure {
            table: "events",
            expected: expected_events_after,
            actual: observed.events,
            invariant: "schema_v2_post_migrate.row_count.events.matches_pre_plus_boundary",
        });
    }
    // Every other audited table must come through the migrate untouched.
    let unchanged_checks = [
        (
            "traces",
            pre.traces,
            observed.traces,
            "schema_v2_post_migrate.row_count.traces.unchanged",
        ),
        (
            "episodes",
            pre.episodes,
            observed.episodes,
            "schema_v2_post_migrate.row_count.episodes.unchanged",
        ),
        (
            "memories",
            pre.memories,
            observed.memories,
            "schema_v2_post_migrate.row_count.memories.unchanged",
        ),
    ];
    for (table, expected, actual, invariant) in unchanged_checks {
        if expected != actual {
            failures.push(PostMigrateCountMismatchFailure {
                table,
                expected,
                actual,
                invariant,
            });
        }
    }
    Ok(failures)
}
/// Counts all rows in `table`.
///
/// The table name is interpolated into the SQL, which is safe here because
/// every caller passes a `&'static str` compile-time constant.
fn read_table_count(pool: &Pool, table: &'static str) -> StoreResult<u64> {
    let sql = format!("SELECT COUNT(*) FROM {table};");
    pool.query_row(&sql, [], |row| row.get(0))
        .map_err(Into::into)
}
/// Counts of rows already stamped with schema version >= 2, excluding the
/// migration boundary event itself.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct PostV2RowPopulation {
    /// Post-v2 `events` rows (boundary event excluded by the query).
    pub events_post_v2: u64,
    /// Post-v2 `traces` rows.
    pub traces_post_v2: u64,
}
impl PostV2RowPopulation {
    /// `true` when neither table contains post-v2 rows.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.total() == 0
    }
    /// Combined row count across both tables, saturating at `u64::MAX`.
    #[must_use]
    pub fn total(&self) -> u64 {
        self.traces_post_v2.saturating_add(self.events_post_v2)
    }
}
/// Counts rows already stamped `schema_version >= 2`, excluding the single
/// v1→v2 boundary event (identified by its `payload_json.kind`).
///
/// # Errors
/// Returns any underlying SQLite error.
pub fn count_post_v2_rows_outside_boundary(pool: &Pool) -> StoreResult<PostV2RowPopulation> {
    // Payload `kind` written by the migration's boundary event.
    const BOUNDARY_PAYLOAD_KIND: &str = "schema_migration.v1_to_v2";
    // Rows whose payload has no `kind` at all still count as "outside the
    // boundary" — only the exact boundary kind is excluded.
    let events_post_v2: u64 = pool.query_row(
        "SELECT COUNT(*) FROM events \
         WHERE schema_version >= 2 \
         AND (json_extract(payload_json, '$.kind') IS NULL \
         OR json_extract(payload_json, '$.kind') != ?1);",
        params![BOUNDARY_PAYLOAD_KIND],
        |row| row.get(0),
    )?;
    let traces_post_v2: u64 = pool.query_row(
        "SELECT COUNT(*) FROM traces WHERE schema_version >= 2;",
        [],
        |row| row.get(0),
    )?;
    Ok(PostV2RowPopulation {
        events_post_v2,
        traces_post_v2,
    })
}
/// Decides whether switching from `current` to `candidate` must be refused:
/// the candidate may not decrease `schema_version`, and neither store may
/// carry a fork marker.
#[must_use]
pub fn verify_rollback_fork_refusal(
    current: &RollbackForkState,
    candidate: &RollbackForkState,
) -> RollbackForkReport {
    let mut failures = Vec::new();
    // Versions are only comparable when both sides report one.
    if let (Some(current), Some(candidate)) = (current.schema_version, candidate.schema_version) {
        if candidate < current {
            failures.push(RollbackForkFailure::SchemaVersionRollback { current, candidate });
        }
    }
    // Fix: this borrow was mojibake-corrupted to `¤t.fork_marker`
    // (`&current` mangled through the HTML entity `&curren;`), which does
    // not compile; restored to `&current.fork_marker`.
    if let Some(marker) = &current.fork_marker {
        failures.push(RollbackForkFailure::ForkMarkerPresent {
            location: RollbackForkStateLocation::Current,
            marker: marker.marker.clone(),
        });
    }
    if let Some(marker) = &candidate.fork_marker {
        failures.push(RollbackForkFailure::ForkMarkerPresent {
            location: RollbackForkStateLocation::Candidate,
            marker: marker.marker.clone(),
        });
    }
    RollbackForkReport { failures }
}
/// Verifies the store's schema against what this binary expects.
///
/// Three passes:
/// 1. every table in [`REQUIRED_TABLES`] exists with all required columns;
/// 2. every `_migrations` row names a migration known to this binary;
/// 3. no row in [`SCHEMA_VERSION_TABLES`] carries a `schema_version`
///    greater than `expected`.
///
/// Shape problems are reported as failures in the returned report; only
/// underlying SQLite errors become `Err`.
pub fn verify_schema_version(pool: &Pool, expected: u16) -> StoreResult<SchemaVersionReport> {
    let mut checked_tables = Vec::new();
    let mut failures = Vec::new();
    for table in REQUIRED_TABLES {
        checked_tables.push(table.name);
        if !has_table(pool, table.name)? {
            failures.push(SchemaVersionFailure::MissingTable { table: table.name });
            // No point probing columns of a table that does not exist.
            continue;
        }
        for column in table.columns {
            if !has_column(pool, table.name, column)? {
                failures.push(SchemaVersionFailure::MissingColumn {
                    table: table.name,
                    column,
                });
            }
        }
    }
    // Only scan migrations if the ledger itself is intact; its absence was
    // already reported above.
    if has_table(pool, "_migrations")? && has_column(pool, "_migrations", "name")? {
        let known = crate::migrate::known_migration_names();
        let mut stmt = pool.prepare("SELECT name FROM _migrations ORDER BY name;")?;
        let rows = stmt.query_map([], |row| row.get::<_, String>(0))?;
        for row in rows {
            let name = row?;
            if !known.iter().any(|known_name| *known_name == name) {
                failures.push(SchemaVersionFailure::UnknownMigration { name });
            }
        }
    }
    for table in SCHEMA_VERSION_TABLES {
        if !has_column(pool, table, "schema_version")? {
            continue;
        }
        // NOTE(review): only rows NEWER than `expected` are flagged; rows
        // with a lower schema_version pass — confirm that older rows are
        // intentionally tolerated.
        let sql = format!(
            "SELECT id, schema_version FROM {table} \
             WHERE schema_version > ?1 \
             ORDER BY id;"
        );
        let mut stmt = pool.prepare(&sql)?;
        let mismatches = stmt.query_map(params![expected], |row| {
            Ok(SchemaVersionFailure::Mismatch {
                table,
                row_id: row.get(0)?,
                expected,
                actual: row.get(1)?,
            })
        })?;
        for mismatch in mismatches {
            failures.push(mismatch?);
        }
    }
    Ok(SchemaVersionReport {
        expected,
        checked_tables,
        failures,
    })
}
/// Checks that the schema-v2 "expand" artifacts are all-or-nothing: either
/// none are present (pre-expand store, OK) or all are present and the
/// backfill left no required column NULL.
///
/// # Errors
/// Returns any underlying SQLite error.
pub fn verify_schema_v2_expand_shape(pool: &Pool) -> StoreResult<Vec<SchemaVersionFailure>> {
    let mut failures = Vec::new();
    // First pass: count how many expand artifacts exist at all.
    let mut present_artifacts = 0;
    for (table, column) in V2_EXPAND_COLUMNS {
        if has_column(pool, table, column)? {
            present_artifacts += 1;
        }
    }
    for table in V2_EXPAND_TABLES {
        if has_table(pool, table)? {
            present_artifacts += 1;
        }
    }
    // Zero artifacts means the expand never ran — a legal (pre-v2) shape.
    if present_artifacts == 0 {
        return Ok(failures);
    }
    // At least one artifact exists, so ALL of them must.
    for (table, column) in V2_EXPAND_COLUMNS {
        if !has_column(pool, table, column)? {
            failures.push(SchemaVersionFailure::IllegalIntermediateShape {
                invariant: "schema_v2_expand_shape.complete",
                detail: format!(
                    "partial schema v2 expand shape is missing column {table}.{column}"
                ),
            });
        }
    }
    for table in V2_EXPAND_TABLES {
        if !has_table(pool, table)? {
            failures.push(SchemaVersionFailure::IllegalIntermediateShape {
                invariant: "schema_v2_expand_shape.complete",
                detail: format!("partial schema v2 expand shape is missing table {table}"),
            });
        }
    }
    // Only audit the backfill once the shape itself is complete; the NULL
    // queries below would error on missing columns.
    if !failures.is_empty() {
        return Ok(failures);
    }
    for (table, column) in V2_BACKFILL_REQUIRED_COLUMNS {
        let null_rows = count_null_column_rows(pool, table, column)?;
        if null_rows > 0 {
            failures.push(SchemaVersionFailure::IllegalIntermediateShape {
                invariant: "schema_v2_expand_backfill.complete",
                detail: format!(
                    "schema v2 expand backfill left {null_rows} {table}.{column} rows unset"
                ),
            });
        }
    }
    Ok(failures)
}
/// Full readiness gate for running with schema v2 as the default
/// persistence target: base schema check, complete expand shape, then
/// content-level validation of every v2 JSON/relational artifact.
///
/// # Errors
/// Returns any underlying SQLite error; all content problems are reported
/// as failures.
pub fn verify_schema_v2_default_persistence_readiness(
    pool: &Pool,
) -> StoreResult<Vec<SchemaVersionFailure>> {
    let mut failures = verify_schema_version(pool, DEFAULT_SCHEMA_V2_TARGET)?.failures;
    // Re-check the expand shape under a readiness-specific invariant name,
    // and remember whether every artifact is present.
    let mut shape_complete = true;
    for (table, column) in V2_EXPAND_COLUMNS {
        if !has_column(pool, table, column)? {
            shape_complete = false;
            failures.push(SchemaVersionFailure::IllegalIntermediateShape {
                invariant: "schema_v2_default_persistence.s2_9_shape.complete",
                detail: format!("default schema v2 persistence is missing column {table}.{column}"),
            });
        }
    }
    for table in V2_EXPAND_TABLES {
        if !has_table(pool, table)? {
            shape_complete = false;
            failures.push(SchemaVersionFailure::IllegalIntermediateShape {
                invariant: "schema_v2_default_persistence.s2_9_shape.complete",
                detail: format!("default schema v2 persistence is missing table {table}"),
            });
        }
    }
    // Content validators below query the v2 columns/tables directly, so
    // they can only run once the shape is known complete.
    if !shape_complete {
        return Ok(failures);
    }
    failures.extend(verify_schema_v2_expand_shape(pool)?);
    failures.extend(verify_v2_source_attestations(pool)?);
    failures.extend(verify_v2_episode_summary_spans(pool)?);
    failures.extend(verify_v2_memory_summary_spans(pool)?);
    failures.extend(verify_v2_context_pack_advisories(pool)?);
    failures.extend(verify_v2_cross_session_salience(pool)?);
    failures.extend(verify_v2_memory_session_uses(pool)?);
    failures.extend(verify_v2_outcome_memory_relations(pool)?);
    Ok(failures)
}
/// Requires every `events` row to carry a non-NULL, deserializable
/// `SourceAttestation` JSON payload.
fn verify_v2_source_attestations(pool: &Pool) -> StoreResult<Vec<SchemaVersionFailure>> {
    let mut failures = Vec::new();
    let mut stmt = pool.prepare("SELECT id, source_attestation_json FROM events ORDER BY id;")?;
    let rows = stmt.query_map([], |row| {
        Ok((row.get::<_, String>(0)?, row.get::<_, Option<String>>(1)?))
    })?;
    for row in rows {
        let (id, raw) = row?;
        // Unlike summary spans, attestations are mandatory: NULL is itself
        // a failure, not a skip.
        let Some(raw) = raw else {
            failures.push(SchemaVersionFailure::IllegalIntermediateShape {
                invariant: "schema_v2_default_persistence.source_attestation.present",
                detail: format!("events row {id} has unset source_attestation_json"),
            });
            continue;
        };
        if let Err(err) = serde_json::from_str::<SourceAttestation>(&raw) {
            failures.push(SchemaVersionFailure::IllegalIntermediateShape {
                invariant: "schema_v2_default_persistence.source_attestation.valid",
                detail: format!(
                    "events row {id} source_attestation_json is not a valid SourceAttestation: {err}"
                ),
            });
        }
    }
    Ok(failures)
}
/// Validates `episodes.summary_spans_json`: when set, it must parse as
/// `Vec<SummarySpan>` and pass `validate_summary_spans` against the row's
/// own `summary` text. NULL is tolerated here (presence is enforced by the
/// backfill check instead).
fn verify_v2_episode_summary_spans(pool: &Pool) -> StoreResult<Vec<SchemaVersionFailure>> {
    let mut failures = Vec::new();
    let mut stmt =
        pool.prepare("SELECT id, summary, summary_spans_json FROM episodes ORDER BY id;")?;
    let rows = stmt.query_map([], |row| {
        Ok((
            row.get::<_, String>(0)?,
            row.get::<_, String>(1)?,
            row.get::<_, Option<String>>(2)?,
        ))
    })?;
    for row in rows {
        let (id, summary, raw) = row?;
        let Some(raw) = raw else {
            continue;
        };
        match serde_json::from_str::<Vec<SummarySpan>>(&raw) {
            Ok(spans) => {
                // Every span source is treated as `Derived` authority for
                // this structural validation pass.
                if let Err(err) =
                    validate_summary_spans(&summary, &spans, |_| SourceAuthority::Derived)
                {
                    failures.push(SchemaVersionFailure::IllegalIntermediateShape {
                        invariant: "schema_v2_default_persistence.summary_spans.valid",
                        detail: format!(
                            "episodes row {id} summary_spans_json failed {}",
                            err.invariant()
                        ),
                    });
                }
            }
            Err(err) => failures.push(SchemaVersionFailure::IllegalIntermediateShape {
                invariant: "schema_v2_default_persistence.summary_spans.valid",
                detail: format!(
                    "episodes row {id} summary_spans_json is not valid SummarySpan JSON: {err}"
                ),
            }),
        }
    }
    Ok(failures)
}
/// Validates `memories.summary_spans_json`: when set, it must parse as
/// `Vec<SummarySpan>`. Unlike episodes, spans are not cross-checked against
/// a summary text here — memories have no `summary` column in this pass.
fn verify_v2_memory_summary_spans(pool: &Pool) -> StoreResult<Vec<SchemaVersionFailure>> {
    let mut failures = Vec::new();
    let mut stmt = pool.prepare("SELECT id, summary_spans_json FROM memories ORDER BY id;")?;
    let rows = stmt.query_map([], |row| {
        Ok((row.get::<_, String>(0)?, row.get::<_, Option<String>>(1)?))
    })?;
    for row in rows {
        let (id, raw) = row?;
        // NULL is tolerated; presence is enforced by the backfill check.
        let Some(raw) = raw else {
            continue;
        };
        if let Err(err) = serde_json::from_str::<Vec<SummarySpan>>(&raw) {
            failures.push(SchemaVersionFailure::IllegalIntermediateShape {
                invariant: "schema_v2_default_persistence.summary_spans.valid",
                detail: format!(
                    "memories row {id} summary_spans_json is not valid SummarySpan JSON: {err}"
                ),
            });
        }
    }
    Ok(failures)
}
/// Validates `context_packs.consumer_advisory_json`: when set, it must
/// deserialize as a `ConsumerAdvisory`. NULL is tolerated here (presence is
/// enforced by the backfill check).
fn verify_v2_context_pack_advisories(pool: &Pool) -> StoreResult<Vec<SchemaVersionFailure>> {
    let mut failures = Vec::new();
    let mut stmt =
        pool.prepare("SELECT id, consumer_advisory_json FROM context_packs ORDER BY id;")?;
    let rows = stmt.query_map([], |row| {
        Ok((row.get::<_, String>(0)?, row.get::<_, Option<String>>(1)?))
    })?;
    for row in rows {
        let (id, raw) = row?;
        let Some(raw) = raw else {
            continue;
        };
        if let Err(err) = serde_json::from_str::<ConsumerAdvisory>(&raw) {
            failures.push(SchemaVersionFailure::IllegalIntermediateShape {
                invariant: "schema_v2_default_persistence.context_pack_advisory.valid",
                detail: format!(
                    "context_packs row {id} consumer_advisory_json is not a valid ConsumerAdvisory: {err}"
                ),
            });
        }
    }
    Ok(failures)
}
/// Validates the cross-session salience columns on every `memories` row:
/// `cross_session_use_count` and `validation_epoch` must be set, and every
/// optional timestamp column must parse as an RFC3339 UTC datetime.
fn verify_v2_cross_session_salience(pool: &Pool) -> StoreResult<Vec<SchemaVersionFailure>> {
    let mut failures = Vec::new();
    let mut stmt = pool.prepare(
        "SELECT id, cross_session_use_count, first_used_at, last_cross_session_use_at,
                last_validation_at, validation_epoch, blessed_until
         FROM memories
         ORDER BY id;",
    )?;
    let rows = stmt.query_map([], |row| {
        Ok((
            row.get::<_, String>(0)?,
            row.get::<_, Option<u32>>(1)?,
            row.get::<_, Option<String>>(2)?,
            row.get::<_, Option<String>>(3)?,
            row.get::<_, Option<String>>(4)?,
            row.get::<_, Option<u32>>(5)?,
            row.get::<_, Option<String>>(6)?,
        ))
    })?;
    for row in rows {
        let (
            id,
            cross_session_use_count,
            first_used_at,
            last_cross_session_use_at,
            last_validation_at,
            validation_epoch,
            blessed_until,
        ) = row?;
        // The two counters are mandatory; missing either skips the rest of
        // the row's checks.
        let Some(cross_session_use_count) = cross_session_use_count else {
            failures.push(v2_salience_failure(
                id,
                "cross_session_use_count is missing",
            ));
            continue;
        };
        let Some(validation_epoch) = validation_epoch else {
            failures.push(v2_salience_failure(id, "validation_epoch is missing"));
            continue;
        };
        // Each timestamp parse records its own failure and yields None.
        let first_used_at =
            parse_optional_v2_time(&id, "first_used_at", first_used_at, &mut failures);
        let last_cross_session_use_at = parse_optional_v2_time(
            &id,
            "last_cross_session_use_at",
            last_cross_session_use_at,
            &mut failures,
        );
        let last_validation_at =
            parse_optional_v2_time(&id, "last_validation_at", last_validation_at, &mut failures);
        let blessed_until =
            parse_optional_v2_time(&id, "blessed_until", blessed_until, &mut failures);
        // Skip reconstruction when any failure so far concerns this row.
        // NOTE(review): matching by the "memories row {id} " detail prefix
        // could also match a longer id that starts with `{id} ` — confirm
        // memory ids cannot contain spaces.
        if failures.iter().any(|failure| match failure {
            SchemaVersionFailure::IllegalIntermediateShape { detail, .. } => {
                detail.starts_with(&format!("memories row {id} "))
            }
            _ => false,
        }) {
            continue;
        }
        // Constructed only to exercise the CrossSessionSalience shape with
        // the parsed values; the value itself is discarded.
        let _state = CrossSessionSalience {
            cross_session_use_count,
            first_used_at,
            last_cross_session_use_at,
            last_validation_at,
            validation_epoch,
            blessed_until,
        };
    }
    Ok(failures)
}
/// Parses an optional timestamp column as an RFC3339 UTC datetime.
///
/// `None` input passes through as `None`; an unparseable value records a
/// salience failure for `row_id`/`column` and also yields `None`.
fn parse_optional_v2_time(
    row_id: &str,
    column: &'static str,
    raw: Option<String>,
    failures: &mut Vec<SchemaVersionFailure>,
) -> Option<DateTime<Utc>> {
    let value = raw?;
    match value.parse::<DateTime<Utc>>() {
        Ok(parsed) => Some(parsed),
        Err(err) => {
            failures.push(v2_salience_failure(
                row_id.to_string(),
                format!("{column} is not RFC3339 UTC datetime: {err}"),
            ));
            None
        }
    }
}
/// Builds a cross-session-salience failure for one `memories` row.
fn v2_salience_failure(
    row_id: impl Into<String>,
    detail: impl Into<String>,
) -> SchemaVersionFailure {
    let detail = format!("memories row {} {}", row_id.into(), detail.into());
    SchemaVersionFailure::IllegalIntermediateShape {
        invariant: "schema_v2_default_persistence.cross_session_salience.valid",
        detail,
    }
}
/// Validates every `memory_session_uses` row: non-empty ids, a referenced
/// memory that exists, a positive use count, parseable timestamps, and
/// `first_used_at <= last_used_at`.
fn verify_v2_memory_session_uses(pool: &Pool) -> StoreResult<Vec<SchemaVersionFailure>> {
    let mut failures = Vec::new();
    let mut stmt = pool.prepare(
        "SELECT memory_id, session_id, first_used_at, last_used_at, use_count
         FROM memory_session_uses
         ORDER BY memory_id, session_id;",
    )?;
    let rows = stmt.query_map([], |row| {
        Ok((
            row.get::<_, String>(0)?,
            row.get::<_, String>(1)?,
            row.get::<_, String>(2)?,
            row.get::<_, String>(3)?,
            row.get::<_, u32>(4)?,
        ))
    })?;
    for row in rows {
        let (memory_id, session_id, first_used_at, last_used_at, use_count) = row?;
        // Composite key used in all failure details for this row.
        let row_id = format!("{memory_id}/{session_id}");
        if memory_id.trim().is_empty() {
            failures.push(v2_memory_session_use_failure(&row_id, "memory_id is empty"));
        } else if !row_exists(pool, "memories", &memory_id)? {
            failures.push(v2_memory_session_use_failure(
                &row_id,
                "memory_id does not reference an existing memory",
            ));
        }
        if session_id.trim().is_empty() {
            failures.push(v2_memory_session_use_failure(
                &row_id,
                "session_id is empty",
            ));
        }
        if use_count == 0 {
            failures.push(v2_memory_session_use_failure(&row_id, "use_count is zero"));
        }
        // Each parse records its own failure and yields None on error.
        let first_used_at = parse_required_v2_time(
            "schema_v2_default_persistence.memory_session_uses.valid",
            &row_id,
            "first_used_at",
            &first_used_at,
            &mut failures,
        );
        let last_used_at = parse_required_v2_time(
            "schema_v2_default_persistence.memory_session_uses.valid",
            &row_id,
            "last_used_at",
            &last_used_at,
            &mut failures,
        );
        // Ordering is only checkable when both timestamps parsed.
        if let (Some(first), Some(last)) = (first_used_at, last_used_at) {
            if last < first {
                failures.push(v2_memory_session_use_failure(
                    &row_id,
                    "last_used_at is earlier than first_used_at",
                ));
            }
        }
    }
    Ok(failures)
}
/// Validates every `outcome_memory_relations` row: non-empty refs, a valid
/// `OutcomeMemoryRelation`, a parseable `recorded_at`, referential
/// integrity for `memory_id`/`source_event_id`, and — depending on whether
/// the relation advances validation — the presence or absence of the three
/// validation fields (`validation_scope`, `validating_principal_id`,
/// `evidence_ref`).
fn verify_v2_outcome_memory_relations(pool: &Pool) -> StoreResult<Vec<SchemaVersionFailure>> {
    let mut failures = Vec::new();
    let mut stmt = pool.prepare(
        "SELECT outcome_ref, memory_id, relation, recorded_at, source_event_id,
                validation_scope, validating_principal_id, evidence_ref
         FROM outcome_memory_relations
         ORDER BY outcome_ref, memory_id, relation;",
    )?;
    let rows = stmt.query_map([], |row| {
        Ok((
            row.get::<_, String>(0)?,
            row.get::<_, String>(1)?,
            row.get::<_, String>(2)?,
            row.get::<_, String>(3)?,
            row.get::<_, Option<String>>(4)?,
            row.get::<_, Option<String>>(5)?,
            row.get::<_, Option<String>>(6)?,
            row.get::<_, Option<String>>(7)?,
        ))
    })?;
    for row in rows {
        let (
            outcome_ref,
            memory_id,
            relation,
            recorded_at,
            source_event_id,
            validation_scope,
            validating_principal_id,
            evidence_ref,
        ) = row?;
        // Composite key used in all failure details for this row.
        let row_id = format!("{outcome_ref}/{memory_id}/{relation}");
        if outcome_ref.trim().is_empty() {
            failures.push(v2_outcome_memory_relation_failure(
                &row_id,
                "outcome_ref is empty",
            ));
        }
        if memory_id.trim().is_empty() {
            failures.push(v2_outcome_memory_relation_failure(
                &row_id,
                "memory_id is empty",
            ));
        } else if !row_exists(pool, "memories", &memory_id)? {
            failures.push(v2_outcome_memory_relation_failure(
                &row_id,
                "memory_id does not reference an existing memory",
            ));
        }
        // The relation string is deserialized via serde_json so the same
        // enum (de)serialization rules apply as everywhere else.
        let parsed_relation =
            serde_json::from_value::<OutcomeMemoryRelation>(serde_json::json!(relation));
        if let Err(err) = &parsed_relation {
            failures.push(v2_outcome_memory_relation_failure(
                &row_id,
                format!("relation is not a valid OutcomeMemoryRelation: {err}"),
            ));
        }
        parse_required_v2_time(
            "schema_v2_default_persistence.outcome_memory_relations.valid",
            &row_id,
            "recorded_at",
            &recorded_at,
            &mut failures,
        );
        // source_event_id is optional, but when present it must be
        // non-empty and reference a real event.
        if let Some(source_event_id) = source_event_id {
            if source_event_id.trim().is_empty() {
                failures.push(v2_outcome_memory_relation_failure(
                    &row_id,
                    "source_event_id is empty",
                ));
            } else if !row_exists(pool, "events", &source_event_id)? {
                failures.push(v2_outcome_memory_relation_failure(
                    &row_id,
                    "source_event_id does not reference an existing event",
                ));
            }
        }
        // Validation-advancing relations must carry all three validation
        // fields; all other relations must carry none of them.
        if let Ok(parsed) = parsed_relation {
            if parsed.advances_validation() {
                if validation_scope.is_none() {
                    failures.push(v2_outcome_memory_relation_failure(
                        &row_id,
                        "validated relation missing validation_scope",
                    ));
                }
                if validating_principal_id.is_none() {
                    failures.push(v2_outcome_memory_relation_failure(
                        &row_id,
                        "validated relation missing validating_principal_id",
                    ));
                }
                if evidence_ref.is_none() {
                    failures.push(v2_outcome_memory_relation_failure(
                        &row_id,
                        "validated relation missing evidence_ref",
                    ));
                }
            } else {
                if validation_scope.is_some() {
                    failures.push(v2_outcome_memory_relation_failure(
                        &row_id,
                        "non-validation relation must not carry validation_scope",
                    ));
                }
                if validating_principal_id.is_some() {
                    failures.push(v2_outcome_memory_relation_failure(
                        &row_id,
                        "non-validation relation must not carry validating_principal_id",
                    ));
                }
                if evidence_ref.is_some() {
                    failures.push(v2_outcome_memory_relation_failure(
                        &row_id,
                        "non-validation relation must not carry evidence_ref",
                    ));
                }
            }
        }
    }
    Ok(failures)
}
/// Parses a mandatory timestamp column as an RFC3339 UTC datetime.
///
/// On parse error a failure tagged with `invariant` is recorded and `None`
/// is returned.
fn parse_required_v2_time(
    invariant: &'static str,
    row_id: &str,
    column: &'static str,
    raw: &str,
    failures: &mut Vec<SchemaVersionFailure>,
) -> Option<DateTime<Utc>> {
    raw.parse::<DateTime<Utc>>()
        .map_err(|err| {
            failures.push(SchemaVersionFailure::IllegalIntermediateShape {
                invariant,
                detail: format!("{row_id} {column} is not RFC3339 UTC datetime: {err}"),
            });
        })
        .ok()
}
/// Builds a failure for one `memory_session_uses` row.
fn v2_memory_session_use_failure(row_id: &str, detail: impl Into<String>) -> SchemaVersionFailure {
    let detail = format!("memory_session_uses row {row_id} {}", detail.into());
    SchemaVersionFailure::IllegalIntermediateShape {
        invariant: "schema_v2_default_persistence.memory_session_uses.valid",
        detail,
    }
}
/// Builds a failure for one `outcome_memory_relations` row.
fn v2_outcome_memory_relation_failure(
    row_id: &str,
    detail: impl Into<String>,
) -> SchemaVersionFailure {
    let detail = format!("outcome_memory_relations row {row_id} {}", detail.into());
    SchemaVersionFailure::IllegalIntermediateShape {
        invariant: "schema_v2_default_persistence.outcome_memory_relations.valid",
        detail,
    }
}
/// Counts rows in `table` whose `column` is NULL.
///
/// Identifier interpolation is safe here: callers pass only compile-time
/// constants from the V2_* tables above.
fn count_null_column_rows(pool: &Pool, table: &str, column: &str) -> StoreResult<u64> {
    let sql = format!("SELECT COUNT(*) FROM {table} WHERE {column} IS NULL;");
    pool.query_row(&sql, [], |row| row.get(0))
        .map_err(Into::into)
}
/// Returns whether `table` has a row with the given `id`.
///
/// The id is bound as a parameter; only the table name (a compile-time
/// constant) is interpolated into the SQL.
fn row_exists(pool: &Pool, table: &'static str, id: &str) -> StoreResult<bool> {
    let sql = format!("SELECT 1 FROM {table} WHERE id = ?1 LIMIT 1;");
    let found = pool.query_row(&sql, params![id], |_| Ok(())).optional()?;
    Ok(found.is_some())
}
/// Returns whether a table named `table` exists, via `sqlite_master`.
fn has_table(pool: &Pool, table: &str) -> StoreResult<bool> {
    let existing = pool
        .query_row(
            "SELECT name FROM sqlite_master WHERE type = 'table' AND name = ?1;",
            params![table],
            |row| row.get::<_, String>(0),
        )
        // `optional()` turns the no-row case into None instead of an error.
        .optional()?;
    Ok(existing.is_some())
}
/// Returns whether `table` has a column named `column`, by scanning
/// `PRAGMA table_info` (column index 1 is the column name).
fn has_column(pool: &Pool, table: &str, column: &str) -> StoreResult<bool> {
    let sql = format!("PRAGMA table_info({table});");
    let mut stmt = pool.prepare(&sql)?;
    let columns = stmt.query_map([], |row| row.get::<_, String>(1))?;
    for found in columns {
        if found? == column {
            return Ok(true);
        }
    }
    // PRAGMA on a missing table yields zero rows, so this also returns
    // false (not an error) when the table itself does not exist.
    Ok(false)
}
#[cfg(test)]
mod tests {
use rusqlite::Connection;
use super::*;
    // Opens an in-memory SQLite database with all migrations applied.
    fn migrated_pool() -> Connection {
        let pool = Connection::open_in_memory().expect("open sqlite");
        crate::migrate::apply_pending(&pool).expect("apply migrations");
        pool
    }
#[test]
fn verify_schema_version_passes_for_empty_v1_schema() {
let pool = migrated_pool();
let report = verify_schema_version(&pool, cortex_core::SCHEMA_VERSION)
.expect("verify schema version");
assert!(report.is_ok(), "unexpected failures: {report:?}");
assert_eq!(
report.checked_tables,
REQUIRED_TABLES
.iter()
.map(|table| table.name)
.collect::<Vec<_>>()
);
}
#[test]
fn verify_schema_version_names_mismatched_event_row() {
let pool = migrated_pool();
pool.execute(
"INSERT INTO events (
id, schema_version, observed_at, recorded_at, source_json, event_type,
trace_id, session_id, domain_tags_json, payload_json, payload_hash,
prev_event_hash, event_hash
) VALUES (
'evt_future', 2, '2026-01-01T00:00:00Z', '2026-01-01T00:00:00Z',
'{\"kind\":\"test\"}', 'test.event', NULL, NULL, '[]', '{}',
'payload-hash', NULL, 'event-hash'
);",
[],
)
.expect("insert mismatched event");
let report = verify_schema_version(&pool, 1).expect("verify schema version");
assert_eq!(
report.failures,
vec![SchemaVersionFailure::Mismatch {
table: "events",
row_id: "evt_future".into(),
expected: 1,
actual: 2,
}]
);
assert_eq!(
report.failures[0].invariant(),
"schema_version.events.matches_code"
);
}
#[test]
fn verify_schema_version_reports_missing_required_table() {
let pool = migrated_pool();
pool.execute("DROP TABLE audit_records;", [])
.expect("drop audit_records");
let report = verify_schema_version(&pool, cortex_core::SCHEMA_VERSION)
.expect("verify schema version");
assert!(
report
.failures
.contains(&SchemaVersionFailure::MissingTable {
table: "audit_records"
}),
"failures: {report:?}"
);
}
#[test]
fn verify_schema_version_reports_missing_required_column() {
let pool = migrated_pool();
pool.execute("ALTER TABLE events DROP COLUMN payload_hash;", [])
.expect("drop payload_hash");
let report = verify_schema_version(&pool, cortex_core::SCHEMA_VERSION)
.expect("verify schema version");
assert!(
report
.failures
.contains(&SchemaVersionFailure::MissingColumn {
table: "events",
column: "payload_hash",
}),
"failures: {report:?}"
);
}
#[test]
fn verify_schema_version_reports_unknown_migration() {
let pool = migrated_pool();
pool.execute("INSERT INTO _migrations (name) VALUES ('999_future');", [])
.expect("insert future migration row");
let report = verify_schema_version(&pool, cortex_core::SCHEMA_VERSION)
.expect("verify schema version");
assert_eq!(
report.failures,
vec![SchemaVersionFailure::UnknownMigration {
name: "999_future".into(),
}]
);
assert_eq!(
report.failures[0].invariant(),
"schema_migration.known_to_code"
);
}
#[test]
fn verify_schema_version_v1_binary_refuses_on_v2_migration_marker() {
    // Simulates a v1 binary opening a store after a v2 cutover marker was
    // written: first checks the raw migration table contains exactly one
    // name unknown to a v1 baseline, then drives the public API at schema
    // version 1 and expects an unknown-migration failure.
    let pool = migrated_pool();
    pool.execute(
        "DELETE FROM _migrations WHERE name NOT IN ('001_init', '002_authority_timeline');",
        [],
    )
    .expect("trim post-v1 migration rows to simulate v1-binary baseline");
    pool.execute(
        "INSERT INTO _migrations (name) VALUES ('003_schema_v2_expand');",
        [],
    )
    .expect("insert v2 cutover migration row");
    // Raw-table check: only the v2 cutover marker is unknown to v1.
    let simulated_v1_known: &[&str] = &["001_init", "002_authority_timeline"];
    let mut stmt = pool
        .prepare("SELECT name FROM _migrations ORDER BY name;")
        .expect("prepare migrations scan");
    let observed_names: Vec<String> = stmt
        .query_map([], |row| row.get::<_, String>(0))
        .expect("query migrations")
        .collect::<rusqlite::Result<Vec<_>>>()
        .expect("collect migration names");
    let unknown_to_v1: Vec<String> = observed_names
        .into_iter()
        .filter(|name| !simulated_v1_known.contains(&name.as_str()))
        .collect();
    assert_eq!(
        unknown_to_v1,
        vec!["003_schema_v2_expand".to_string()],
        "v1 binary must see 003_schema_v2_expand as unknown"
    );
    // Public-API check at schema version 1.
    pool.execute(
        "INSERT INTO _migrations (name) VALUES ('003_schema_v2_expand_unknown_to_v1');",
        [],
    )
    .expect("insert simulated-v1-unknown migration row");
    let report = verify_schema_version(&pool, 1).expect("verify schema version");
    let unknown_failures: Vec<_> = report
        .failures
        .iter()
        .filter_map(|failure| match failure {
            SchemaVersionFailure::UnknownMigration { name } => Some(name.clone()),
            _ => None,
        })
        .collect();
    // This equality pins both presence and exclusivity of the failure, so no
    // separate `any(matches!(..))` pass over the failures is needed.
    assert_eq!(
        unknown_failures,
        vec!["003_schema_v2_expand_unknown_to_v1".to_string()]
    );
    let invariant = report
        .failures
        .iter()
        .find_map(|failure| match failure {
            SchemaVersionFailure::UnknownMigration { .. } => Some(failure.invariant()),
            _ => None,
        })
        .expect("unknown-migration failure present");
    assert_eq!(invariant, "schema_migration.known_to_code");
}
#[test]
fn verify_schema_version_v2_binary_on_v2_store_emits_no_unknown_migration_failure() {
    // A freshly migrated v2 store inspected by a v2 binary must be clean.
    let pool = migrated_pool();
    let report = verify_schema_version(&pool, cortex_core::SCHEMA_VERSION)
        .expect("verify schema version");
    let has_unknown = report
        .failures
        .iter()
        .any(|failure| matches!(failure, SchemaVersionFailure::UnknownMigration { .. }));
    assert!(
        !has_unknown,
        "no unknown-migration failure expected: {report:?}"
    );
    assert!(report.is_ok(), "report must be clean: {report:?}");
}
#[test]
fn verify_schema_version_v2_binary_on_future_v3_store_fails_closed() {
    // A migration marker from a future v3 must make a v2 binary fail closed.
    let pool = migrated_pool();
    pool.execute(
        "INSERT INTO _migrations (name) VALUES ('999_v3_future_unknown_to_v2');",
        [],
    )
    .expect("insert v3 future migration row");
    let report = verify_schema_version(&pool, cortex_core::SCHEMA_VERSION)
        .expect("verify schema version");
    let unknown = report
        .failures
        .iter()
        .filter_map(|failure| match failure {
            SchemaVersionFailure::UnknownMigration { name } => Some(name.clone()),
            _ => None,
        })
        .next()
        .expect("unknown-migration failure present");
    assert_eq!(unknown, "999_v3_future_unknown_to_v2");
    assert!(!report.is_ok(), "v3 marker must fail closed: {report:?}");
}
/// Inserts a minimal v1-shaped `events` row; the event hash is derived from
/// the id so repeated inserts remain distinguishable.
fn insert_event_row(conn: &Connection, event_id: &str) {
    let event_hash = format!("event-hash-{event_id}");
    conn.execute(
        "INSERT INTO events (
            id, schema_version, observed_at, recorded_at, source_json, event_type,
            trace_id, session_id, domain_tags_json, payload_json, payload_hash,
            prev_event_hash, event_hash
        ) VALUES (
            ?1, 1, '2026-01-01T00:00:00Z', '2026-01-01T00:00:00Z',
            '{\"kind\":\"test\"}', 'test.event', NULL, NULL, '[]', '{}',
            'payload-hash', NULL, ?2
        );",
        params![event_id, event_hash],
    )
    .expect("insert event");
}
#[test]
fn verify_post_migrate_row_counts_passes_with_only_boundary_delta() {
    // One pre-existing event plus the single permitted boundary row: the
    // post-migrate count check must report no failures.
    let pool = migrated_pool();
    for event_id in ["evt_one", "evt_boundary"] {
        insert_event_row(&pool, event_id);
    }
    let pre_counts = PreV2BackupRowCounts {
        memories: 0,
        episodes: 0,
        traces: 0,
        events: 1,
    };
    let failures = verify_post_migrate_row_counts(&pool, &pre_counts).expect("count compare");
    assert!(
        failures.is_empty(),
        "clean +1 events delta must not fail: {failures:?}"
    );
}
#[test]
fn verify_post_migrate_row_counts_rejects_events_growing_beyond_boundary_delta() {
    // Two new events on top of the pre-backup count exceed the single
    // permitted boundary row, so exactly one events failure is expected.
    let pool = migrated_pool();
    for event_id in ["evt_one", "evt_two_boundary", "evt_three_extra"] {
        insert_event_row(&pool, event_id);
    }
    let pre_counts = PreV2BackupRowCounts {
        events: 1,
        traces: 0,
        episodes: 0,
        memories: 0,
    };
    let failures = verify_post_migrate_row_counts(&pool, &pre_counts).expect("count compare");
    assert_eq!(failures.len(), 1, "unexpected failures: {failures:?}");
    let failure = &failures[0];
    assert_eq!(failure.table, "events");
    assert_eq!(failure.expected, 2);
    assert_eq!(failure.actual, 3);
    assert_eq!(
        failure.invariant(),
        "schema_v2_post_migrate.row_count.events.matches_pre_plus_boundary"
    );
    let detail = failure.detail();
    assert!(
        detail.contains("table events has 3 rows after migrate"),
        "detail: {detail}"
    );
}
#[test]
fn verify_post_migrate_row_counts_rejects_events_count_shrinking() {
    // The store is empty but the pre-backup claims five events; the check
    // must flag the shortfall against pre + boundary (6 expected, 0 actual).
    let pool = migrated_pool();
    let pre_counts = PreV2BackupRowCounts {
        events: 5,
        traces: 0,
        episodes: 0,
        memories: 0,
    };
    let failures = verify_post_migrate_row_counts(&pool, &pre_counts).expect("count compare");
    assert_eq!(failures.len(), 1, "unexpected failures: {failures:?}");
    let failure = &failures[0];
    assert_eq!(failure.expected, 6);
    assert_eq!(failure.actual, 0);
}
#[test]
fn verify_post_migrate_row_counts_rejects_unchanged_tables_that_drifted() {
    // Tables outside the events boundary (here: traces) must not change at
    // all during migrate; any drift is reported as a failure.
    let pool = migrated_pool();
    for event_id in ["evt_one", "evt_boundary"] {
        insert_event_row(&pool, event_id);
    }
    pool.execute(
        "INSERT INTO traces (id, schema_version, opened_at, closed_at, trace_type, status)
         VALUES ('trc_new', 1, '2026-01-01T00:00:00Z', NULL, 'fixture', 'open');",
        [],
    )
    .expect("insert trace");
    let pre_counts = PreV2BackupRowCounts {
        events: 1,
        traces: 0,
        episodes: 0,
        memories: 0,
    };
    let failures = verify_post_migrate_row_counts(&pool, &pre_counts).expect("count compare");
    assert_eq!(failures.len(), 1, "unexpected failures: {failures:?}");
    let failure = &failures[0];
    assert_eq!(failure.table, "traces");
    assert_eq!(failure.expected, 0);
    assert_eq!(failure.actual, 1);
    assert_eq!(
        failure.invariant(),
        "schema_v2_post_migrate.row_count.traces.unchanged"
    );
}
#[test]
fn verify_post_migrate_row_counts_refuses_pre_events_at_u64_max_overflow() {
    // pre.events + boundary delta would overflow u64; the check must refuse
    // with a typed error rather than wrapping or panicking. A single call is
    // enough: the invariant is inspected before the match consumes the error
    // (the original called the function twice to re-obtain the same error).
    let pool = migrated_pool();
    let pre_counts = PreV2BackupRowCounts {
        events: u64::MAX,
        traces: 0,
        episodes: 0,
        memories: 0,
    };
    let err = verify_post_migrate_row_counts(&pool, &pre_counts)
        .expect_err("overflow must refuse via typed error");
    // Pin both the exported constant and its literal value so a silent edit
    // to the constant cannot drift the wire-visible invariant string.
    assert_eq!(
        err.invariant(),
        Some(crate::VERIFY_ROW_COUNTS_CHECKED_ADD_OVERFLOW_INVARIANT),
    );
    assert_eq!(
        err.invariant(),
        Some("verify.row_counts.checked_add_overflow"),
    );
    match err {
        crate::StoreError::RowCountCheckedAddOverflow { table, pre, delta } => {
            assert_eq!(table, "events");
            assert_eq!(pre, u64::MAX);
            assert_eq!(delta, SCHEMA_V1_TO_V2_EVENT_BOUNDARY_DELTA);
        }
        other => panic!("unexpected error variant: {other:?}"),
    }
}
#[test]
fn verify_schema_v2_expand_shape_passes_after_default_bundle() {
    // The default migration bundle must yield a complete v2 expand shape.
    let pool = migrated_pool();
    let shape_failures = verify_schema_v2_expand_shape(&pool).expect("verify v2 expand shape");
    assert!(
        shape_failures.is_empty(),
        "unexpected failures: {shape_failures:?}"
    );
}
#[test]
fn verify_schema_v2_expand_shape_reports_partial_expand_when_artifacts_removed() {
    // Removing one v2 column and one v2 table must each be reported as an
    // illegal intermediate (partial-expand) shape.
    let pool = migrated_pool();
    pool.execute("ALTER TABLE episodes DROP COLUMN summary_spans_json;", [])
        .expect("drop episodes.summary_spans_json to simulate partial expand");
    pool.execute("DROP TABLE memory_session_uses;", [])
        .expect("drop memory_session_uses to simulate partial expand");
    let failures = verify_schema_v2_expand_shape(&pool).expect("verify v2 expand shape");
    // Both expected failures share the same invariant; only the detail text
    // distinguishes the missing column from the missing table.
    let has_detail = |wanted: &str| {
        failures.iter().any(|failure| matches!(
            failure,
            SchemaVersionFailure::IllegalIntermediateShape {
                invariant: "schema_v2_expand_shape.complete",
                detail,
            } if detail == wanted
        ))
    };
    assert!(
        has_detail("partial schema v2 expand shape is missing column episodes.summary_spans_json"),
        "failures: {failures:?}"
    );
    assert!(
        has_detail("partial schema v2 expand shape is missing table memory_session_uses"),
        "failures: {failures:?}"
    );
}
#[test]
fn verify_schema_v2_expand_shape_reports_unset_required_backfill() {
    // After the expand shape is fully applied, any row whose required v2
    // backfill column is still NULL (here: events.source_attestation_json)
    // must be reported as an incomplete backfill.
    let pool = migrated_pool();
    crate::migrate_v2::apply_expand_backfill_skeleton(
        &pool,
        "2026-05-04T13:00:00Z".parse().unwrap(),
    )
    .expect("apply complete v2 expand shape");
    // Insert a row with source_attestation_json explicitly NULL to simulate
    // a row the backfill skipped.
    pool.execute(
        "INSERT INTO events (
            id, schema_version, observed_at, recorded_at, source_json, event_type,
            trace_id, session_id, domain_tags_json, payload_json, payload_hash,
            prev_event_hash, event_hash, source_attestation_json
        ) VALUES (
            'evt_s2_unset_backfill', 1, '2026-05-04T12:00:00Z',
            '2026-05-04T12:00:01Z', '{\"kind\":\"test\"}', 'test.event',
            NULL, NULL, '[]', '{}', 'payload-hash', NULL, 'event-hash', NULL
        );",
        [],
    )
    .expect("insert row with unset v2 backfill metadata");
    // Exactly one failure: the backfill-completeness invariant naming the
    // single unset row.
    let failures = verify_schema_v2_expand_shape(&pool).expect("verify v2 expand shape");
    assert_eq!(
        failures,
        vec![SchemaVersionFailure::IllegalIntermediateShape {
            invariant: "schema_v2_expand_backfill.complete",
            detail:
                "schema v2 expand backfill left 1 events.source_attestation_json rows unset"
                    .into(),
        }]
    );
}
#[test]
fn count_post_v2_rows_outside_boundary_is_zero_for_empty_store() {
    // A freshly migrated, empty store has no post-v2 rows at all.
    let pool = migrated_pool();
    let post_v2 =
        count_post_v2_rows_outside_boundary(&pool).expect("count post-v2 rows on empty store");
    assert_eq!(post_v2.total(), 0);
    assert_eq!(post_v2.events_post_v2, 0);
    assert_eq!(post_v2.traces_post_v2, 0);
    assert!(post_v2.is_empty());
}
#[test]
fn count_post_v2_rows_outside_boundary_is_zero_for_pre_v2_rows() {
    // Rows written at schema_version = 1 predate the v2 cutover and must not
    // be counted as post-v2, for either events or traces.
    let pool = migrated_pool();
    // Pre-v2 (schema_version = 1) events row.
    pool.execute(
        "INSERT INTO events (
            id, schema_version, observed_at, recorded_at, source_json, event_type,
            trace_id, session_id, domain_tags_json, payload_json, payload_hash,
            prev_event_hash, event_hash
        ) VALUES (
            'evt_pre_v2_one', 1, '2026-01-01T00:00:00Z', '2026-01-01T00:00:00Z',
            '{\"kind\":\"test\"}', 'test.event', NULL, NULL, '[]', '{}',
            'payload-hash', NULL, 'event-hash-one'
        );",
        [],
    )
    .expect("insert pre-v2 event row");
    // Pre-v2 (schema_version = 1) traces row.
    pool.execute(
        "INSERT INTO traces (id, schema_version, opened_at, closed_at, trace_type, status)
         VALUES ('trc_pre_v2_one', 1, '2026-01-01T00:00:00Z', NULL, 'fixture', 'open');",
        [],
    )
    .expect("insert pre-v2 trace row");
    let counts =
        count_post_v2_rows_outside_boundary(&pool).expect("count post-v2 rows on v1 fixture");
    assert!(counts.is_empty());
}
#[test]
fn count_post_v2_rows_outside_boundary_skips_the_schema_migration_boundary_row() {
    // A schema_version = 2 events row whose payload carries the
    // "schema_migration.v1_to_v2" boundary shape is written by the migration
    // itself and must be excluded from the post-v2 counts.
    let pool = migrated_pool();
    pool.execute(
        "INSERT INTO events (
            id, schema_version, observed_at, recorded_at, source_json, event_type,
            trace_id, session_id, domain_tags_json, payload_json, payload_hash,
            prev_event_hash, event_hash
        ) VALUES (
            'evt_boundary', 2, '2026-01-01T00:00:00Z', '2026-01-01T00:00:00Z',
            '{\"kind\":\"runtime\"}', 'system.note', NULL, NULL, '[]',
            '{\"kind\":\"schema_migration.v1_to_v2\",\"previous_v1_head_hash\":\"prev\",\"migration_script_digest\":\"blake3:digest\",\"fixture_verification_result_hash\":\"blake3:fixture\"}',
            'payload-hash', NULL, 'event-hash-boundary'
        );",
        [],
    )
    .expect("insert boundary-shape row");
    let counts = count_post_v2_rows_outside_boundary(&pool)
        .expect("count post-v2 rows after boundary insert");
    assert_eq!(counts.events_post_v2, 0);
    assert_eq!(counts.traces_post_v2, 0);
}
#[test]
fn count_post_v2_rows_outside_boundary_surfaces_fresh_v2_events_row() {
    // A schema_version = 2 events row whose payload is NOT the migration
    // boundary shape must be counted as a fresh post-v2 row.
    let pool = migrated_pool();
    pool.execute(
        "INSERT INTO events (
            id, schema_version, observed_at, recorded_at, source_json, event_type,
            trace_id, session_id, domain_tags_json, payload_json, payload_hash,
            prev_event_hash, event_hash
        ) VALUES (
            'evt_fresh_v2', 2, '2026-01-01T00:00:00Z', '2026-01-01T00:00:00Z',
            '{\"kind\":\"runtime\"}', 'tool.result', NULL, NULL, '[]',
            '{\"step\":1}',
            'payload-hash', NULL, 'event-hash-fresh-v2'
        );",
        [],
    )
    .expect("insert fresh-v2 events row");
    let post_v2 = count_post_v2_rows_outside_boundary(&pool)
        .expect("count post-v2 rows on fresh-v2 fixture");
    assert_eq!(post_v2.events_post_v2, 1);
    assert_eq!(post_v2.traces_post_v2, 0);
    assert_eq!(post_v2.total(), 1);
    assert!(!post_v2.is_empty());
}
#[test]
fn count_post_v2_rows_outside_boundary_surfaces_fresh_v2_traces_row() {
    // A schema_version = 2 traces row must be counted as a fresh post-v2 row
    // (the boundary-shape exemption only applies to events).
    let pool = migrated_pool();
    pool.execute(
        "INSERT INTO traces (id, schema_version, opened_at, closed_at, trace_type, status)
         VALUES ('trc_fresh_v2', 2, '2026-01-01T00:00:00Z', NULL, 'fixture', 'open');",
        [],
    )
    .expect("insert fresh-v2 trace row");
    let post_v2 = count_post_v2_rows_outside_boundary(&pool)
        .expect("count post-v2 rows on fresh-v2 trace fixture");
    assert_eq!(post_v2.events_post_v2, 0);
    assert_eq!(post_v2.traces_post_v2, 1);
    assert!(!post_v2.is_empty());
}
}