use crate::models::field_names;
use anyhow::{Context, Result};
use rusqlite::{Connection, params};
use std::path::PathBuf;
pub(crate) const SELECT_SCHEMA_VERSION_SQL: &str =
"SELECT COALESCE(MAX(version), 0) FROM schema_version";
const PRAGMA_TABLE_INFO_MEMORIES: &str = "PRAGMA table_info(memories)";
const TRACE_TARGET: &str = "ai_memory::storage::migrations";
pub(super) const SCHEMA: &str = r"
CREATE TABLE IF NOT EXISTS memories (
id TEXT PRIMARY KEY,
tier TEXT NOT NULL,
namespace TEXT NOT NULL DEFAULT 'global',
title TEXT NOT NULL,
content TEXT NOT NULL,
tags TEXT NOT NULL DEFAULT '[]',
priority INTEGER NOT NULL DEFAULT 5,
confidence REAL NOT NULL DEFAULT 1.0,
source TEXT NOT NULL DEFAULT 'api',
access_count INTEGER NOT NULL DEFAULT 0,
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL,
last_accessed_at TEXT,
expires_at TEXT,
metadata TEXT NOT NULL DEFAULT '{}',
-- v0.7.0 Task 1/8 (recursive learning, schema v29) — depth in the
-- substrate-native reflection recursion tree. `0` for caller-minted
-- memories (and any pre-v0.7.0 row); positive for synthesised
-- reflections. Mirrors `models::Memory::reflection_depth`.
reflection_depth INTEGER NOT NULL DEFAULT 0,
-- v0.7.0 L1-1 (typed MemoryKind, schema v30) — first-class kind
-- discriminator. `observation` for all caller-minted memories (and
-- any pre-v30 row); `reflection` for memories minted by
-- `memory_reflect` or the curator reflection pass.
-- Mirrors `models::MemoryKind`.
memory_kind TEXT NOT NULL DEFAULT 'observation',
-- v0.7.0 WT-1-A (schema v36) — substrate-level atomisation foundation.
-- `atomised_into` is NULL on legacy rows; positive integer on rows
-- that have been split into atomic peers (WT-1-B atomisation pass).
-- `atom_of` is NULL on non-atom rows; on atom rows it FK-points back
-- to the parent memory. Pure additive — no existing semantics
-- changes.
atomised_into INTEGER,
atom_of TEXT REFERENCES memories(id),
-- v0.7.0 QW-2 (schema v37) — Persona-as-artifact substrate primitive.
-- `entity_id` is NULL on non-Persona rows; on Persona rows it carries
-- the canonicalised entity descriptor the persona is about (e.g.
-- `user:fate`). `persona_version` is NULL on non-Persona rows; on
-- Persona rows it carries the monotonic per-(entity_id, namespace)
-- generation counter. Pure additive — non-Persona rows keep NULL
-- payloads with no backfill.
entity_id TEXT,
persona_version INTEGER,
-- v0.7.0 Form 4 (schema v38) — fact-provenance closeout. Citations
-- is a JSON-encoded array of `Citation` objects ({uri, accessed_at,
-- hash?, span?}) carrying first-class provenance pointers per
-- memory; legacy rows default to '[]'. `source_uri` is a first-class
-- URI-form pointer to the cited source body (distinct from the
-- existing `source` role-label column); valid schemes are `uri:`
-- (HTTP URL), `doc:` (substrate doc id), `file:` (filesystem path).
-- `source_span` is a JSON-encoded `{start, end}` byte-range into
-- the parent source body, populated by the WT-1-B atomisation
-- writer for each atom (atom-grain span fact-provenance). All
-- three columns are additive on legacy rows. See migration
-- `0032_v07_form4_provenance.sql` for the supporting index.
citations TEXT NOT NULL DEFAULT '[]',
source_uri TEXT,
source_span TEXT,
-- v0.7.0 Form 5 (schema v39, issue #758) — auto-confidence + shadow-mode +
-- calibration tooling closeout. `confidence_source` is a typed
-- discriminator naming the provenance of the `confidence` column value
-- (caller_provided | auto_derived | calibrated | decayed); legacy rows
-- default to 'caller_provided' via the SQL DEFAULT clause.
-- `confidence_signals` is a JSON snapshot of the ConfidenceSignals
-- struct emitted when the value was computed (NULL on legacy rows).
-- `confidence_decayed_at` is an RFC3339 timestamp of the last decay
-- computation (NULL on legacy rows and rows never touched by decay).
confidence_source TEXT NOT NULL DEFAULT 'caller_provided',
confidence_signals TEXT,
confidence_decayed_at TEXT,
-- v0.7.0 polish PERF-8 (schema v42, issue #781) — auto-persona
-- indexed entity-id column. Carries the canonical entity descriptor
-- a memory MENTIONS (extracted at write time from
-- `metadata.entity_id` or a `[entity:X]` title marker) so the
-- auto-persona matcher resolves with
-- `WHERE memory_kind = 'reflection' AND mentioned_entity_id = ?
-- AND namespace = ?` via the `idx_memories_mentioned_entity`
-- partial index instead of the previous full-table `content LIKE
-- '%X%'` scan. Deliberately distinct from the QW-2 `entity_id`
-- column above (which is reserved for Persona-row attribution):
-- PERF-8 reads the OPPOSITE direction (the entity an observation
-- / reflection mentions). Legacy rows default to NULL; the
-- migration ladder backfills from metadata+title at v42 apply
-- time.
mentioned_entity_id TEXT,
-- v0.7.0 (issue #228, schema v44) — E2E memory content encryption
-- at rest. When non-NULL, this BLOB carries the
-- `src/encryption::Envelope::to_bytes()` payload (X25519 ephemeral
-- pubkey + ChaCha20-Poly1305 nonce + AEAD-sealed ciphertext) for
-- the memory's plaintext `content`. The `content` column then
-- carries a placeholder marker rather than plaintext. NULL on
-- every legacy row and on every row written under the default
-- (encryption disabled) configuration. Gated on
-- `[encryption].at_rest = true` or
-- `AI_MEMORY_ENCRYPT_AT_REST=1`.
encrypted_envelope BLOB,
-- v0.7.0 Provenance Gap 1 (issue #884, schema v45) — optimistic-
-- concurrency counter. Every mutation through `storage::update`
-- bumps `version`. MCP `memory_update` accepts
-- `expected_version: Option<i64>` and HTTP `PUT /memories/:id`
-- honors `If-Match: <version>`; both surfaces return a typed
-- CONFLICT envelope when the stored value has drifted from the
-- caller's expected. Legacy rows land at `version = 1` via the
-- SQL DEFAULT clause; subsequent updates bump monotonically.
version BIGINT NOT NULL DEFAULT 1
);
CREATE INDEX IF NOT EXISTS idx_memories_tier ON memories(tier);
CREATE INDEX IF NOT EXISTS idx_memories_namespace ON memories(namespace);
CREATE INDEX IF NOT EXISTS idx_memories_priority ON memories(priority DESC);
CREATE INDEX IF NOT EXISTS idx_memories_expires ON memories(expires_at);
-- #1476 — federation-catchup range scan for `memories_updated_since`
-- (`WHERE updated_at > ?1 ORDER BY updated_at ASC LIMIT`). Also added by
-- migration v55 (file 0046) for upgrading DBs; carried inline here so a
-- fresh bootstrap that applies SCHEMA has it even before the ladder runs.
CREATE INDEX IF NOT EXISTS idx_memories_updated_at ON memories(updated_at);
-- #1579 (A2) — composite list-ordering indexes for the sargable
-- `storage::list` shapes (`ORDER BY priority DESC, updated_at DESC
-- LIMIT ? OFFSET ?`, optionally namespace-filtered). Also added by
-- migration v56 (file 0047) for upgrading DBs; carried inline here so
-- a fresh bootstrap that applies SCHEMA has them even before the
-- ladder runs. The archived_memories sibling index from the same
-- migration is NOT inline because that table is created by the v4
-- ladder arm, not this bootstrap.
CREATE INDEX IF NOT EXISTS idx_memories_list_order ON memories(priority DESC, updated_at DESC);
CREATE INDEX IF NOT EXISTS idx_memories_ns_list_order ON memories(namespace, priority DESC, updated_at DESC);
CREATE UNIQUE INDEX IF NOT EXISTS idx_memories_title_ns ON memories(title, namespace);
-- Partial indexes referencing v36+ columns (`atom_of`, `atomised_into`,
-- `source_uri`, `confidence_source`, `mentioned_entity_id`) and the v41
-- compound shadow-observations index are NOT in this bootstrap SCHEMA
-- (issue #797). They live exclusively in their migration .sql files
-- (`migrations/sqlite/0030_v07_atomisation.sql`,
-- `0032_v07_form4_provenance.sql`,
-- `0033_v07_form5_confidence_calibration.sql`,
-- `0035_v07_shadow_retention.sql`,
-- `0036_v07_auto_persona_entity_id.sql`) and run from the matching
-- `if version < N` arms of `migrate()` AFTER the ALTER TABLE that adds
-- the column.
--
-- `db::open` applies SCHEMA before `migrate`, so any `CREATE INDEX` here
-- that references a v36+ column crashes on a legacy DB whose pre-v36
-- `memories` row leaves the `CREATE TABLE IF NOT EXISTS` as a no-op
-- (the new columns never land). The maintainers caught this for the v37
-- `entity_id` index from the start; v36/v38/v39/v41/v42 were brought
-- under the same discipline in #797.
--
-- Fresh installs are unaffected: the `CREATE TABLE` above lands every
-- v42-era column, then every `if version < N` arm runs its .sql file
-- (idempotent `CREATE INDEX IF NOT EXISTS`) to attach the partial
-- indexes.
CREATE TABLE IF NOT EXISTS confidence_shadow_observations (
id INTEGER PRIMARY KEY AUTOINCREMENT,
memory_id TEXT NOT NULL,
namespace TEXT NOT NULL,
source TEXT NOT NULL DEFAULT 'unknown',
caller_confidence REAL NOT NULL,
derived_confidence REAL NOT NULL,
signals TEXT NOT NULL,
recall_outcome TEXT,
observed_at TEXT NOT NULL,
FOREIGN KEY(memory_id) REFERENCES memories(id) ON DELETE CASCADE
);
CREATE INDEX IF NOT EXISTS idx_shadow_obs_namespace
ON confidence_shadow_observations(namespace);
CREATE INDEX IF NOT EXISTS idx_shadow_obs_observed_at
ON confidence_shadow_observations(observed_at);
CREATE INDEX IF NOT EXISTS idx_shadow_obs_memory
ON confidence_shadow_observations(memory_id);
-- `idx_shadow_obs_namespace_source_observed` references the v41
-- `confidence_shadow_observations.source` column and lives in
-- `migrations/sqlite/0035_v07_shadow_retention.sql` (see comment above).
CREATE TABLE IF NOT EXISTS memory_links (
source_id TEXT NOT NULL REFERENCES memories(id) ON DELETE CASCADE,
target_id TEXT NOT NULL REFERENCES memories(id) ON DELETE CASCADE,
relation TEXT NOT NULL DEFAULT 'related_to',
created_at TEXT NOT NULL,
-- v15 temporal trio (added historically via ALTER); included in the
-- bootstrap SCHEMA so test fixtures that stamp `version >= v15`
-- match real-DB shape post-migration ladder.
valid_from TEXT,
valid_until TEXT,
observed_by TEXT,
-- v17-era signature column (Ed25519 attestation, added historically
-- via ALTER).
signature BLOB,
-- v23 attest_level column (added historically via ALTER).
attest_level TEXT,
PRIMARY KEY (source_id, target_id, relation),
-- v33 (v0.7.0 v0.7.1-fold) — SQL-side CHECK constraint promoting the
-- v23 RAISE-trigger validation to a column-level invariant. Closed
-- taxonomy mirrors `crate::validate::VALID_RELATIONS`. v36 (WT-1-A)
-- extended the closed set with `derives_from` for atomisation
-- provenance edges (atom -> parent).
CHECK (relation IN ('related_to', 'supersedes', 'contradicts', 'derived_from', 'reflects_on', 'derives_from'))
);
CREATE VIRTUAL TABLE IF NOT EXISTS memories_fts USING fts5(
title,
content,
tags,
content=memories,
content_rowid=rowid
);
CREATE TRIGGER IF NOT EXISTS memories_ai AFTER INSERT ON memories BEGIN
INSERT INTO memories_fts(rowid, title, content, tags)
VALUES (new.rowid, new.title, new.content, new.tags);
END;
CREATE TRIGGER IF NOT EXISTS memories_ad AFTER DELETE ON memories BEGIN
INSERT INTO memories_fts(memories_fts, rowid, title, content, tags)
VALUES ('delete', old.rowid, old.title, old.content, old.tags);
END;
-- v0.7.0 R5.F5.2 (#1418) — column-scoped to (title, content, tags)
-- so the hot-path UPDATEs that touch `embedding` / `access_count` /
-- `last_accessed_at` / `confidence_decayed_at` / `version` skip the
-- FTS5 sync entirely. `apply_migrations` at the v53 arm performs the
-- swap (DROP + recreate) on legacy DBs that still carry the
-- un-scoped trigger from earlier `SCHEMA` boots.
CREATE TRIGGER IF NOT EXISTS memories_au
AFTER UPDATE OF title, content, tags ON memories BEGIN
INSERT INTO memories_fts(memories_fts, rowid, title, content, tags)
VALUES ('delete', old.rowid, old.title, old.content, old.tags);
INSERT INTO memories_fts(rowid, title, content, tags)
VALUES (new.rowid, new.title, new.content, new.tags);
END;
CREATE TABLE IF NOT EXISTS schema_version (
version INTEGER NOT NULL
);
-- v0.6.4-009 — capability-expansion audit log (NHI guardrails phase 1).
-- Mirrors migrations/sqlite/0014_v064_audit_log.sql so a fresh DB
-- bootstrap that bypasses the migration ladder still ends up with the
-- table present.
CREATE TABLE IF NOT EXISTS audit_log (
id TEXT PRIMARY KEY,
agent_id TEXT,
event_type TEXT NOT NULL,
requested_family TEXT,
granted INTEGER NOT NULL,
attestation_tier TEXT,
timestamp TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_audit_log_agent_id
ON audit_log (agent_id);
CREATE INDEX IF NOT EXISTS idx_audit_log_timestamp
ON audit_log (timestamp);
CREATE INDEX IF NOT EXISTS idx_audit_log_event_type
ON audit_log (event_type);
-- v40 (Cluster-C SEC-3, issue #767) — deferred-audit drainer DLQ.
-- Mirrors `migrations/sqlite/0034_v07_signed_events_dlq.sql` so a
-- fresh DB bootstrap that bypasses the migration ladder still ends
-- up with the table present. See the migration file for the design
-- rationale (failure-split between race-requeue and DLQ-land).
CREATE TABLE IF NOT EXISTS signed_events_dlq (
dlq_id INTEGER PRIMARY KEY AUTOINCREMENT,
id TEXT NOT NULL,
agent_id TEXT NOT NULL,
event_type TEXT NOT NULL,
payload_hash BLOB NOT NULL,
signature BLOB,
attest_level TEXT NOT NULL DEFAULT 'unsigned',
timestamp TEXT NOT NULL,
failure_reason TEXT NOT NULL,
failed_at TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_signed_events_dlq_failed_at
ON signed_events_dlq(failed_at);
CREATE INDEX IF NOT EXISTS idx_signed_events_dlq_agent
ON signed_events_dlq(agent_id);
-- v48 (Track D #933) — federation push DLQ. Mirrors
-- `migrations/sqlite/0041_v07_federation_push_dlq.sql` so a fresh DB
-- bootstrap that bypasses the migration ladder still ends up with the
-- table present. See the migration file for the full design
-- rationale (every per-peer fanout failure inside
-- `broadcast_store_quorum` lands a row; the replay worker re-attempts
-- `post_once` and stamps `replayed_at` on Ack).
CREATE TABLE IF NOT EXISTS federation_push_dlq (
id INTEGER PRIMARY KEY AUTOINCREMENT,
memory_id TEXT NOT NULL,
peer_id TEXT NOT NULL,
payload_json TEXT NOT NULL,
attempt_count INTEGER NOT NULL DEFAULT 1,
last_error TEXT NOT NULL,
failed_at TEXT NOT NULL,
replayed_at TEXT NULL
);
CREATE INDEX IF NOT EXISTS idx_federation_push_dlq_pending_failed_at
ON federation_push_dlq(failed_at)
WHERE replayed_at IS NULL;
CREATE INDEX IF NOT EXISTS idx_federation_push_dlq_peer_pending
ON federation_push_dlq(peer_id)
WHERE replayed_at IS NULL;
CREATE UNIQUE INDEX IF NOT EXISTS idx_federation_push_dlq_pending_uniq
ON federation_push_dlq(memory_id, peer_id)
WHERE replayed_at IS NULL;
";
const CURRENT_SCHEMA_VERSION: i64 = 57;
const PRE_MIGRATION_BACKUP_INFIX: &str = "pre-migration";
const PRE_MIGRATION_BACKUP_EXT: &str = "bak";
#[must_use]
pub fn pre_migration_backup_infix_for_tests() -> &'static str {
PRE_MIGRATION_BACKUP_INFIX
}
fn database_main_file_path(conn: &Connection) -> Option<PathBuf> {
conn.query_row(
"SELECT file FROM pragma_database_list WHERE name = 'main'",
[],
|r| r.get::<_, String>(0),
)
.ok()
.filter(|file| !file.is_empty())
.map(PathBuf::from)
}
fn snapshot_before_migration(
conn: &Connection,
from_version: i64,
to_version: i64,
) -> Result<Option<PathBuf>> {
let Some(db_path) = database_main_file_path(conn) else {
return Ok(None);
};
let file_name = db_path
.file_name()
.map(|n| n.to_string_lossy().into_owned())
.unwrap_or_default();
let parent = db_path.parent().map(PathBuf::from).unwrap_or_default();
let token = chrono::Utc::now().timestamp_nanos_opt().unwrap_or_default();
let snapshot_name = format!(
"{file_name}.{PRE_MIGRATION_BACKUP_INFIX}-v{from_version}-to-v{to_version}-{token}.{PRE_MIGRATION_BACKUP_EXT}"
);
let snapshot_path = parent.join(snapshot_name);
let target = snapshot_path.to_string_lossy().replace('\'', "''");
conn.execute(&format!("VACUUM INTO '{target}'"), [])
.with_context(|| {
format!(
"pre-migration snapshot (v{from_version}→v{to_version}) failed; \
refusing to mutate schema without a recoverable backup"
)
})?;
Ok(Some(snapshot_path))
}
#[must_use]
pub const fn current_schema_version_for_tests() -> i64 {
CURRENT_SCHEMA_VERSION
}
#[must_use]
pub const fn current_schema_version() -> i64 {
CURRENT_SCHEMA_VERSION
}
const MIGRATION_V15_SQLITE: &str =
include_str!("../../migrations/sqlite/0010_v063_hierarchy_kg.sql");
const MIGRATION_V17_SQLITE: &str =
include_str!("../../migrations/sqlite/0012_governance_inherit.sql");
const MIGRATION_V18_SQLITE: &str =
include_str!("../../migrations/sqlite/0011_v0631_data_integrity.sql");
const MIGRATION_V19_SQLITE: &str =
include_str!("../../migrations/sqlite/0013_webhook_event_types.sql");
const MIGRATION_V20_SQLITE: &str = include_str!("../../migrations/sqlite/0014_v064_audit_log.sql");
const MIGRATION_V21_SQLITE: &str =
include_str!("../../migrations/sqlite/0015_v07_pending_action_timeouts.sql");
const MIGRATION_V22_SQLITE: &str = include_str!("../../migrations/sqlite/0016_v07_transcripts.sql");
const MIGRATION_V23_SQLITE: &str =
include_str!("../../migrations/sqlite/0017_v07_link_attest_level.sql");
const MIGRATION_V24_SQLITE: &str =
include_str!("../../migrations/sqlite/0018_v07_transcript_links.sql");
const MIGRATION_V25_SQLITE: &str =
include_str!("../../migrations/sqlite/0019_v07_transcript_lifecycle.sql");
const MIGRATION_V26_SQLITE: &str =
include_str!("../../migrations/sqlite/0020_v07_signed_events.sql");
const MIGRATION_V27_SQLITE: &str =
include_str!("../../migrations/sqlite/0021_v07_a2a_correlation.sql");
const MIGRATION_V28_SQLITE: &str =
include_str!("../../migrations/sqlite/0022_v07_agent_quotas.sql");
const MIGRATION_V30_SQLITE: &str =
include_str!("../../migrations/sqlite/0024_v07_governance_rules.sql");
const MIGRATION_V31_SQLITE: &str = include_str!("../../migrations/sqlite/0025_v07_memory_kind.sql");
const MIGRATION_V32_SQLITE: &str =
include_str!("../../migrations/sqlite/0026_v07_agent_skills.sql");
const MIGRATION_V33_SQLITE: &str =
include_str!("../../migrations/sqlite/0027_v07_memory_links_relation_check.sql");
const MIGRATION_V34_SQLITE: &str =
include_str!("../../migrations/sqlite/0028_v07_signed_events_chain.sql");
const MIGRATION_V35_SQLITE: &str =
include_str!("../../migrations/sqlite/0029_v07_offloaded_blobs.sql");
const MIGRATION_V36_SQLITE: &str = include_str!("../../migrations/sqlite/0030_v07_atomisation.sql");
const MIGRATION_V36_REBUILD_LINKS_SQL: &str = r"
-- WT-1-A — full-table-rebuild adding 'derives_from' to the
-- memory_links.relation CHECK clause. Identical to the v33 rebuild in
-- 0027 except for the CHECK clause; replay-safe because the new table
-- is named memory_links_v36 only for the duration of the rebuild.
CREATE TABLE memory_links_v36 (
source_id TEXT NOT NULL REFERENCES memories(id) ON DELETE CASCADE,
target_id TEXT NOT NULL REFERENCES memories(id) ON DELETE CASCADE,
relation TEXT NOT NULL DEFAULT 'related_to',
created_at TEXT NOT NULL,
valid_from TEXT,
valid_until TEXT,
observed_by TEXT,
signature BLOB,
attest_level TEXT,
PRIMARY KEY (source_id, target_id, relation),
CHECK (relation IN ('related_to', 'supersedes', 'contradicts',
'derived_from', 'reflects_on', 'derives_from'))
);
INSERT INTO memory_links_v36 (
source_id, target_id, relation, created_at,
valid_from, valid_until, observed_by, signature, attest_level
)
SELECT
source_id, target_id, relation, created_at,
valid_from, valid_until, observed_by, signature, attest_level
FROM memory_links;
DROP TRIGGER IF EXISTS memory_links_ck_attest_level_ins;
DROP TRIGGER IF EXISTS memory_links_ck_attest_level_upd;
DROP INDEX IF EXISTS idx_links_temporal_src;
DROP INDEX IF EXISTS idx_links_temporal_tgt;
DROP INDEX IF EXISTS idx_links_relation;
DROP INDEX IF EXISTS idx_memory_links_attest_level;
DROP TABLE memory_links;
ALTER TABLE memory_links_v36 RENAME TO memory_links;
CREATE INDEX IF NOT EXISTS idx_links_temporal_src
ON memory_links (source_id, valid_from, valid_until);
CREATE INDEX IF NOT EXISTS idx_links_temporal_tgt
ON memory_links (target_id, valid_from, valid_until);
CREATE INDEX IF NOT EXISTS idx_links_relation
ON memory_links (relation, valid_from);
CREATE INDEX IF NOT EXISTS idx_memory_links_attest_level
ON memory_links (attest_level, created_at);
CREATE TRIGGER IF NOT EXISTS memory_links_ck_attest_level_ins
BEFORE INSERT ON memory_links
FOR EACH ROW
WHEN NEW.attest_level IS NOT NULL
AND NEW.attest_level NOT IN ('unsigned', 'self_signed', 'peer_attested')
BEGIN
SELECT RAISE(ABORT, 'CHECK constraint failed: memory_links.attest_level must be one of unsigned/self_signed/peer_attested (or NULL for legacy rows)');
END;
CREATE TRIGGER IF NOT EXISTS memory_links_ck_attest_level_upd
BEFORE UPDATE OF attest_level ON memory_links
FOR EACH ROW
WHEN NEW.attest_level IS NOT NULL
AND NEW.attest_level NOT IN ('unsigned', 'self_signed', 'peer_attested')
BEGIN
SELECT RAISE(ABORT, 'CHECK constraint failed: memory_links.attest_level must be one of unsigned/self_signed/peer_attested (or NULL for legacy rows)');
END;
";
const MIGRATION_V37_SQLITE: &str = include_str!("../../migrations/sqlite/0031_v07_persona.sql");
const MIGRATION_V38_SQLITE: &str =
include_str!("../../migrations/sqlite/0032_v07_form4_provenance.sql");
const MIGRATION_V39_SQLITE: &str =
include_str!("../../migrations/sqlite/0033_v07_form5_confidence_calibration.sql");
const MIGRATION_V40_SQLITE: &str =
include_str!("../../migrations/sqlite/0034_v07_signed_events_dlq.sql");
const MIGRATION_V41_SQLITE: &str =
include_str!("../../migrations/sqlite/0035_v07_shadow_retention.sql");
const MIGRATION_V42_SQLITE: &str =
include_str!("../../migrations/sqlite/0036_v07_auto_persona_entity_id.sql");
const MIGRATION_V43_SQLITE: &str =
include_str!("../../migrations/sqlite/0037_v07_persona_signing_atomicity.sql");
const MIGRATION_V45_SQLITE: &str =
include_str!("../../migrations/sqlite/0039_v07_provenance_version.sql");
const MIGRATION_V46_SQLITE: &str =
include_str!("../../migrations/sqlite/0040_v07_source_uri_backfill.sql");
const MIGRATION_V47_SQLITE: &str =
include_str!("../../migrations/sqlite/0038_v07_recall_observations.sql");
const MIGRATION_V48_SQLITE: &str =
include_str!("../../migrations/sqlite/0041_v07_federation_push_dlq.sql");
const MIGRATION_V50_SQLITE: &str =
include_str!("../../migrations/sqlite/0042_v50_per_namespace_quota.sql");
const MIGRATION_V51_SQLITE: &str =
include_str!("../../migrations/sqlite/0043_v51_federation_nonce_cache.sql");
const MIGRATION_V52_SQLITE: &str =
include_str!("../../migrations/sqlite/0044_v52_transcript_line_dedup.sql");
const MIGRATION_V53_SQLITE: &str =
include_str!("../../migrations/sqlite/0045_v53_memories_au_trigger_columns.sql");
const MIGRATION_V55_SQLITE: &str =
include_str!("../../migrations/sqlite/0046_v55_idx_memories_updated_at.sql");
const MIGRATION_V56_SQLITE: &str =
include_str!("../../migrations/sqlite/0047_v56_list_composite_indexes.sql");
#[allow(clippy::too_many_lines)]
pub(crate) fn migrate(conn: &Connection) -> Result<()> {
let version: i64 = conn
.query_row(SELECT_SCHEMA_VERSION_SQL, [], |r| r.get(0))
.unwrap_or(0);
if version >= CURRENT_SCHEMA_VERSION {
return Ok(());
}
if version > 0 {
if let Some(snapshot) = snapshot_before_migration(conn, version, CURRENT_SCHEMA_VERSION)? {
tracing::info!(
from_version = version,
to_version = CURRENT_SCHEMA_VERSION,
snapshot = %snapshot.display(),
"pre-migration database snapshot written"
);
}
}
conn.execute_batch("BEGIN EXCLUSIVE")?;
let result = (|| -> Result<()> {
if version < 2 {
let mut has_confidence = false;
let mut has_source = false;
let mut stmt = conn.prepare(PRAGMA_TABLE_INFO_MEMORIES)?;
let cols = stmt.query_map([], |row| row.get::<_, String>(1))?;
for col in cols {
match col?.as_str() {
field_names::CONFIDENCE => has_confidence = true,
"source" => has_source = true,
_ => {}
}
}
drop(stmt);
if !has_confidence {
conn.execute(
"ALTER TABLE memories ADD COLUMN confidence REAL NOT NULL DEFAULT 1.0",
[],
)?;
}
if !has_source {
conn.execute(
"ALTER TABLE memories ADD COLUMN source TEXT NOT NULL DEFAULT 'api'",
[],
)?;
}
}
if version < 3 {
let mut has_embedding = false;
let mut stmt = conn.prepare(PRAGMA_TABLE_INFO_MEMORIES)?;
let cols = stmt.query_map([], |row| row.get::<_, String>(1))?;
for col in cols {
if col?.as_str() == "embedding" {
has_embedding = true;
}
}
drop(stmt);
if !has_embedding {
conn.execute("ALTER TABLE memories ADD COLUMN embedding BLOB", [])?;
}
}
if version < 4 {
conn.execute_batch(
"CREATE TABLE IF NOT EXISTS archived_memories (
id TEXT PRIMARY KEY,
tier TEXT NOT NULL,
namespace TEXT NOT NULL DEFAULT 'global',
title TEXT NOT NULL,
content TEXT NOT NULL,
tags TEXT NOT NULL DEFAULT '[]',
priority INTEGER NOT NULL DEFAULT 5,
confidence REAL NOT NULL DEFAULT 1.0,
source TEXT NOT NULL DEFAULT 'api',
access_count INTEGER NOT NULL DEFAULT 0,
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL,
last_accessed_at TEXT,
expires_at TEXT,
archived_at TEXT NOT NULL,
archive_reason TEXT NOT NULL DEFAULT 'ttl_expired',
metadata TEXT NOT NULL DEFAULT '{}'
);
CREATE INDEX IF NOT EXISTS idx_archived_namespace ON archived_memories(namespace);
CREATE INDEX IF NOT EXISTS idx_archived_at ON archived_memories(archived_at);",
)?;
}
if version < 5 {
conn.execute_batch(
"CREATE TABLE IF NOT EXISTS namespace_meta (
namespace TEXT PRIMARY KEY,
standard_id TEXT,
updated_at TEXT NOT NULL
);",
)?;
}
if version < 6 {
let has_parent: bool = conn
.prepare("SELECT parent_namespace FROM namespace_meta LIMIT 0")
.is_ok();
if !has_parent {
conn.execute_batch("ALTER TABLE namespace_meta ADD COLUMN parent_namespace TEXT;")?;
}
}
if version < 7 {
let has_metadata: bool = conn
.prepare("SELECT metadata FROM memories LIMIT 0")
.is_ok();
if !has_metadata {
conn.execute(
"ALTER TABLE memories ADD COLUMN metadata TEXT NOT NULL DEFAULT '{}'",
[],
)?;
}
let has_archive_metadata: bool = conn
.prepare("SELECT metadata FROM archived_memories LIMIT 0")
.is_ok();
if !has_archive_metadata {
conn.execute(
"ALTER TABLE archived_memories ADD COLUMN metadata TEXT NOT NULL DEFAULT '{}'",
[],
)?;
}
}
if version < 8 {
conn.execute_batch(
"CREATE TABLE IF NOT EXISTS pending_actions (
id TEXT PRIMARY KEY,
action_type TEXT NOT NULL,
memory_id TEXT,
namespace TEXT NOT NULL,
payload TEXT NOT NULL DEFAULT '{}',
requested_by TEXT NOT NULL,
requested_at TEXT NOT NULL,
status TEXT NOT NULL DEFAULT 'pending',
decided_by TEXT,
decided_at TEXT
);
CREATE INDEX IF NOT EXISTS idx_pending_status ON pending_actions(status);
CREATE INDEX IF NOT EXISTS idx_pending_namespace ON pending_actions(namespace);",
)?;
}
if version < 9 {
let has_approvals: bool = conn
.prepare("SELECT approvals FROM pending_actions LIMIT 0")
.is_ok();
if !has_approvals {
conn.execute(
"ALTER TABLE pending_actions ADD COLUMN approvals TEXT NOT NULL DEFAULT '[]'",
[],
)?;
}
}
if version < 10 {
let has_scope_idx: bool = conn
.prepare("SELECT scope_idx FROM memories LIMIT 0")
.is_ok();
if !has_scope_idx {
conn.execute(
"ALTER TABLE memories ADD COLUMN scope_idx TEXT \
GENERATED ALWAYS AS (\
CASE WHEN json_valid(metadata) \
THEN COALESCE(json_extract(metadata, '$.scope'), 'private') \
ELSE 'private' END\
) VIRTUAL",
[],
)?;
}
conn.execute(
"CREATE INDEX IF NOT EXISTS idx_memories_scope_idx ON memories(scope_idx)",
[],
)?;
}
if version < 11 {
conn.execute_batch(
"CREATE TABLE IF NOT EXISTS sync_state (
agent_id TEXT NOT NULL,
peer_id TEXT NOT NULL,
last_seen_at TEXT NOT NULL,
last_pulled_at TEXT NOT NULL,
PRIMARY KEY (agent_id, peer_id)
);
CREATE INDEX IF NOT EXISTS idx_sync_state_agent ON sync_state(agent_id);",
)?;
}
if version < 12 {
let has_last_pushed: bool = conn
.prepare("SELECT last_pushed_at FROM sync_state LIMIT 0")
.is_ok();
if !has_last_pushed {
conn.execute("ALTER TABLE sync_state ADD COLUMN last_pushed_at TEXT", [])?;
}
}
if version < 13 {
conn.execute(
"CREATE TABLE IF NOT EXISTS subscriptions (
id TEXT PRIMARY KEY,
url TEXT NOT NULL,
events TEXT NOT NULL DEFAULT '*',
secret_hash TEXT,
namespace_filter TEXT,
agent_filter TEXT,
created_by TEXT,
created_at TEXT NOT NULL,
last_dispatched_at TEXT,
dispatch_count INTEGER NOT NULL DEFAULT 0,
failure_count INTEGER NOT NULL DEFAULT 0
)",
[],
)?;
conn.execute(
"CREATE INDEX IF NOT EXISTS idx_subscriptions_url ON subscriptions(url)",
[],
)?;
}
if version < 14 {
let has_agent_id_idx: bool = conn
.prepare("SELECT agent_id_idx FROM memories LIMIT 0")
.is_ok();
if !has_agent_id_idx {
conn.execute(
"ALTER TABLE memories ADD COLUMN agent_id_idx TEXT \
GENERATED ALWAYS AS (\
CASE WHEN json_valid(metadata) \
THEN json_extract(metadata, '$.agent_id') \
ELSE NULL END\
) VIRTUAL",
[],
)?;
}
conn.execute(
"CREATE INDEX IF NOT EXISTS idx_memories_agent_id ON memories(agent_id_idx)",
[],
)?;
conn.execute(
"CREATE INDEX IF NOT EXISTS idx_memories_created_at ON memories(created_at)",
[],
)?;
}
if version < 15 {
let has_valid_from = conn
.prepare("SELECT valid_from FROM memory_links LIMIT 0")
.is_ok();
if !has_valid_from {
conn.execute("ALTER TABLE memory_links ADD COLUMN valid_from TEXT", [])?;
}
let has_valid_until = conn
.prepare("SELECT valid_until FROM memory_links LIMIT 0")
.is_ok();
if !has_valid_until {
conn.execute("ALTER TABLE memory_links ADD COLUMN valid_until TEXT", [])?;
}
let has_observed_by = conn
.prepare("SELECT observed_by FROM memory_links LIMIT 0")
.is_ok();
if !has_observed_by {
conn.execute("ALTER TABLE memory_links ADD COLUMN observed_by TEXT", [])?;
}
let has_signature = conn
.prepare("SELECT signature FROM memory_links LIMIT 0")
.is_ok();
if !has_signature {
conn.execute("ALTER TABLE memory_links ADD COLUMN signature BLOB", [])?;
}
conn.execute_batch(MIGRATION_V15_SQLITE)?;
}
if version < 16 {
}
if version < 17 {
conn.execute_batch(MIGRATION_V17_SQLITE)?;
}
if version < 18 {
let has_embedding_dim = conn
.prepare("SELECT embedding_dim FROM memories LIMIT 0")
.is_ok();
if !has_embedding_dim {
conn.execute("ALTER TABLE memories ADD COLUMN embedding_dim INTEGER", [])?;
}
let has_archive_embedding = conn
.prepare("SELECT embedding FROM archived_memories LIMIT 0")
.is_ok();
if !has_archive_embedding {
conn.execute(
"ALTER TABLE archived_memories ADD COLUMN embedding BLOB",
[],
)?;
}
let has_archive_embedding_dim = conn
.prepare("SELECT embedding_dim FROM archived_memories LIMIT 0")
.is_ok();
if !has_archive_embedding_dim {
conn.execute(
"ALTER TABLE archived_memories ADD COLUMN embedding_dim INTEGER",
[],
)?;
}
let has_original_tier = conn
.prepare("SELECT original_tier FROM archived_memories LIMIT 0")
.is_ok();
if !has_original_tier {
conn.execute(
"ALTER TABLE archived_memories ADD COLUMN original_tier TEXT",
[],
)?;
}
let has_original_expires_at = conn
.prepare("SELECT original_expires_at FROM archived_memories LIMIT 0")
.is_ok();
if !has_original_expires_at {
conn.execute(
"ALTER TABLE archived_memories ADD COLUMN original_expires_at TEXT",
[],
)?;
}
conn.execute_batch(MIGRATION_V18_SQLITE)?;
}
if version < 19 {
let has_event_types = conn
.prepare("SELECT event_types FROM subscriptions LIMIT 0")
.is_ok();
if !has_event_types {
conn.execute("ALTER TABLE subscriptions ADD COLUMN event_types TEXT", [])?;
}
conn.execute_batch(MIGRATION_V19_SQLITE)?;
}
if version < 20 {
conn.execute_batch(MIGRATION_V20_SQLITE)?;
}
if version < 21 {
let has_timeout: bool = conn
.prepare("SELECT default_timeout_seconds FROM pending_actions LIMIT 0")
.is_ok();
if !has_timeout {
conn.execute(
"ALTER TABLE pending_actions ADD COLUMN default_timeout_seconds INTEGER",
[],
)?;
}
let has_expired_at: bool = conn
.prepare("SELECT expired_at FROM pending_actions LIMIT 0")
.is_ok();
if !has_expired_at {
conn.execute("ALTER TABLE pending_actions ADD COLUMN expired_at TEXT", [])?;
}
conn.execute_batch(MIGRATION_V21_SQLITE)?;
}
if version < 22 {
conn.execute_batch(MIGRATION_V22_SQLITE)?;
}
if version < 23 {
let has_attest_level = conn
.prepare("SELECT attest_level FROM memory_links LIMIT 0")
.is_ok();
if !has_attest_level {
conn.execute("ALTER TABLE memory_links ADD COLUMN attest_level TEXT", [])?;
}
conn.execute_batch(MIGRATION_V23_SQLITE)?;
}
if version < 24 {
conn.execute_batch(MIGRATION_V24_SQLITE)?;
}
if version < 25 {
let has_archived_at = conn
.prepare("SELECT archived_at FROM memory_transcripts LIMIT 0")
.is_ok();
if !has_archived_at {
conn.execute(
"ALTER TABLE memory_transcripts ADD COLUMN archived_at TEXT",
[],
)?;
}
conn.execute_batch(MIGRATION_V25_SQLITE)?;
}
if version < 26 {
conn.execute_batch(MIGRATION_V26_SQLITE)?;
}
if version < 27 {
let table_exists: bool = conn
.query_row(
"SELECT EXISTS(SELECT 1 FROM sqlite_master \
WHERE type = 'table' AND name = 'subscription_events')",
[],
|r| r.get(0),
)
.unwrap_or(false);
if table_exists {
let has_correlation = conn
.prepare("SELECT correlation_id FROM subscription_events LIMIT 0")
.is_ok();
if !has_correlation {
conn.execute(
"ALTER TABLE subscription_events ADD COLUMN correlation_id TEXT NOT NULL DEFAULT ''",
[],
)?;
}
}
conn.execute_batch(MIGRATION_V27_SQLITE)?;
}
if version < 28 {
conn.execute_batch(MIGRATION_V28_SQLITE)?;
}
if version < 29 {
let has_reflection_depth = conn
.prepare("SELECT reflection_depth FROM memories LIMIT 0")
.is_ok();
if !has_reflection_depth {
conn.execute(
"ALTER TABLE memories ADD COLUMN reflection_depth INTEGER NOT NULL DEFAULT 0",
[],
)?;
}
}
if version < 30 {
conn.execute_batch(MIGRATION_V30_SQLITE)?;
}
if version < 31 {
let has_memory_kind = conn
.prepare("SELECT memory_kind FROM memories LIMIT 0")
.is_ok();
if !has_memory_kind {
conn.execute(
"ALTER TABLE memories ADD COLUMN memory_kind TEXT NOT NULL DEFAULT 'observation'",
[],
)?;
}
conn.execute_batch(MIGRATION_V31_SQLITE)?;
}
if version < 32 {
conn.execute_batch(MIGRATION_V32_SQLITE)?;
}
if version < 33 {
conn.execute_batch(MIGRATION_V33_SQLITE)?;
}
if version < 34 {
let signed_events_exists: bool = conn
.query_row(
"SELECT EXISTS(SELECT 1 FROM sqlite_master \
WHERE type = 'table' AND name = 'signed_events')",
[],
|r| r.get(0),
)
.unwrap_or(false);
if signed_events_exists {
let has_prev_hash = conn
.prepare("SELECT prev_hash FROM signed_events LIMIT 0")
.is_ok();
if !has_prev_hash {
conn.execute("ALTER TABLE signed_events ADD COLUMN prev_hash BLOB", [])?;
}
let has_sequence = conn
.prepare("SELECT sequence FROM signed_events LIMIT 0")
.is_ok();
if !has_sequence {
conn.execute("ALTER TABLE signed_events ADD COLUMN sequence INTEGER", [])?;
}
migrate_v34_backfill_chain(conn)?;
conn.execute_batch(MIGRATION_V34_SQLITE)?;
}
}
if version < 35 {
conn.execute_batch(MIGRATION_V35_SQLITE)?;
}
if version < 36 {
let mut has_atomised_into = false;
let mut has_atom_of = false;
let mut stmt = conn.prepare(PRAGMA_TABLE_INFO_MEMORIES)?;
let cols = stmt.query_map([], |row| row.get::<_, String>(1))?;
for col in cols {
match col?.as_str() {
"atomised_into" => has_atomised_into = true,
"atom_of" => has_atom_of = true,
_ => {}
}
}
drop(stmt);
if !has_atomised_into {
conn.execute("ALTER TABLE memories ADD COLUMN atomised_into INTEGER", [])?;
}
if !has_atom_of {
conn.execute(
"ALTER TABLE memories ADD COLUMN atom_of TEXT REFERENCES memories(id)",
[],
)?;
}
let memory_links_exists: bool = conn
.query_row(
"SELECT EXISTS(SELECT 1 FROM sqlite_master \
WHERE type = 'table' AND name = 'memory_links')",
[],
|r| r.get(0),
)
.unwrap_or(false);
if memory_links_exists {
let existing_sql: String = conn
.query_row(
"SELECT sql FROM sqlite_master \
WHERE type = 'table' AND name = 'memory_links'",
[],
|r| r.get(0),
)
.unwrap_or_default();
let needs_rebuild =
!existing_sql.contains(crate::models::MemoryLinkRelation::DerivesFrom.as_str());
if needs_rebuild {
conn.execute_batch(MIGRATION_V36_REBUILD_LINKS_SQL)?;
}
}
conn.execute_batch(MIGRATION_V36_SQLITE)?;
}
if version < 37 {
let mut has_entity_id = false;
let mut has_persona_version = false;
let mut stmt = conn.prepare(PRAGMA_TABLE_INFO_MEMORIES)?;
let cols = stmt.query_map([], |row| row.get::<_, String>(1))?;
for col in cols {
match col?.as_str() {
"entity_id" => has_entity_id = true,
field_names::PERSONA_VERSION => has_persona_version = true,
_ => {}
}
}
drop(stmt);
if !has_entity_id {
conn.execute("ALTER TABLE memories ADD COLUMN entity_id TEXT", [])?;
}
if !has_persona_version {
conn.execute(
"ALTER TABLE memories ADD COLUMN persona_version INTEGER",
[],
)?;
}
conn.execute_batch(MIGRATION_V37_SQLITE)?;
}
if version < 38 {
let mut has_citations = false;
let mut has_source_uri = false;
let mut has_source_span = false;
let mut stmt = conn.prepare(PRAGMA_TABLE_INFO_MEMORIES)?;
let cols = stmt.query_map([], |row| row.get::<_, String>(1))?;
for col in cols {
match col?.as_str() {
"citations" => has_citations = true,
field_names::SOURCE_URI => has_source_uri = true,
field_names::SOURCE_SPAN => has_source_span = true,
_ => {}
}
}
drop(stmt);
if !has_citations {
conn.execute(
"ALTER TABLE memories ADD COLUMN citations TEXT NOT NULL DEFAULT '[]'",
[],
)?;
}
if !has_source_uri {
conn.execute("ALTER TABLE memories ADD COLUMN source_uri TEXT", [])?;
}
if !has_source_span {
conn.execute("ALTER TABLE memories ADD COLUMN source_span TEXT", [])?;
}
conn.execute_batch(MIGRATION_V38_SQLITE)?;
}
if version < 39 {
let mut has_source = false;
let mut has_signals = false;
let mut has_decayed_at = false;
let mut stmt = conn.prepare(PRAGMA_TABLE_INFO_MEMORIES)?;
let cols = stmt.query_map([], |row| row.get::<_, String>(1))?;
for col in cols {
match col?.as_str() {
field_names::CONFIDENCE_SOURCE => has_source = true,
field_names::CONFIDENCE_SIGNALS => has_signals = true,
field_names::CONFIDENCE_DECAYED_AT => has_decayed_at = true,
_ => {}
}
}
drop(stmt);
if !has_source {
conn.execute(
"ALTER TABLE memories ADD COLUMN confidence_source TEXT NOT NULL \
DEFAULT 'caller_provided'",
[],
)?;
}
if !has_signals {
conn.execute(
"ALTER TABLE memories ADD COLUMN confidence_signals TEXT",
[],
)?;
}
if !has_decayed_at {
conn.execute(
"ALTER TABLE memories ADD COLUMN confidence_decayed_at TEXT",
[],
)?;
}
conn.execute_batch(MIGRATION_V39_SQLITE)?;
}
if version < 40 {
conn.execute_batch(MIGRATION_V40_SQLITE)?;
}
if version < 41 {
let mut has_source = false;
let mut stmt = conn.prepare("PRAGMA table_info(confidence_shadow_observations)")?;
let cols = stmt.query_map([], |row| row.get::<_, String>(1))?;
for col in cols {
if col?.as_str() == "source" {
has_source = true;
}
}
drop(stmt);
if !has_source {
conn.execute(
"ALTER TABLE confidence_shadow_observations \
ADD COLUMN source TEXT NOT NULL DEFAULT 'unknown'",
[],
)?;
conn.execute(
"UPDATE confidence_shadow_observations \
SET source = COALESCE( \
(SELECT m.source FROM memories m \
WHERE m.id = confidence_shadow_observations.memory_id), \
'unknown')",
[],
)?;
}
conn.execute_batch(MIGRATION_V41_SQLITE)?;
}
if version < 42 {
let mut has_mentioned = false;
let mut stmt = conn.prepare(PRAGMA_TABLE_INFO_MEMORIES)?;
let cols = stmt.query_map([], |row| row.get::<_, String>(1))?;
for col in cols {
if col?.as_str() == "mentioned_entity_id" {
has_mentioned = true;
}
}
drop(stmt);
if !has_mentioned {
conn.execute(
"ALTER TABLE memories ADD COLUMN mentioned_entity_id TEXT",
[],
)?;
conn.execute(
"UPDATE memories
SET mentioned_entity_id = json_extract(metadata, '$.entity_id')
WHERE memory_kind = 'reflection'
AND mentioned_entity_id IS NULL
AND json_valid(metadata) = 1
AND json_extract(metadata, '$.entity_id') IS NOT NULL
AND length(json_extract(metadata, '$.entity_id')) > 0",
[],
)?;
conn.execute(
"UPDATE memories
SET mentioned_entity_id = trim(substr(
title,
instr(title, '[entity:') + length('[entity:'),
instr(substr(title, instr(title, '[entity:') + length('[entity:')), ']') - 1
))
WHERE memory_kind = 'reflection'
AND mentioned_entity_id IS NULL
AND instr(title, '[entity:') > 0
AND instr(substr(title, instr(title, '[entity:') + length('[entity:')), ']') > 1",
[],
)?;
}
conn.execute_batch(MIGRATION_V42_SQLITE)?;
}
if version < 43 {
conn.execute_batch(MIGRATION_V43_SQLITE)?;
}
if version < 44 {
let mut has_envelope = false;
let mut stmt = conn.prepare(PRAGMA_TABLE_INFO_MEMORIES)?;
let cols = stmt.query_map([], |row| row.get::<_, String>(1))?;
for col in cols {
if col?.as_str() == "encrypted_envelope" {
has_envelope = true;
}
}
drop(stmt);
if !has_envelope {
conn.execute(
"ALTER TABLE memories ADD COLUMN encrypted_envelope BLOB",
[],
)?;
}
}
if version < 45 {
let mut has_version_col = false;
let mut stmt = conn.prepare(PRAGMA_TABLE_INFO_MEMORIES)?;
let cols = stmt.query_map([], |row| row.get::<_, String>(1))?;
for col in cols {
if col?.as_str() == "version" {
has_version_col = true;
}
}
drop(stmt);
if !has_version_col {
conn.execute(
"ALTER TABLE memories ADD COLUMN version BIGINT NOT NULL DEFAULT 1",
[],
)?;
}
conn.execute_batch(MIGRATION_V45_SQLITE)?;
}
if version < 46 {
conn.execute_batch(MIGRATION_V46_SQLITE)?;
}
if version < 47 {
conn.execute_batch(MIGRATION_V47_SQLITE)?;
}
if version < 48 {
conn.execute_batch(MIGRATION_V48_SQLITE)?;
}
if version < 49 {
let existing: std::collections::HashSet<String> = conn
.prepare("PRAGMA table_info(archived_memories)")?
.query_map([], |r| r.get::<_, String>(1))?
.collect::<rusqlite::Result<_>>()?;
if existing.is_empty() {
tracing::debug!(
target: TRACE_TARGET,
"v49: archived_memories table does not exist (test fixture or \
deployment-without-archive); skipping column extension"
);
} else {
for (col, ddl) in &[
(field_names::REFLECTION_DEPTH, "INTEGER"),
("atomised_into", "INTEGER"),
("atom_of", "TEXT"),
(field_names::MEMORY_KIND, "TEXT"),
("entity_id", "TEXT"),
(field_names::PERSONA_VERSION, "INTEGER"),
("citations", "TEXT"),
(field_names::SOURCE_URI, "TEXT"),
(field_names::SOURCE_SPAN, "TEXT"),
(field_names::CONFIDENCE_SOURCE, "TEXT"),
(field_names::CONFIDENCE_SIGNALS, "TEXT"),
(field_names::CONFIDENCE_DECAYED_AT, "TEXT"),
("mentioned_entity_id", "TEXT"),
("version", "INTEGER"),
] {
if !existing.contains(*col) {
conn.execute(
&format!("ALTER TABLE archived_memories ADD COLUMN {col} {ddl}"),
[],
)?;
}
}
}
}
if version < 50 {
let cols: std::collections::HashSet<String> = conn
.prepare("PRAGMA table_info(agent_quotas)")?
.query_map([], |r| r.get::<_, String>(1))?
.collect::<rusqlite::Result<_>>()?;
if cols.is_empty() {
tracing::debug!(
target: TRACE_TARGET,
"v50: agent_quotas table does not exist (test fixture or \
deployment-without-quotas); skipping shadow-table swap"
);
} else if !cols.contains("namespace") {
conn.execute_batch(MIGRATION_V50_SQLITE)?;
}
}
if version < 51 {
conn.execute_batch(MIGRATION_V51_SQLITE)?;
}
if version < 52 {
conn.execute_batch(MIGRATION_V52_SQLITE)?;
}
if version < 53 {
let memories_fts_exists: bool = conn
.query_row(
"SELECT EXISTS(SELECT 1 FROM sqlite_master \
WHERE type = 'table' AND name = 'memories_fts')",
[],
|r| r.get(0),
)
.unwrap_or(false);
if memories_fts_exists {
conn.execute_batch(MIGRATION_V53_SQLITE)?;
}
}
if version < 54 {
for tier in [
crate::models::Tier::Mid,
crate::models::Tier::Short,
crate::models::Tier::Long,
] {
if let Some(ttl_secs) = tier.default_ttl_secs() {
conn.execute(
"UPDATE memories \
SET expires_at = strftime('%Y-%m-%dT%H:%M:%S+00:00', created_at, ?1) \
WHERE expires_at IS NULL AND tier = ?2",
params![format!("+{ttl_secs} seconds"), tier.as_str()],
)?;
}
}
}
if version < 55 {
conn.execute_batch(MIGRATION_V55_SQLITE)?;
}
if version < 56 {
conn.execute_batch(MIGRATION_V56_SQLITE)?;
let has_archive_table: bool = conn
.prepare("PRAGMA table_info(archived_memories)")?
.query_map([], |r| r.get::<_, String>(1))?
.next()
.is_some();
if has_archive_table {
conn.execute(
"CREATE INDEX IF NOT EXISTS idx_archived_ns_archived_at \
ON archived_memories (namespace, archived_at DESC)",
[],
)?;
} else {
tracing::debug!(
target: TRACE_TARGET,
"v56: archived_memories table does not exist (test fixture); \
skipping idx_archived_ns_archived_at"
);
}
}
conn.execute("DELETE FROM schema_version", [])?;
conn.execute(
"INSERT INTO schema_version (version) VALUES (?1)",
params![CURRENT_SCHEMA_VERSION],
)?;
Ok(())
})();
match result {
Ok(()) => {
conn.execute_batch(super::connection::SQL_COMMIT)?;
Ok(())
}
Err(e) => {
let _ = conn.execute_batch(super::connection::SQL_ROLLBACK);
Err(e)
}
}
}
pub fn migrate_v34_backfill_chain(conn: &Connection) -> Result<()> {
use crate::signed_events::{SignedEvent, ZERO_HASH, canonical_chain_bytes};
use sha2::{Digest, Sha256};
let mut stmt = conn.prepare(
"SELECT rowid, id, agent_id, event_type, payload_hash, signature, attest_level, \
timestamp \
FROM signed_events \
WHERE sequence IS NULL \
ORDER BY rowid ASC",
)?;
let pending: Vec<(i64, SignedEvent)> = stmt
.query_map([], |row| {
let rowid: i64 = row.get(0)?;
Ok((
rowid,
SignedEvent {
id: row.get(1)?,
agent_id: row.get(2)?,
event_type: row.get(3)?,
payload_hash: row.get(4)?,
signature: row.get(5)?,
attest_level: row.get(6)?,
timestamp: row.get(7)?,
prev_hash: Vec::new(),
sequence: 0,
},
))
})?
.collect::<rusqlite::Result<Vec<_>>>()?;
drop(stmt);
if pending.is_empty() {
return Ok(());
}
let mut next_seq: i64 = conn.query_row(
"SELECT COALESCE(MAX(sequence), 0) FROM signed_events",
[],
|r| r.get(0),
)?;
let mut prev_hash: [u8; 32] = ZERO_HASH;
if next_seq > 0 {
let head: Option<SignedEvent> = conn
.query_row(
"SELECT id, agent_id, event_type, payload_hash, signature, attest_level, \
timestamp, COALESCE(sequence, 0) \
FROM signed_events \
WHERE sequence = ?1",
params![next_seq],
|row| {
Ok(SignedEvent {
id: row.get(0)?,
agent_id: row.get(1)?,
event_type: row.get(2)?,
payload_hash: row.get(3)?,
signature: row.get(4)?,
attest_level: row.get(5)?,
timestamp: row.get(6)?,
sequence: row.get(7)?,
prev_hash: Vec::new(),
})
},
)
.map(Some)
.or_else(|e| match e {
rusqlite::Error::QueryReturnedNoRows => Ok(None),
other => Err(other),
})?;
if let Some(h) = head {
let canon = canonical_chain_bytes(&h);
let mut hasher = Sha256::new();
hasher.update(&canon);
prev_hash.copy_from_slice(&hasher.finalize());
}
}
for (rowid, mut event) in pending {
next_seq += 1;
event.sequence = next_seq;
conn.execute(
"UPDATE signed_events SET prev_hash = ?1, sequence = ?2 WHERE rowid = ?3",
params![prev_hash.to_vec(), next_seq, rowid],
)?;
let canon = canonical_chain_bytes(&event);
let mut hasher = Sha256::new();
hasher.update(&canon);
prev_hash.copy_from_slice(&hasher.finalize());
}
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
use rusqlite::Connection;
const V54_DISPATCH_TRIGGER: i64 = 54;
fn fresh_db_via_migrate() -> Connection {
let conn = Connection::open_in_memory().expect("in-memory db");
conn.execute_batch(SCHEMA).expect("apply SCHEMA");
conn.execute("DELETE FROM schema_version", [])
.expect("clear schema_version");
conn.execute("INSERT INTO schema_version (version) VALUES (0)", [])
.expect("seed v0");
super::migrate(&conn).expect("migrate from v0 succeeds");
conn
}
fn current_version(conn: &Connection) -> i64 {
conn.query_row("SELECT MAX(version) FROM schema_version", [], |r| r.get(0))
.unwrap_or(0)
}
#[test]
fn migrate_brings_v0_to_current() {
let conn = fresh_db_via_migrate();
assert_eq!(current_version(&conn), CURRENT_SCHEMA_VERSION);
}
#[test]
fn current_schema_version_matches_module_docstring() {
let source = include_str!("migrations.rs");
let marker = "current value: ";
let pos = source
.find(marker)
.expect("module docstring must contain `current value: N` marker");
let tail = &source[pos + marker.len()..];
let end = tail
.find(')')
.expect("docstring marker must close with `)`");
let parsed: i64 = tail[..end]
.trim()
.parse()
.expect("docstring `current value:` must be a parseable integer");
assert_eq!(
parsed, CURRENT_SCHEMA_VERSION,
"module docstring advertises {parsed}; bump the docstring when \
CURRENT_SCHEMA_VERSION changes (current = {})",
CURRENT_SCHEMA_VERSION
);
}
#[test]
fn current_schema_version_for_tests_matches_constant() {
assert_eq!(
super::current_schema_version_for_tests(),
CURRENT_SCHEMA_VERSION
);
}
#[test]
fn in_memory_db_has_no_snapshot_file_path() {
let conn = Connection::open_in_memory().expect("in-memory db");
assert!(
super::database_main_file_path(&conn).is_none(),
"in-memory DB must have no resolvable on-disk file path"
);
}
#[test]
fn snapshot_before_migration_is_noop_for_in_memory_db() {
let conn = Connection::open_in_memory().expect("in-memory db");
let from = CURRENT_SCHEMA_VERSION - 1;
let made = super::snapshot_before_migration(&conn, from, CURRENT_SCHEMA_VERSION)
.expect("snapshot helper must not error on in-memory db");
assert!(
made.is_none(),
"in-memory DB upgrade must not produce a snapshot file"
);
}
#[test]
fn pre_migration_backup_infix_accessor_is_stable_and_nonempty() {
let infix = super::pre_migration_backup_infix_for_tests();
assert!(!infix.is_empty(), "snapshot infix must be non-empty");
assert_eq!(infix, super::PRE_MIGRATION_BACKUP_INFIX);
}
#[test]
fn migrate_is_idempotent_when_run_twice() {
let conn = fresh_db_via_migrate();
let v_before = current_version(&conn);
super::migrate(&conn).expect("second migrate is no-op");
let v_after = current_version(&conn);
assert_eq!(v_before, v_after);
assert_eq!(v_after, CURRENT_SCHEMA_VERSION);
}
#[test]
fn migrate_from_current_minus_one_runs_only_terminal_arm() {
let conn = Connection::open_in_memory().expect("in-memory db");
conn.execute_batch(SCHEMA).expect("apply SCHEMA");
conn.execute("DELETE FROM schema_version", []).unwrap();
conn.execute(
"INSERT INTO schema_version (version) VALUES (?1)",
params![CURRENT_SCHEMA_VERSION - 1],
)
.unwrap();
super::migrate(&conn).expect("terminal-arm migrate ok");
assert_eq!(current_version(&conn), CURRENT_SCHEMA_VERSION);
}
#[test]
fn migrate_v54_backfills_null_expiry_on_non_long_rows() {
let conn = Connection::open_in_memory().expect("in-memory db");
conn.execute_batch(SCHEMA).expect("apply SCHEMA");
let created = "2026-01-01T00:00:00+00:00";
for (id, tier) in [("mid1", "mid"), ("short1", "short"), ("long1", "long")] {
conn.execute(
"INSERT INTO memories (id, tier, namespace, title, content, created_at, updated_at, expires_at) \
VALUES (?1, ?2, 'ns', ?1, 'c', ?3, ?3, NULL)",
params![id, tier, created],
)
.unwrap();
}
conn.execute("DELETE FROM schema_version", []).unwrap();
conn.execute(
"INSERT INTO schema_version (version) VALUES (?1)",
params![V54_DISPATCH_TRIGGER - 1],
)
.unwrap();
super::migrate(&conn).expect("v54 backfill arm runs");
assert_eq!(current_version(&conn), CURRENT_SCHEMA_VERSION);
let expiry = |id: &str| -> Option<String> {
conn.query_row(
"SELECT expires_at FROM memories WHERE id = ?1",
params![id],
|r| r.get(0),
)
.unwrap()
};
let gap = |id: &str| -> i64 {
let exp = expiry(id).expect("non-long row must be backfilled");
let base = chrono::DateTime::parse_from_rfc3339(created).unwrap();
let got = chrono::DateTime::parse_from_rfc3339(&exp).unwrap();
(got - base).num_seconds()
};
assert_eq!(gap("mid1"), crate::SECS_PER_WEEK, "mid backfill = +1w");
assert_eq!(
gap("short1"),
6 * crate::SECS_PER_HOUR,
"short backfill = +6h"
);
assert!(expiry("long1").is_none(), "long has no TTL — stays NULL");
}
#[test]
fn migrate_v54_is_idempotent_on_already_stamped_rows() {
let conn = Connection::open_in_memory().expect("in-memory db");
conn.execute_batch(SCHEMA).expect("apply SCHEMA");
let created = "2026-01-01T00:00:00+00:00";
conn.execute(
"INSERT INTO memories (id, tier, namespace, title, content, created_at, updated_at, expires_at) \
VALUES ('m', 'mid', 'ns', 't', 'c', ?1, ?1, NULL)",
params![created],
)
.unwrap();
conn.execute("DELETE FROM schema_version", []).unwrap();
conn.execute(
"INSERT INTO schema_version (version) VALUES (?1)",
params![V54_DISPATCH_TRIGGER - 1],
)
.unwrap();
super::migrate(&conn).expect("first v54 pass");
let first: String = conn
.query_row("SELECT expires_at FROM memories WHERE id='m'", [], |r| {
r.get(0)
})
.unwrap();
conn.execute("DELETE FROM schema_version", []).unwrap();
conn.execute(
"INSERT INTO schema_version (version) VALUES (?1)",
params![V54_DISPATCH_TRIGGER - 1],
)
.unwrap();
super::migrate(&conn).expect("second v54 pass");
let second: String = conn
.query_row("SELECT expires_at FROM memories WHERE id='m'", [], |r| {
r.get(0)
})
.unwrap();
assert_eq!(
first, second,
"idempotent: already-stamped expiry must not move"
);
}
#[test]
fn v55_arm_creates_updated_at_index_and_is_idempotent() {
const PRIOR_VERSION: i64 = 54;
const UPDATED_AT_INDEX: &str = "idx_memories_updated_at";
let conn = Connection::open_in_memory().expect("in-memory db");
conn.execute_batch(SCHEMA).expect("apply SCHEMA");
conn.execute(&format!("DROP INDEX IF EXISTS {UPDATED_AT_INDEX}"), [])
.expect("drop index to simulate a pre-index DB");
assert!(
!index_exists(&conn, UPDATED_AT_INDEX),
"precondition: index removed to simulate a pre-index DB"
);
let seed_prior_version = |conn: &Connection| {
conn.execute("DELETE FROM schema_version", []).unwrap();
conn.execute(
"INSERT INTO schema_version (version) VALUES (?1)",
params![PRIOR_VERSION],
)
.unwrap();
};
seed_prior_version(&conn);
super::migrate(&conn).expect("first index-migration pass");
assert!(
index_exists(&conn, UPDATED_AT_INDEX),
"v55 arm must create {UPDATED_AT_INDEX}"
);
assert_eq!(current_version(&conn), CURRENT_SCHEMA_VERSION);
seed_prior_version(&conn);
super::migrate(&conn).expect("second pass is replay-safe");
assert!(index_exists(&conn, UPDATED_AT_INDEX));
assert_eq!(current_version(&conn), CURRENT_SCHEMA_VERSION);
}
#[test]
fn latest_arm_creates_list_composite_indexes_and_is_idempotent() {
const PRIOR_VERSION: i64 = 55;
const LIST_INDEXES: [&str; 2] = ["idx_memories_list_order", "idx_memories_ns_list_order"];
let conn = Connection::open_in_memory().expect("in-memory db");
conn.execute_batch(SCHEMA).expect("apply SCHEMA");
for idx in LIST_INDEXES {
conn.execute(&format!("DROP INDEX IF EXISTS {idx}"), [])
.expect("drop index to simulate a pre-index DB");
assert!(
!index_exists(&conn, idx),
"precondition: {idx} removed to simulate a pre-index DB"
);
}
let seed_prior_version = |conn: &Connection| {
conn.execute("DELETE FROM schema_version", []).unwrap();
conn.execute(
"INSERT INTO schema_version (version) VALUES (?1)",
params![PRIOR_VERSION],
)
.unwrap();
};
seed_prior_version(&conn);
super::migrate(&conn).expect("first index-migration pass");
for idx in LIST_INDEXES {
assert!(index_exists(&conn, idx), "latest arm must create {idx}");
}
assert_eq!(current_version(&conn), CURRENT_SCHEMA_VERSION);
seed_prior_version(&conn);
super::migrate(&conn).expect("second pass is replay-safe");
for idx in LIST_INDEXES {
assert!(index_exists(&conn, idx));
}
assert_eq!(current_version(&conn), CURRENT_SCHEMA_VERSION);
}
#[test]
fn migrate_no_op_when_version_already_current() {
let conn = Connection::open_in_memory().expect("in-memory db");
conn.execute_batch(SCHEMA).expect("apply SCHEMA");
conn.execute("DELETE FROM schema_version", []).unwrap();
conn.execute(
"INSERT INTO schema_version (version) VALUES (?1)",
params![CURRENT_SCHEMA_VERSION + 5],
)
.unwrap();
super::migrate(&conn).expect("ahead-of-current is a no-op");
let v = current_version(&conn);
assert_eq!(
v,
CURRENT_SCHEMA_VERSION + 5,
"fast-path must not overwrite a newer version stamp"
);
}
#[test]
fn migrate_v2_backfills_confidence_default_on_existing_row() {
let conn = Connection::open_in_memory().expect("in-memory db");
conn.execute_batch(SCHEMA).unwrap();
conn.execute("DELETE FROM schema_version", []).unwrap();
conn.execute("INSERT INTO schema_version VALUES (1)", [])
.unwrap();
conn.execute(
"INSERT INTO memories (id, tier, namespace, title, content, created_at, updated_at) \
VALUES ('m1', 'short', 'ns', 't', 'c', '2024-01-01T00:00:00Z', '2024-01-01T00:00:00Z')",
[],
)
.unwrap();
super::migrate(&conn).expect("migrate succeeds");
let conf: f64 = conn
.query_row("SELECT confidence FROM memories WHERE id='m1'", [], |r| {
r.get(0)
})
.unwrap();
let source: String = conn
.query_row("SELECT source FROM memories WHERE id='m1'", [], |r| {
r.get(0)
})
.unwrap();
assert!((conf - 1.0).abs() < f64::EPSILON, "default confidence");
assert_eq!(source, "api", "default source");
}
#[test]
fn migrate_v29_backfills_reflection_depth_default() {
let conn = Connection::open_in_memory().expect("in-memory db");
conn.execute_batch(SCHEMA).unwrap();
conn.execute("DELETE FROM schema_version", []).unwrap();
conn.execute("INSERT INTO schema_version VALUES (28)", [])
.unwrap();
conn.execute(
"INSERT INTO memories (id, tier, namespace, title, content, created_at, updated_at) \
VALUES ('mref', 'mid', 'ns', 't', 'c', '2024-01-01T00:00:00Z', '2024-01-01T00:00:00Z')",
[],
)
.unwrap();
super::migrate(&conn).expect("migrate to v29 ok");
let depth: i64 = conn
.query_row(
"SELECT reflection_depth FROM memories WHERE id='mref'",
[],
|r| r.get(0),
)
.unwrap();
assert_eq!(depth, 0, "reflection_depth default must be 0");
}
#[test]
fn migrate_wraps_in_begin_exclusive_transaction() {
let conn = fresh_db_via_migrate();
let v: i64 = conn
.query_row("SELECT version FROM schema_version", [], |r| r.get(0))
.unwrap();
assert_eq!(v, CURRENT_SCHEMA_VERSION);
}
#[test]
fn migrate_idempotent_replay_keeps_schema_stable() {
let conn = fresh_db_via_migrate();
for _ in 0..5 {
super::migrate(&conn).expect("idempotent");
}
let n: i64 = conn
.query_row("SELECT COUNT(*) FROM schema_version", [], |r| r.get(0))
.unwrap();
assert_eq!(n, 1, "schema_version row count must remain 1 after replay");
let v: i64 = conn
.query_row("SELECT version FROM schema_version", [], |r| r.get(0))
.unwrap();
assert_eq!(v, CURRENT_SCHEMA_VERSION);
}
#[test]
fn migrate_v2_adds_columns_only_when_absent() {
let conn = Connection::open_in_memory().expect("in-memory db");
conn.execute_batch(SCHEMA).unwrap();
conn.execute("DELETE FROM schema_version", []).unwrap();
conn.execute("INSERT INTO schema_version VALUES (1)", [])
.unwrap();
super::migrate(&conn).expect("idempotent v2");
assert_eq!(current_version(&conn), CURRENT_SCHEMA_VERSION);
}
const LEGACY_V1_SCHEMA: &str = r"
CREATE TABLE IF NOT EXISTS memories (
id TEXT PRIMARY KEY,
tier TEXT NOT NULL,
namespace TEXT NOT NULL DEFAULT 'global',
title TEXT NOT NULL,
content TEXT NOT NULL,
tags TEXT NOT NULL DEFAULT '[]',
priority INTEGER NOT NULL DEFAULT 5,
access_count INTEGER NOT NULL DEFAULT 0,
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL,
last_accessed_at TEXT,
expires_at TEXT
);
CREATE TABLE IF NOT EXISTS memory_links (
source_id TEXT NOT NULL,
target_id TEXT NOT NULL,
relation TEXT NOT NULL DEFAULT 'related_to',
created_at TEXT NOT NULL,
PRIMARY KEY (source_id, target_id, relation)
);
CREATE TABLE IF NOT EXISTS schema_version (
version INTEGER NOT NULL
);
INSERT INTO schema_version (version) VALUES (0);
";
fn replay_from_v1() -> Connection {
let conn = Connection::open_in_memory().expect("in-memory db");
conn.execute_batch(LEGACY_V1_SCHEMA)
.expect("apply legacy v1 schema");
conn.execute(
"INSERT INTO memories (id, tier, namespace, title, content, created_at, updated_at) \
VALUES ('legacy', 'short', 'ns', 't', 'c', \
'2024-01-01T00:00:00Z', '2024-01-01T00:00:00Z')",
[],
)
.unwrap();
super::migrate(&conn).expect("walk every migrate arm from v0");
conn
}
fn column_exists(conn: &Connection, table: &str, column: &str) -> bool {
let sql = format!("SELECT {column} FROM {table} LIMIT 0");
conn.prepare(&sql).is_ok()
}
fn table_exists(conn: &Connection, table: &str) -> bool {
conn.query_row(
"SELECT 1 FROM sqlite_master WHERE type='table' AND name=?1",
params![table],
|row| row.get::<_, i64>(0),
)
.is_ok()
}
fn index_exists(conn: &Connection, index: &str) -> bool {
conn.query_row(
"SELECT 1 FROM sqlite_master WHERE type='index' AND name=?1",
params![index],
|row| row.get::<_, i64>(0),
)
.is_ok()
}
#[test]
fn historical_replay_from_v1_reaches_current_schema() {
let conn = replay_from_v1();
assert_eq!(current_version(&conn), CURRENT_SCHEMA_VERSION);
assert!(column_exists(&conn, "memories", "confidence"));
assert!(column_exists(&conn, "memories", "source"));
assert!(column_exists(&conn, "memories", "embedding"));
assert!(table_exists(&conn, "archived_memories"));
assert!(table_exists(&conn, "namespace_meta"));
assert!(column_exists(&conn, "namespace_meta", "parent_namespace"));
assert!(column_exists(&conn, "memories", "metadata"));
assert!(column_exists(&conn, "archived_memories", "metadata"));
assert!(table_exists(&conn, "pending_actions"));
assert!(column_exists(&conn, "pending_actions", "approvals"));
assert!(column_exists(&conn, "memories", "scope_idx"));
assert!(index_exists(&conn, "idx_memories_scope_idx"));
assert!(table_exists(&conn, "sync_state"));
assert!(column_exists(&conn, "sync_state", "last_pushed_at"));
assert!(table_exists(&conn, "subscriptions"));
assert!(column_exists(&conn, "memories", "agent_id_idx"));
assert!(index_exists(&conn, "idx_memories_agent_id"));
assert!(index_exists(&conn, "idx_memories_created_at"));
assert!(column_exists(&conn, "memory_links", "valid_from"));
assert!(column_exists(&conn, "memory_links", "valid_until"));
assert!(column_exists(&conn, "memory_links", "observed_by"));
assert!(column_exists(&conn, "memory_links", "signature"));
assert!(table_exists(&conn, "entity_aliases"));
assert!(column_exists(&conn, "memories", "embedding_dim"));
assert!(column_exists(&conn, "archived_memories", "embedding"));
assert!(column_exists(&conn, "archived_memories", "embedding_dim"));
assert!(column_exists(&conn, "archived_memories", "original_tier"));
assert!(column_exists(
&conn,
"archived_memories",
"original_expires_at"
));
assert!(column_exists(&conn, "subscriptions", "event_types"));
assert!(table_exists(&conn, "audit_log"));
assert!(column_exists(
&conn,
"pending_actions",
"default_timeout_seconds"
));
assert!(column_exists(&conn, "pending_actions", "expired_at"));
assert!(table_exists(&conn, "memory_transcripts"));
assert!(column_exists(&conn, "memory_links", "attest_level"));
assert!(table_exists(&conn, "memory_transcript_links"));
assert!(column_exists(&conn, "memory_transcripts", "archived_at"));
assert!(table_exists(&conn, "signed_events"));
assert!(table_exists(&conn, "subscription_events"));
assert!(table_exists(&conn, "subscription_dlq"));
assert!(column_exists(
&conn,
"subscription_events",
"correlation_id"
));
assert!(table_exists(&conn, "agent_quotas"));
assert!(column_exists(&conn, "memories", "reflection_depth"));
assert!(column_exists(&conn, "memories", "atomised_into"));
assert!(column_exists(&conn, "memories", "atom_of"));
assert!(index_exists(&conn, "idx_memories_atom_of"));
assert!(index_exists(&conn, "idx_memories_atomised_into"));
assert!(index_exists(&conn, "idx_memories_updated_at"));
assert!(index_exists(&conn, "idx_memories_list_order"));
assert!(index_exists(&conn, "idx_memories_ns_list_order"));
assert!(index_exists(&conn, "idx_archived_ns_archived_at"));
}
#[test]
fn historical_replay_backfills_v2_defaults_on_legacy_row() {
let conn = replay_from_v1();
let conf: f64 = conn
.query_row(
"SELECT confidence FROM memories WHERE id='legacy'",
[],
|r| r.get(0),
)
.unwrap();
let source: String = conn
.query_row("SELECT source FROM memories WHERE id='legacy'", [], |r| {
r.get(0)
})
.unwrap();
assert!((conf - 1.0).abs() < f64::EPSILON);
assert_eq!(source, "api");
}
#[test]
fn historical_replay_backfills_v7_metadata_default_on_legacy_row() {
let conn = replay_from_v1();
let meta: String = conn
.query_row("SELECT metadata FROM memories WHERE id='legacy'", [], |r| {
r.get(0)
})
.unwrap();
assert_eq!(meta, "{}");
}
#[test]
fn historical_replay_backfills_v29_reflection_depth_default() {
let conn = replay_from_v1();
let depth: i64 = conn
.query_row(
"SELECT reflection_depth FROM memories WHERE id='legacy'",
[],
|r| r.get(0),
)
.unwrap();
assert_eq!(depth, 0);
}
#[test]
fn historical_replay_v15_backfills_valid_from_to_memories_created_at() {
let conn = Connection::open_in_memory().expect("in-memory db");
conn.execute_batch(LEGACY_V1_SCHEMA)
.expect("apply legacy v1 schema");
conn.execute(
"INSERT INTO memories (id, tier, namespace, title, content, created_at, updated_at) \
VALUES ('m_src', 'short', 'ns', 't1', 'c1', \
'2024-06-01T12:34:56Z', '2024-06-01T12:34:56Z')",
[],
)
.unwrap();
conn.execute(
"INSERT INTO memories (id, tier, namespace, title, content, created_at, updated_at) \
VALUES ('m_tgt', 'short', 'ns', 't2', 'c2', \
'2024-06-01T12:34:56Z', '2024-06-01T12:34:56Z')",
[],
)
.unwrap();
conn.execute(
"INSERT INTO memory_links (source_id, target_id, relation, created_at) \
VALUES ('m_src', 'm_tgt', 'related_to', '2024-06-01T12:34:56Z')",
[],
)
.unwrap();
super::migrate(&conn).expect("migrate from v0 with link");
let valid_from: Option<String> = conn
.query_row(
"SELECT valid_from FROM memory_links \
WHERE source_id='m_src' AND target_id='m_tgt'",
[],
|r| r.get(0),
)
.unwrap();
assert_eq!(
valid_from.as_deref(),
Some("2024-06-01T12:34:56Z"),
"v15 backfill must seed valid_from to source created_at"
);
}
#[test]
fn historical_replay_v18_backfills_embedding_dim_from_blob_length() {
let conn = Connection::open_in_memory().expect("in-memory db");
conn.execute_batch(LEGACY_V1_SCHEMA)
.expect("apply legacy v1 schema");
super::migrate(&conn).expect("first migrate to current");
conn.execute("DELETE FROM schema_version", []).unwrap();
conn.execute("INSERT INTO schema_version VALUES (17)", [])
.unwrap();
let embedding = vec![0u8; 8]; conn.execute(
"INSERT INTO memories \
(id, tier, namespace, title, content, created_at, updated_at, embedding, embedding_dim) \
VALUES ('m18', 'short', 'ns', 't', 'c', \
'2024-01-01T00:00:00Z', '2024-01-01T00:00:00Z', ?1, NULL)",
params![embedding],
)
.unwrap();
super::migrate(&conn).expect("migrate v17->v29 (idempotent on ALTERs)");
let dim: Option<i64> = conn
.query_row(
"SELECT embedding_dim FROM memories WHERE id='m18'",
[],
|r| r.get(0),
)
.unwrap();
assert_eq!(dim, Some(2), "v18 backfill must set embedding_dim = len/4");
}
#[test]
fn historical_replay_v27_creates_dlq_table() {
let conn = replay_from_v1();
assert!(table_exists(&conn, "subscription_dlq"));
assert!(index_exists(&conn, "idx_subscription_dlq_subscription"));
assert!(index_exists(&conn, "idx_subscription_dlq_correlation"));
assert!(index_exists(&conn, "idx_subscription_events_correlation"));
}
#[test]
fn historical_replay_v27_alter_runs_before_index_on_existing_subscription_events_table() {
let conn = Connection::open_in_memory().expect("in-memory db");
conn.execute_batch(SCHEMA).unwrap();
conn.execute("DELETE FROM schema_version", []).unwrap();
conn.execute("DROP TABLE IF EXISTS subscription_events", [])
.unwrap();
conn.execute_batch(
"CREATE TABLE subscription_events (
id INTEGER PRIMARY KEY AUTOINCREMENT,
subscription_id TEXT NOT NULL,
event_type TEXT NOT NULL,
payload TEXT NOT NULL,
delivered_at TEXT NOT NULL,
delivery_status TEXT NOT NULL DEFAULT 'pending'
);",
)
.unwrap();
conn.execute("INSERT INTO schema_version VALUES (26)", [])
.unwrap();
super::migrate(&conn).expect("v27 migration on hand-rolled v26 table must succeed");
assert!(
column_exists(&conn, "subscription_events", "correlation_id"),
"ALTER must add correlation_id before the SQL file's CREATE INDEX runs"
);
assert!(index_exists(&conn, "idx_subscription_events_correlation"));
assert_eq!(current_version(&conn), CURRENT_SCHEMA_VERSION);
}
#[test]
fn historical_replay_idempotent_re_run_holds_steady() {
let conn = replay_from_v1();
let before = current_version(&conn);
super::migrate(&conn).expect("idempotent re-run");
let after = current_version(&conn);
assert_eq!(before, after);
assert_eq!(after, CURRENT_SCHEMA_VERSION);
let n: i64 = conn
.query_row("SELECT COUNT(*) FROM schema_version", [], |r| r.get(0))
.unwrap();
assert_eq!(n, 1);
}
#[test]
fn historical_replay_v7_alters_pre_existing_archived_memories_without_metadata() {
let conn = Connection::open_in_memory().expect("in-memory db");
conn.execute_batch(LEGACY_V1_SCHEMA)
.expect("apply legacy v1 schema");
conn.execute_batch(
"CREATE TABLE archived_memories (
id TEXT PRIMARY KEY,
tier TEXT NOT NULL,
namespace TEXT NOT NULL,
title TEXT NOT NULL,
content TEXT NOT NULL,
tags TEXT NOT NULL,
priority INTEGER NOT NULL,
confidence REAL NOT NULL,
source TEXT NOT NULL,
access_count INTEGER NOT NULL,
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL,
last_accessed_at TEXT,
expires_at TEXT,
archived_at TEXT NOT NULL,
archive_reason TEXT NOT NULL DEFAULT 'ttl_expired'
);",
)
.unwrap();
super::migrate(&conn).expect("migrate v0->v29 with stale archived_memories shape");
assert!(column_exists(&conn, "archived_memories", "metadata"));
assert_eq!(current_version(&conn), CURRENT_SCHEMA_VERSION);
}
#[test]
fn historical_replay_v18_alters_pre_existing_archived_memories_without_embedding() {
let conn = replay_from_v1();
assert!(column_exists(&conn, "archived_memories", "embedding"));
assert!(column_exists(&conn, "archived_memories", "embedding_dim"));
assert!(column_exists(&conn, "archived_memories", "original_tier"));
assert!(column_exists(
&conn,
"archived_memories",
"original_expires_at"
));
}
#[test]
fn historical_replay_v9_alters_pending_actions_missing_approvals() {
let conn = Connection::open_in_memory().expect("in-memory db");
conn.execute_batch(LEGACY_V1_SCHEMA)
.expect("apply legacy v1 schema");
conn.execute_batch(
"CREATE TABLE pending_actions (
id TEXT PRIMARY KEY,
action_type TEXT NOT NULL,
memory_id TEXT,
namespace TEXT NOT NULL,
payload TEXT NOT NULL DEFAULT '{}',
requested_by TEXT NOT NULL,
requested_at TEXT NOT NULL,
status TEXT NOT NULL DEFAULT 'pending',
decided_by TEXT,
decided_at TEXT,
approvals TEXT NOT NULL DEFAULT '[]'
);",
)
.unwrap();
super::migrate(&conn).expect("migrate v0->v29 with pre-existing approvals");
assert!(column_exists(&conn, "pending_actions", "approvals"));
assert_eq!(current_version(&conn), CURRENT_SCHEMA_VERSION);
}
#[test]
fn migrate_rollback_path_on_failed_arm_propagates_error() {
let conn = Connection::open_in_memory().expect("in-memory db");
conn.execute_batch(SCHEMA).unwrap();
conn.execute("DELETE FROM schema_version", []).unwrap();
conn.execute("INSERT INTO schema_version VALUES (28)", [])
.unwrap();
conn.execute("DROP TABLE schema_version", []).unwrap();
let res = super::migrate(&conn);
assert!(
res.is_err(),
"migrate must propagate err when terminal INSERT fails"
);
}
}