//! PostgreSQL schema migrations for `mnemo_postgres` (`migrations.rs`).

1use mnemo_core::error::{Error, Result};
2
3/// Run all PostgreSQL schema migrations.
4///
5/// The `dimensions` parameter controls the width of the pgvector `vector` column
6/// on the `memories` table (e.g. 1536 for OpenAI ada-002 embeddings).
7pub async fn run_migrations(pool: &sqlx::PgPool, dimensions: usize) -> Result<()> {
8    // Enable pgvector extension
9    sqlx::query("CREATE EXTENSION IF NOT EXISTS vector")
10        .execute(pool)
11        .await
12        .map_err(|e| Error::Storage(format!("failed to create vector extension: {e}")))?;
13
14    // 1. memories
15    let create_memories = format!(
16        r#"
17CREATE TABLE IF NOT EXISTS memories (
18    id UUID PRIMARY KEY,
19    agent_id VARCHAR NOT NULL,
20    content TEXT NOT NULL,
21    memory_type VARCHAR NOT NULL,
22    scope VARCHAR NOT NULL DEFAULT 'private',
23    importance REAL NOT NULL DEFAULT 0.5,
24    tags TEXT[],
25    metadata JSONB,
26    embedding vector({dimensions}),
27    content_hash BYTEA NOT NULL,
28    prev_hash BYTEA,
29    source_type VARCHAR NOT NULL DEFAULT 'agent',
30    source_id VARCHAR,
31    consolidation_state VARCHAR NOT NULL DEFAULT 'raw',
32    access_count BIGINT NOT NULL DEFAULT 0,
33    org_id VARCHAR,
34    thread_id VARCHAR,
35    created_at VARCHAR NOT NULL,
36    updated_at VARCHAR NOT NULL,
37    last_accessed_at VARCHAR,
38    expires_at VARCHAR,
39    deleted_at VARCHAR,
40    decay_rate REAL,
41    created_by VARCHAR,
42    version INTEGER NOT NULL DEFAULT 1,
43    prev_version_id UUID,
44    quarantined BOOLEAN NOT NULL DEFAULT FALSE,
45    quarantine_reason VARCHAR,
46    decay_function VARCHAR
47)
48"#
49    );
50    sqlx::query(&create_memories)
51        .execute(pool)
52        .await
53        .map_err(|e| Error::Storage(format!("create memories: {e}")))?;
54
55    // 2. acls
56    sqlx::query(
57        r#"
58CREATE TABLE IF NOT EXISTS acls (
59    id UUID PRIMARY KEY,
60    memory_id UUID NOT NULL,
61    principal_type VARCHAR NOT NULL,
62    principal_id VARCHAR NOT NULL,
63    permission VARCHAR NOT NULL,
64    granted_by VARCHAR NOT NULL,
65    created_at VARCHAR NOT NULL,
66    expires_at VARCHAR
67)
68"#,
69    )
70    .execute(pool)
71    .await
72    .map_err(|e| Error::Storage(format!("create acls: {e}")))?;
73
74    // 3. relations
75    sqlx::query(
76        r#"
77CREATE TABLE IF NOT EXISTS relations (
78    id UUID PRIMARY KEY,
79    source_id UUID NOT NULL,
80    target_id UUID NOT NULL,
81    relation_type VARCHAR NOT NULL,
82    weight REAL NOT NULL DEFAULT 1.0,
83    metadata JSONB,
84    created_at VARCHAR NOT NULL
85)
86"#,
87    )
88    .execute(pool)
89    .await
90    .map_err(|e| Error::Storage(format!("create relations: {e}")))?;
91
92    // 4. agent_events
93    sqlx::query(
94        r#"
95CREATE TABLE IF NOT EXISTS agent_events (
96    id UUID PRIMARY KEY,
97    agent_id VARCHAR NOT NULL,
98    thread_id VARCHAR,
99    run_id VARCHAR,
100    parent_event_id UUID,
101    event_type VARCHAR NOT NULL,
102    payload JSONB,
103    trace_id VARCHAR,
104    span_id VARCHAR,
105    model VARCHAR,
106    tokens_input BIGINT,
107    tokens_output BIGINT,
108    latency_ms BIGINT,
109    cost_usd DOUBLE PRECISION,
110    "timestamp" VARCHAR NOT NULL,
111    logical_clock BIGINT NOT NULL DEFAULT 0,
112    content_hash BYTEA NOT NULL,
113    prev_hash BYTEA,
114    embedding BYTEA
115)
116"#,
117    )
118    .execute(pool)
119    .await
120    .map_err(|e| Error::Storage(format!("create agent_events: {e}")))?;
121
122    // 5. checkpoints
123    sqlx::query(
124        r#"
125CREATE TABLE IF NOT EXISTS checkpoints (
126    id UUID PRIMARY KEY,
127    thread_id VARCHAR NOT NULL,
128    agent_id VARCHAR NOT NULL,
129    parent_id UUID,
130    branch_name VARCHAR NOT NULL DEFAULT 'main',
131    state_snapshot JSONB,
132    state_diff JSONB,
133    memory_refs TEXT[],
134    event_cursor UUID,
135    label VARCHAR,
136    created_at VARCHAR NOT NULL,
137    metadata JSONB
138)
139"#,
140    )
141    .execute(pool)
142    .await
143    .map_err(|e| Error::Storage(format!("create checkpoints: {e}")))?;
144
145    // 6. delegations
146    sqlx::query(
147        r#"
148CREATE TABLE IF NOT EXISTS delegations (
149    id UUID PRIMARY KEY,
150    delegator_id VARCHAR NOT NULL,
151    delegate_id VARCHAR NOT NULL,
152    permission VARCHAR NOT NULL,
153    scope_type VARCHAR NOT NULL DEFAULT 'all_memories',
154    scope_value JSONB,
155    max_depth INTEGER NOT NULL DEFAULT 0,
156    current_depth INTEGER NOT NULL DEFAULT 0,
157    parent_delegation_id UUID,
158    created_at VARCHAR NOT NULL,
159    expires_at VARCHAR,
160    revoked_at VARCHAR
161)
162"#,
163    )
164    .execute(pool)
165    .await
166    .map_err(|e| Error::Storage(format!("create delegations: {e}")))?;
167
168    // 7. agent_profiles
169    sqlx::query(
170        r#"
171CREATE TABLE IF NOT EXISTS agent_profiles (
172    agent_id VARCHAR PRIMARY KEY,
173    avg_importance DOUBLE PRECISION NOT NULL DEFAULT 0.5,
174    avg_content_length DOUBLE PRECISION NOT NULL DEFAULT 100,
175    total_memories BIGINT NOT NULL DEFAULT 0,
176    last_updated VARCHAR NOT NULL
177)
178"#,
179    )
180    .execute(pool)
181    .await
182    .map_err(|e| Error::Storage(format!("create agent_profiles: {e}")))?;
183
184    // 8. sync_metadata
185    sqlx::query(
186        r#"
187CREATE TABLE IF NOT EXISTS sync_metadata (
188    key VARCHAR PRIMARY KEY,
189    value TEXT NOT NULL,
190    updated_at VARCHAR NOT NULL
191)
192"#,
193    )
194    .execute(pool)
195    .await
196    .map_err(|e| Error::Storage(format!("create sync_metadata: {e}")))?;
197
198    // 9. embedding_baseline (v0.3.3 — z-score outlier detector)
199    sqlx::query(
200        r#"
201CREATE TABLE IF NOT EXISTS embedding_baseline (
202    agent_id VARCHAR PRIMARY KEY,
203    mu JSONB NOT NULL,
204    cov_diag JSONB NOT NULL,
205    n BIGINT NOT NULL,
206    updated_at VARCHAR NOT NULL
207)
208"#,
209    )
210    .execute(pool)
211    .await
212    .map_err(|e| Error::Storage(format!("create embedding_baseline: {e}")))?;
213
214    // ---- Indexes ----
215    let index_stmts: &[&str] = &[
216        "CREATE INDEX IF NOT EXISTS idx_memories_agent ON memories(agent_id)",
217        "CREATE INDEX IF NOT EXISTS idx_memories_thread ON memories(agent_id, thread_id)",
218        "CREATE INDEX IF NOT EXISTS idx_acls_memory ON acls(memory_id)",
219        "CREATE INDEX IF NOT EXISTS idx_acls_principal ON acls(principal_id)",
220        "CREATE INDEX IF NOT EXISTS idx_relations_source ON relations(source_id)",
221        "CREATE INDEX IF NOT EXISTS idx_relations_target ON relations(target_id)",
222        "CREATE INDEX IF NOT EXISTS idx_events_agent ON agent_events(agent_id)",
223        "CREATE INDEX IF NOT EXISTS idx_events_thread ON agent_events(thread_id)",
224        "CREATE INDEX IF NOT EXISTS idx_events_parent ON agent_events(parent_event_id)",
225        "CREATE INDEX IF NOT EXISTS idx_checkpoints_thread ON checkpoints(thread_id, branch_name)",
226        "CREATE INDEX IF NOT EXISTS idx_delegations_delegator ON delegations(delegator_id)",
227        "CREATE INDEX IF NOT EXISTS idx_delegations_delegate ON delegations(delegate_id)",
228    ];
229
230    for stmt in index_stmts {
231        sqlx::query(stmt)
232            .execute(pool)
233            .await
234            .map_err(|e| Error::Storage(format!("create index: {e}")))?;
235    }
236
237    // Append-only enforcement on agent_events: prevent UPDATE/DELETE at schema level
238    sqlx::query(
239        r#"
240CREATE OR REPLACE FUNCTION prevent_event_modification() RETURNS trigger AS $$
241BEGIN
242    RAISE EXCEPTION 'agent_events is append-only: % not allowed', TG_OP;
243    RETURN NULL;
244END;
245$$ LANGUAGE plpgsql
246"#,
247    )
248    .execute(pool)
249    .await
250    .map_err(|e| Error::Storage(format!("create append-only function: {e}")))?;
251
252    sqlx::query(
253        r#"
254DO $$ BEGIN
255    IF NOT EXISTS (SELECT 1 FROM pg_trigger WHERE tgname = 'enforce_append_only_events') THEN
256        CREATE TRIGGER enforce_append_only_events
257            BEFORE UPDATE OR DELETE ON agent_events
258            FOR EACH ROW EXECUTE FUNCTION prevent_event_modification();
259    END IF;
260END $$
261"#,
262    )
263    .execute(pool)
264    .await
265    .map_err(|e| Error::Storage(format!("create append-only trigger: {e}")))?;
266
267    // HNSW vector index for cosine similarity
268    // Use DO block to make this idempotent (pgvector HNSW index creation
269    // does not support IF NOT EXISTS in all versions).
270    let hnsw_sql = r#"
271DO $$
272BEGIN
273    IF NOT EXISTS (
274        SELECT 1 FROM pg_indexes WHERE indexname = 'idx_memories_embedding_hnsw'
275    ) THEN
276        CREATE INDEX idx_memories_embedding_hnsw ON memories USING hnsw (embedding vector_cosine_ops);
277    END IF;
278END
279$$
280"#;
281    sqlx::query(hnsw_sql)
282        .execute(pool)
283        .await
284        .map_err(|e| Error::Storage(format!("create hnsw index: {e}")))?;
285
286    Ok(())
287}