// memoir_core/store/postgres.rs

1//! [`MemoryStore`] implementation backed by Postgres.
2
3use chrono::{DateTime, FixedOffset};
4use sea_orm::{ConnectionTrait, DatabaseConnection, Statement, Value as SeaOrmValue};
5
6use super::{AsOfParams, EditPatch, IndexStatus, MemoryStore, StoreError, TimelineDirection, TimelineParams};
7use crate::memory::{ExtractionStat, ForgetTarget, Memory, MemoryKind, Scope, StatsFilter, SupersessionEvent};
8
/// Number of characters requested from `nanoid` when minting the `pid` of a
/// newly inserted memory.
const PID_LENGTH: usize = 21;
10
/// Projection used by every SELECT whose rows are decoded through
/// `Memory::try_from`.
///
/// Every column is qualified with `m.`, so the embedding query must alias
/// the `memories` table as `m`.
///
/// `supersession_at` is not a stored column: it is the newest `decided_at`
/// in the `supersession_events` audit table for the row, fetched with a
/// correlated subquery. The `CASE` only runs that subquery when the cached
/// `superseded_by` column is set, so an active row yields `NULL` even if it
/// has earlier (since undone) supersession events. The compound index
/// `supersession_events_loser_decided_idx` keeps the subquery an indexed
/// lookup.
const MEMORY_SELECT_COLUMNS: &str = "
    m.pid,
    m.agent_id,
    m.org_id,
    m.user_id,
    m.content,
    m.metadata,
    m.kind,
    m.qdrant_status,
    m.source_pid,
    m.superseded_by,
    m.created_at,
    m.updated_at,
    m.event_at,
    m.confidence,
    m.category,
    m.retirement_reason,
    CASE
        WHEN m.superseded_by IS NULL THEN NULL
        ELSE (
            SELECT MAX(decided_at)
            FROM supersession_events
            WHERE loser_pid = m.pid
        )
    END AS supersession_at
";
44
45/// Default [`MemoryStore`] backed by Postgres.
46///
47/// Constructed via [`Self::new`] from an existing
48/// [`sea_orm::DatabaseConnection`]. The caller owns the connection's
49/// lifecycle; this store does not pool or reconnect.
50#[derive(Debug, Clone)]
51pub struct PostgresStore {
52    db: DatabaseConnection,
53}
54
55impl PostgresStore {
56    /// Builds a store from an existing Postgres connection.
57    pub fn new(db: DatabaseConnection) -> Self {
58        Self { db }
59    }
60
61    /// Returns the underlying Postgres connection.
62    pub fn db(&self) -> &DatabaseConnection {
63        &self.db
64    }
65}
66
67impl MemoryStore for PostgresStore {
68    async fn remember(&self, new: crate::store::NewMemory) -> Result<Memory, StoreError> {
69        let crate::store::NewMemory {
70            scope,
71            content,
72            metadata,
73            kind,
74            source_pid,
75            event_at,
76            confidence,
77        } = new;
78        scope.validate()?;
79
80        let pid = nanoid::nanoid!(PID_LENGTH);
81
82        let stmt = Statement::from_sql_and_values(
83            sea_orm::DatabaseBackend::Postgres,
84            r#"
85            INSERT INTO memories (pid, agent_id, org_id, user_id, content, metadata, kind, source_pid, event_at, confidence)
86            VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
87            RETURNING
88                pid, agent_id, org_id, user_id, content, metadata, kind,
89                qdrant_status, source_pid, superseded_by, created_at, updated_at, event_at,
90                confidence, category, retirement_reason,
91                NULL::TIMESTAMPTZ AS supersession_at
92            "#,
93            [
94                SeaOrmValue::String(Some(pid)),
95                SeaOrmValue::String(Some(scope.agent_id.clone())),
96                SeaOrmValue::String(Some(scope.org_id.clone())),
97                SeaOrmValue::String(Some(scope.user_id.clone())),
98                SeaOrmValue::String(Some(content)),
99                SeaOrmValue::Json(Some(Box::new(metadata))),
100                SeaOrmValue::String(Some(kind.to_string())),
101                SeaOrmValue::String(source_pid),
102                SeaOrmValue::ChronoDateTimeWithTimeZone(event_at),
103                // The column is SMALLINT; Confidence's invariant guarantees 0-100.
104                SeaOrmValue::SmallInt(Some(i16::from(confidence.get()))),
105            ],
106        );
107
108        let row = self
109            .db
110            .query_one_raw(stmt)
111            .await?
112            .ok_or_else(|| StoreError::CacheInvariant("insert returned no row".to_string()))?;
113
114        Memory::try_from(&row).map(|mut m| {
115            m.score = None;
116            m
117        })
118    }
119
120    async fn recall(&self, pid: &str) -> Result<Memory, StoreError> {
121        if pid.is_empty() {
122            return Err(StoreError::NotFound(pid.to_string()));
123        }
124
125        let select_sql = format!("SELECT {MEMORY_SELECT_COLUMNS} FROM memories m WHERE m.pid = $1");
126        let stmt = Statement::from_sql_and_values(
127            sea_orm::DatabaseBackend::Postgres,
128            select_sql,
129            [SeaOrmValue::String(Some(pid.to_string()))],
130        );
131
132        let row = self
133            .db
134            .query_one_raw(stmt)
135            .await?
136            .ok_or_else(|| StoreError::NotFound(pid.to_string()))?;
137
138        Memory::try_from(&row)
139    }
140
141    async fn find_by_pids(&self, pids: &[&str]) -> Result<Vec<Memory>, StoreError> {
142        if pids.is_empty() {
143            return Ok(Vec::new());
144        }
145
146        let owned_pids: Vec<String> = pids.iter().map(|p| (*p).to_string()).collect();
147        let select_sql = format!(
148            "SELECT {MEMORY_SELECT_COLUMNS} FROM memories m \
149             WHERE m.pid = ANY($1) AND m.qdrant_status = 'indexed' \
150               AND m.superseded_by IS NULL AND m.retirement_reason IS NULL"
151        );
152        let stmt = Statement::from_sql_and_values(
153            sea_orm::DatabaseBackend::Postgres,
154            select_sql,
155            [SeaOrmValue::Array(
156                sea_orm::sea_query::ArrayType::String,
157                Some(Box::new(
158                    owned_pids.into_iter().map(|p| SeaOrmValue::String(Some(p))).collect(),
159                )),
160            )],
161        );
162
163        let rows = self.db.query_all_raw(stmt).await?;
164        let mut memories = Vec::with_capacity(rows.len());
165        for row in &rows {
166            memories.push(Memory::try_from(row)?);
167        }
168        Ok(memories)
169    }
170
171    async fn active_semantics_for_source(&self, source_pid: &str) -> Result<Vec<Memory>, StoreError> {
172        if source_pid.is_empty() {
173            return Ok(Vec::new());
174        }
175
176        let select_sql = format!(
177            "SELECT {MEMORY_SELECT_COLUMNS} FROM memories m \
178             WHERE m.source_pid = $1 AND m.kind = 'semantic' \
179               AND m.superseded_by IS NULL AND m.retirement_reason IS NULL"
180        );
181        let stmt = Statement::from_sql_and_values(
182            sea_orm::DatabaseBackend::Postgres,
183            select_sql,
184            [SeaOrmValue::String(Some(source_pid.to_string()))],
185        );
186
187        let rows = self.db.query_all_raw(stmt).await?;
188        let mut memories = Vec::with_capacity(rows.len());
189        for row in &rows {
190            memories.push(Memory::try_from(row)?);
191        }
192        Ok(memories)
193    }
194
195    async fn extraction_stats(&self, filter: StatsFilter) -> Result<Vec<ExtractionStat>, StoreError> {
196        // Always-present constraint: only semantic rows are extractions. Optional
197        // scope-subset filters AND onto it with positional params. provider/model
198        // live in the metadata blob (epic 0006 left them there); group on the
199        // extracted JSON text. `rejected` is a FILTERed count so total and
200        // rejected come back in one pass — total includes Stale + superseded rows
201        // (they are not model errors, so they are not in the FILTER).
202        let mut where_clauses: Vec<String> = vec!["m.kind = 'semantic'".into()];
203        let mut values: Vec<SeaOrmValue> = Vec::new();
204
205        for (column, value) in [
206            ("agent_id", filter.agent_id),
207            ("org_id", filter.org_id),
208            ("user_id", filter.user_id),
209        ] {
210            if let Some(value) = value {
211                values.push(SeaOrmValue::String(Some(value)));
212                where_clauses.push(format!("m.{column} = ${}", values.len()));
213            }
214        }
215
216        let sql = format!(
217            "SELECT \
218               COALESCE(m.metadata ->> 'provider', '') AS provider, \
219               COALESCE(m.metadata ->> 'model', '') AS model, \
220               COUNT(*)::BIGINT AS total, \
221               COUNT(*) FILTER (WHERE m.retirement_reason = 'rejected')::BIGINT AS rejected \
222             FROM memories m \
223             WHERE {} \
224             GROUP BY provider, model \
225             ORDER BY provider ASC, model ASC",
226            where_clauses.join(" AND "),
227        );
228
229        let stmt = Statement::from_sql_and_values(sea_orm::DatabaseBackend::Postgres, sql, values);
230        let rows = self.db.query_all_raw(stmt).await?;
231
232        let mut stats = Vec::with_capacity(rows.len());
233        for row in &rows {
234            stats.push(ExtractionStat {
235                provider: row.try_get::<String>("", "provider")?,
236                model: row.try_get::<String>("", "model")?,
237                total: u64::try_from(row.try_get::<i64>("", "total")?).unwrap_or(0),
238                rejected: u64::try_from(row.try_get::<i64>("", "rejected")?).unwrap_or(0),
239            });
240        }
241        Ok(stats)
242    }
243
244    async fn timeline(&self, scope: Scope, params: TimelineParams) -> Result<Vec<Memory>, StoreError> {
245        scope.validate()?;
246
247        let mut where_clauses: Vec<String> = vec![
248            "m.agent_id = $1".into(),
249            "m.org_id = $2".into(),
250            "m.user_id = $3".into(),
251        ];
252        let mut values: Vec<SeaOrmValue> = vec![
253            SeaOrmValue::String(Some(scope.agent_id)),
254            SeaOrmValue::String(Some(scope.org_id)),
255            SeaOrmValue::String(Some(scope.user_id)),
256        ];
257
258        let included = params.kinds.included_kinds();
259        if included.is_empty() {
260            return Ok(Vec::new());
261        }
262        if !params.kinds.includes_all() {
263            let placeholders: Vec<String> = included
264                .iter()
265                .map(|kind| {
266                    values.push(SeaOrmValue::String(Some(kind.to_string())));
267                    format!("${}", values.len())
268                })
269                .collect();
270            where_clauses.push(format!("m.kind IN ({})", placeholders.join(", ")));
271        }
272
273        if let Some(t) = params.created_after {
274            values.push(SeaOrmValue::ChronoDateTimeWithTimeZone(Some(t)));
275            where_clauses.push(format!("m.created_at >= ${}", values.len()));
276        }
277        if let Some(t) = params.created_before {
278            values.push(SeaOrmValue::ChronoDateTimeWithTimeZone(Some(t)));
279            where_clauses.push(format!("m.created_at < ${}", values.len()));
280        }
281        if let Some(t) = params.event_at_after {
282            values.push(SeaOrmValue::ChronoDateTimeWithTimeZone(Some(t)));
283            where_clauses.push(format!("m.event_at >= ${}", values.len()));
284        }
285        if let Some(t) = params.event_at_before {
286            values.push(SeaOrmValue::ChronoDateTimeWithTimeZone(Some(t)));
287            where_clauses.push(format!("m.event_at < ${}", values.len()));
288        }
289        if !params.include_superseded {
290            where_clauses.push("m.superseded_by IS NULL".into());
291        }
292        // Retired rows (rejected/stale) are scrubbed from all reads,
293        // unconditionally — unlike supersession, retirement has no
294        // "include" escape hatch (a rejected extraction was never true).
295        where_clauses.push("m.retirement_reason IS NULL".into());
296
297        let order = match params.direction {
298            TimelineDirection::Descending => "DESC",
299            TimelineDirection::Ascending => "ASC",
300        };
301
302        values.push(SeaOrmValue::BigInt(Some(params.limit as i64)));
303        let limit_placeholder = values.len();
304
305        let sql = format!(
306            "SELECT {MEMORY_SELECT_COLUMNS} FROM memories m \
307             WHERE {where_sql} \
308             ORDER BY m.created_at {order} \
309             LIMIT ${limit_placeholder}",
310            where_sql = where_clauses.join(" AND "),
311        );
312        let stmt = Statement::from_sql_and_values(sea_orm::DatabaseBackend::Postgres, sql, values);
313
314        let rows = self.db.query_all_raw(stmt).await?;
315        let mut memories = Vec::with_capacity(rows.len());
316        for row in &rows {
317            memories.push(Memory::try_from(row)?);
318        }
319        Ok(memories)
320    }
321
322    async fn memories_as_of(&self, scope: Scope, params: AsOfParams) -> Result<Vec<Memory>, StoreError> {
323        scope.validate()?;
324
325        let included = params.kinds.included_kinds();
326        if included.is_empty() {
327            return Ok(Vec::new());
328        }
329
330        let mut where_clauses: Vec<String> = vec![
331            "m.agent_id = $1".into(),
332            "m.org_id = $2".into(),
333            "m.user_id = $3".into(),
334            "m.created_at <= $4".into(),
335            "latest_event.winner_pid IS NULL".into(),
336            // Retirement is current-state (no decided_at history), so it is
337            // applied uniformly even to this point-in-time read: a
338            // rejected/stale row is scrubbed from every view (epic 0011).
339            "m.retirement_reason IS NULL".into(),
340        ];
341        let mut values: Vec<SeaOrmValue> = vec![
342            SeaOrmValue::String(Some(scope.agent_id)),
343            SeaOrmValue::String(Some(scope.org_id)),
344            SeaOrmValue::String(Some(scope.user_id)),
345            SeaOrmValue::ChronoDateTimeWithTimeZone(Some(params.as_of)),
346        ];
347
348        if !params.kinds.includes_all() {
349            let placeholders: Vec<String> = included
350                .iter()
351                .map(|kind| {
352                    values.push(SeaOrmValue::String(Some(kind.to_string())));
353                    format!("${}", values.len())
354                })
355                .collect();
356            where_clauses.push(format!("m.kind IN ({})", placeholders.join(", ")));
357        }
358
359        values.push(SeaOrmValue::BigInt(Some(params.limit as i64)));
360        let limit_placeholder = values.len();
361
362        let sql = format!(
363            "SELECT {MEMORY_SELECT_COLUMNS} \
364             FROM memories m \
365             LEFT JOIN LATERAL ( \
366                 SELECT loser_pid, winner_pid, decided_at \
367                 FROM supersession_events \
368                 WHERE loser_pid = m.pid AND decided_at <= $4 \
369                 ORDER BY decided_at DESC \
370                 LIMIT 1 \
371             ) AS latest_event ON TRUE \
372             WHERE {where_sql} \
373             ORDER BY m.created_at DESC \
374             LIMIT ${limit_placeholder}",
375            where_sql = where_clauses.join(" AND "),
376        );
377        let stmt = Statement::from_sql_and_values(sea_orm::DatabaseBackend::Postgres, sql, values);
378
379        let rows = self.db.query_all_raw(stmt).await?;
380        let mut memories = Vec::with_capacity(rows.len());
381        for row in &rows {
382            memories.push(Memory::try_from(row)?);
383        }
384        Ok(memories)
385    }
386
387    async fn forget(&self, target: ForgetTarget) -> Result<Vec<String>, StoreError> {
388        match target {
389            ForgetTarget::Pid(pid) => self.forget_pid(&pid).await,
390            ForgetTarget::Scope(scope) => self.forget_scope(scope).await,
391        }
392    }
393
394    async fn set_index_status(&self, pid: &str, status: IndexStatus) -> Result<(), StoreError> {
395        let stmt = Statement::from_sql_and_values(
396            sea_orm::DatabaseBackend::Postgres,
397            "UPDATE memories SET qdrant_status = $1 WHERE pid = $2",
398            [
399                SeaOrmValue::String(Some(status.to_string())),
400                SeaOrmValue::String(Some(pid.to_string())),
401            ],
402        );
403
404        let result = self.db.execute_raw(stmt).await?;
405
406        if result.rows_affected() == 0 {
407            return Err(StoreError::NotFound(pid.to_string()));
408        }
409        Ok(())
410    }
411
412    async fn find_failed(&self, limit: usize) -> Result<Vec<Memory>, StoreError> {
413        let select_sql =
414            format!("SELECT {MEMORY_SELECT_COLUMNS} FROM memories m WHERE m.qdrant_status = 'failed' LIMIT $1");
415        let stmt = Statement::from_sql_and_values(
416            sea_orm::DatabaseBackend::Postgres,
417            select_sql,
418            [SeaOrmValue::BigInt(Some(limit as i64))],
419        );
420
421        let rows = self.db.query_all_raw(stmt).await?;
422        let mut memories = Vec::with_capacity(rows.len());
423        for row in &rows {
424            memories.push(Memory::try_from(row)?);
425        }
426        Ok(memories)
427    }
428
429    async fn list_scopes(&self) -> Result<Vec<Scope>, StoreError> {
430        let stmt = Statement::from_string(
431            sea_orm::DatabaseBackend::Postgres,
432            "SELECT DISTINCT agent_id, org_id, user_id FROM memories".to_string(),
433        );
434        let rows = self.db.query_all_raw(stmt).await?;
435
436        let mut scopes = Vec::with_capacity(rows.len());
437        for row in &rows {
438            scopes.push(Scope {
439                agent_id: row.try_get::<String>("", "agent_id")?,
440                org_id: row.try_get::<String>("", "org_id")?,
441                user_id: row.try_get::<String>("", "user_id")?,
442            });
443        }
444        Ok(scopes)
445    }
446
447    async fn list_agent_ids(&self, org_id: &str, user_id: &str) -> Result<Vec<String>, StoreError> {
448        let stmt = Statement::from_sql_and_values(
449            sea_orm::DatabaseBackend::Postgres,
450            r#"
451            SELECT DISTINCT agent_id FROM memories
452            WHERE org_id = $1 AND user_id = $2
453            ORDER BY agent_id ASC
454            "#,
455            [
456                SeaOrmValue::String(Some(org_id.to_owned())),
457                SeaOrmValue::String(Some(user_id.to_owned())),
458            ],
459        );
460
461        let rows = self.db.query_all_raw(stmt).await?;
462        let mut agent_ids = Vec::with_capacity(rows.len());
463        for row in &rows {
464            agent_ids.push(row.try_get::<String>("", "agent_id")?);
465        }
466        Ok(agent_ids)
467    }
468
469    async fn indexed_pids_in_scope(&self, scope: &Scope) -> Result<Vec<String>, StoreError> {
470        scope.validate()?;
471
472        let stmt = Statement::from_sql_and_values(
473            sea_orm::DatabaseBackend::Postgres,
474            r#"
475            SELECT pid FROM memories
476            WHERE agent_id = $1 AND org_id = $2 AND user_id = $3
477              AND qdrant_status = 'indexed'
478            "#,
479            [
480                SeaOrmValue::String(Some(scope.agent_id.clone())),
481                SeaOrmValue::String(Some(scope.org_id.clone())),
482                SeaOrmValue::String(Some(scope.user_id.clone())),
483            ],
484        );
485
486        let rows = self.db.query_all_raw(stmt).await?;
487        let mut pids = Vec::with_capacity(rows.len());
488        for row in &rows {
489            pids.push(row.try_get::<String>("", "pid")?);
490        }
491        Ok(pids)
492    }
493
494    async fn edit(&self, pid: &str, patch: EditPatch) -> Result<Memory, StoreError> {
495        if patch.is_empty() {
496            return self.recall(pid).await;
497        }
498
499        let current = self.recall(pid).await?;
500        if current.kind != MemoryKind::Episodic {
501            return Err(StoreError::UnsupportedEdit {
502                pid: pid.to_string(),
503                kind: current.kind,
504            });
505        }
506
507        let mut set_fragments: Vec<String> = Vec::with_capacity(3);
508        let mut values: Vec<SeaOrmValue> = Vec::with_capacity(4);
509
510        if let Some(content) = patch.content {
511            set_fragments.push(format!("content = ${}", values.len() + 1));
512            values.push(SeaOrmValue::String(Some(content)));
513        }
514        if let Some(metadata) = patch.metadata {
515            set_fragments.push(format!("metadata = ${}", values.len() + 1));
516            values.push(SeaOrmValue::Json(Some(Box::new(metadata))));
517        }
518        if let Some(event_at) = patch.event_at {
519            set_fragments.push(format!("event_at = ${}", values.len() + 1));
520            values.push(SeaOrmValue::ChronoDateTimeWithTimeZone(event_at));
521        }
522
523        let pid_placeholder = values.len() + 1;
524        values.push(SeaOrmValue::String(Some(pid.to_string())));
525
526        let sql = format!(
527            "UPDATE memories SET {set} WHERE pid = ${pid_placeholder}",
528            set = set_fragments.join(", "),
529        );
530        let stmt = Statement::from_sql_and_values(sea_orm::DatabaseBackend::Postgres, sql, values);
531
532        let result = self.db.execute_raw(stmt).await?;
533        if result.rows_affected() == 0 {
534            return Err(StoreError::NotFound(pid.to_string()));
535        }
536
537        self.recall(pid).await
538    }
539
540    async fn set_category(&self, pid: &str, category: &str) -> Result<(), StoreError> {
541        let stmt = Statement::from_sql_and_values(
542            sea_orm::DatabaseBackend::Postgres,
543            "UPDATE memories SET category = $1 WHERE pid = $2",
544            [
545                SeaOrmValue::String(Some(category.to_string())),
546                SeaOrmValue::String(Some(pid.to_string())),
547            ],
548        );
549        let result = self.db.execute_raw(stmt).await?;
550        if result.rows_affected() == 0 {
551            return Err(StoreError::NotFound(pid.to_string()));
552        }
553        Ok(())
554    }
555
556    async fn retire(&self, pid: &str, reason: crate::memory::RetirementReason) -> Result<(), StoreError> {
557        let stmt = Statement::from_sql_and_values(
558            sea_orm::DatabaseBackend::Postgres,
559            "UPDATE memories SET retirement_reason = $1 WHERE pid = $2",
560            [
561                SeaOrmValue::String(Some(reason.to_string())),
562                SeaOrmValue::String(Some(pid.to_string())),
563            ],
564        );
565        let result = self.db.execute_raw(stmt).await?;
566        if result.rows_affected() == 0 {
567            return Err(StoreError::NotFound(pid.to_string()));
568        }
569        Ok(())
570    }
571
572    async fn supersede(&self, pid: &str, by_pid: &str) -> Result<(), StoreError> {
573        // `INSERT ... SELECT ... WHERE EXISTS` keeps the contract identical
574        // to the old UPDATE-based path: if the loser pid does not exist,
575        // zero rows are inserted and we surface `NotFound`. The trigger
576        // (migration 0005) maintains `memories.superseded_by` from the
577        // inserted event. FK violations on `winner_pid` still bubble up as
578        // `Database` errors when `by_pid` doesn't exist.
579        let stmt = Statement::from_sql_and_values(
580            sea_orm::DatabaseBackend::Postgres,
581            r#"
582            INSERT INTO supersession_events (loser_pid, winner_pid)
583            SELECT $1, $2
584            WHERE EXISTS (SELECT 1 FROM memories WHERE pid = $1)
585            "#,
586            [
587                SeaOrmValue::String(Some(pid.to_string())),
588                SeaOrmValue::String(Some(by_pid.to_string())),
589            ],
590        );
591
592        let result = self.db.execute_raw(stmt).await?;
593
594        if result.rows_affected() == 0 {
595            return Err(StoreError::NotFound(pid.to_string()));
596        }
597        Ok(())
598    }
599
600    async fn unsupersede(&self, pid: &str) -> Result<(), StoreError> {
601        // Same EXISTS-guarded INSERT shape as `supersede`; `winner_pid` is
602        // NULL to encode an unsupersede event. Per DP2, this always inserts
603        // (no cache pre-check) — the audit table reflects every operator
604        // call, even redundant ones against an already-active row.
605        let stmt = Statement::from_sql_and_values(
606            sea_orm::DatabaseBackend::Postgres,
607            r#"
608            INSERT INTO supersession_events (loser_pid, winner_pid)
609            SELECT $1, NULL
610            WHERE EXISTS (SELECT 1 FROM memories WHERE pid = $1)
611            "#,
612            [SeaOrmValue::String(Some(pid.to_string()))],
613        );
614
615        let result = self.db.execute_raw(stmt).await?;
616
617        if result.rows_affected() == 0 {
618            return Err(StoreError::NotFound(pid.to_string()));
619        }
620        Ok(())
621    }
622
623    async fn supersession_at(&self, pid: &str, as_of: DateTime<FixedOffset>) -> Result<Option<String>, StoreError> {
624        // Returns the winner_pid for `pid` as of timestamp `as_of`, or
625        // `None` if the row was not superseded at that time (either it
626        // had no events, or its latest event before `as_of` was an
627        // unsupersede). The compound index
628        // `supersession_events_loser_decided_idx` makes this an indexed
629        // ORDER BY + LIMIT.
630        let stmt = Statement::from_sql_and_values(
631            sea_orm::DatabaseBackend::Postgres,
632            r#"
633            SELECT winner_pid
634            FROM supersession_events
635            WHERE loser_pid = $1 AND decided_at <= $2
636            ORDER BY decided_at DESC
637            LIMIT 1
638            "#,
639            [
640                SeaOrmValue::String(Some(pid.to_string())),
641                SeaOrmValue::ChronoDateTimeWithTimeZone(Some(as_of)),
642            ],
643        );
644
645        let row = self.db.query_one_raw(stmt).await?;
646        match row {
647            None => Ok(None),
648            Some(row) => row.try_get("", "winner_pid").map_err(StoreError::from),
649        }
650    }
651
652    async fn supersession_history(&self, pid: &str) -> Result<Vec<SupersessionEvent>, StoreError> {
653        // The compound index `supersession_events_loser_decided_idx` makes
654        // this an indexed forward scan. Per-pid trails are tiny (a handful
655        // of events) so no LIMIT.
656        let stmt = Statement::from_sql_and_values(
657            sea_orm::DatabaseBackend::Postgres,
658            r#"
659            SELECT winner_pid, decided_at
660            FROM supersession_events
661            WHERE loser_pid = $1
662            ORDER BY decided_at ASC
663            "#,
664            [SeaOrmValue::String(Some(pid.to_string()))],
665        );
666
667        let rows = self.db.query_all_raw(stmt).await?;
668        let mut trail = Vec::with_capacity(rows.len());
669        for row in &rows {
670            trail.push(SupersessionEvent {
671                winner_pid: row.try_get("", "winner_pid")?,
672                decided_at: row.try_get("", "decided_at")?,
673            });
674        }
675        Ok(trail)
676    }
677}
678
679impl PostgresStore {
680    async fn forget_pid(&self, pid: &str) -> Result<Vec<String>, StoreError> {
681        // Delete the derived semantic rows (`source_pid = $1`) and the named row
682        // in one statement, returning every removed pid. The `source_pid` FK is
683        // `ON DELETE CASCADE`, but a plain `DELETE ... WHERE pid = $1 RETURNING`
684        // sees only the named pid — the cascade-removed children never reach
685        // RETURNING, so their vectors would orphan in Qdrant. Deleting the
686        // children explicitly in a CTE puts them in the result set. Depth is
687        // always 1: only semantic rows carry `source_pid`, and they are never
688        // themselves a source (migration 000002), so no recursion is needed.
689        let stmt = Statement::from_sql_and_values(
690            sea_orm::DatabaseBackend::Postgres,
691            r#"
692            WITH derived AS (
693                DELETE FROM memories WHERE source_pid = $1 RETURNING pid
694            ), root AS (
695                DELETE FROM memories WHERE pid = $1 RETURNING pid
696            )
697            SELECT pid FROM derived
698            UNION ALL
699            SELECT pid FROM root
700            "#,
701            [SeaOrmValue::String(Some(pid.to_string()))],
702        );
703        let rows = self.db.query_all_raw(stmt).await?;
704        let mut deleted = Vec::with_capacity(rows.len());
705        for row in &rows {
706            deleted.push(row.try_get::<String>("", "pid")?);
707        }
708        Ok(deleted)
709    }
710
711    async fn forget_scope(&self, scope: Scope) -> Result<Vec<String>, StoreError> {
712        scope.validate()?;
713
714        let stmt = Statement::from_sql_and_values(
715            sea_orm::DatabaseBackend::Postgres,
716            "DELETE FROM memories WHERE agent_id = $1 AND org_id = $2 AND user_id = $3 RETURNING pid",
717            [
718                SeaOrmValue::String(Some(scope.agent_id)),
719                SeaOrmValue::String(Some(scope.org_id)),
720                SeaOrmValue::String(Some(scope.user_id)),
721            ],
722        );
723        let rows = self.db.query_all_raw(stmt).await?;
724        let mut deleted = Vec::with_capacity(rows.len());
725        for row in &rows {
726            deleted.push(row.try_get::<String>("", "pid")?);
727        }
728        Ok(deleted)
729    }
730}
731
732impl TryFrom<&sea_orm::QueryResult> for Memory {
733    type Error = StoreError;
734
735    fn try_from(row: &sea_orm::QueryResult) -> Result<Self, Self::Error> {
736        let pid: String = row.try_get("", "pid")?;
737        let agent_id: String = row.try_get("", "agent_id")?;
738        let org_id: String = row.try_get("", "org_id")?;
739        let user_id: String = row.try_get("", "user_id")?;
740        let content: String = row.try_get("", "content")?;
741        let metadata: serde_json::Value = row.try_get("", "metadata")?;
742        let kind_str: String = row.try_get("", "kind")?;
743        let status_str: String = row.try_get("", "qdrant_status")?;
744        let source_pid: Option<String> = row.try_get("", "source_pid")?;
745        let superseded_by: Option<String> = row.try_get("", "superseded_by")?;
746        let created_at: DateTime<FixedOffset> = row.try_get("", "created_at")?;
747        let updated_at: DateTime<FixedOffset> = row.try_get("", "updated_at")?;
748        let event_at: Option<DateTime<FixedOffset>> = row.try_get("", "event_at")?;
749        let confidence_raw: i16 = row.try_get("", "confidence")?;
750        let category: Option<String> = row.try_get("", "category")?;
751        let retirement_str: Option<String> = row.try_get("", "retirement_reason")?;
752        let supersession_at: Option<DateTime<FixedOffset>> = row.try_get("", "supersession_at")?;
753
754        let kind: MemoryKind = kind_str
755            .parse()
756            .map_err(|_| StoreError::CacheInvariant(format!("unknown memory kind: {kind_str}")))?;
757
758        let status: IndexStatus = status_str
759            .parse()
760            .map_err(|_| StoreError::CacheInvariant(format!("unknown qdrant status: {status_str}")))?;
761
762        let retirement = retirement_str
763            .map(|s| {
764                s.parse::<crate::memory::RetirementReason>()
765                    .map_err(|_| StoreError::CacheInvariant(format!("unknown retirement reason: {s}")))
766            })
767            .transpose()?;
768
769        // The `memories.confidence` CHECK constrains the column to 0-100, so an
770        // `i16` from the DB always fits `i8`. `Confidence::new` clamps as
771        // defense-in-depth against a corrupted row rather than erroring.
772        let confidence = crate::memory::Confidence::new(confidence_raw.clamp(0, 100) as i8);
773
774        let supersession = match (superseded_by, supersession_at) {
775            (Some(winner_pid), Some(at)) => Some(crate::memory::SupersessionInfo { winner_pid, at }),
776            (None, None) => None,
777            (Some(winner_pid), None) => {
778                return Err(StoreError::CacheInvariant(format!(
779                    "row {pid}: superseded_by={winner_pid} but no supersession_events row found"
780                )));
781            }
782            (None, Some(_)) => {
783                return Err(StoreError::CacheInvariant(format!(
784                    "row {pid}: supersession_at populated but superseded_by is NULL"
785                )));
786            }
787        };
788
789        Ok(Memory {
790            pid,
791            scope: Scope {
792                agent_id,
793                org_id,
794                user_id,
795            },
796            content,
797            metadata,
798            kind,
799            source_pid,
800            supersession,
801            created_at,
802            updated_at,
803            event_at,
804            score: None,
805            status,
806            confidence,
807            category,
808            retirement,
809        })
810    }
811}