1use chrono::{DateTime, FixedOffset};
4use sea_orm::{ConnectionTrait, DatabaseConnection, Statement, Value as SeaOrmValue};
5
6use super::{AsOfParams, EditPatch, IndexStatus, MemoryStore, StoreError, TimelineDirection, TimelineParams};
7use crate::memory::{ExtractionStat, ForgetTarget, Memory, MemoryKind, Scope, StatsFilter, SupersessionEvent};
8
9const PID_LENGTH: usize = 21;
10
11const MEMORY_SELECT_COLUMNS: &str = "
19 m.pid,
20 m.agent_id,
21 m.org_id,
22 m.user_id,
23 m.content,
24 m.metadata,
25 m.kind,
26 m.qdrant_status,
27 m.source_pid,
28 m.superseded_by,
29 m.created_at,
30 m.updated_at,
31 m.event_at,
32 m.confidence,
33 m.category,
34 m.retirement_reason,
35 CASE
36 WHEN m.superseded_by IS NULL THEN NULL
37 ELSE (
38 SELECT MAX(decided_at)
39 FROM supersession_events
40 WHERE loser_pid = m.pid
41 )
42 END AS supersession_at
43";
44
45#[derive(Debug, Clone)]
51pub struct PostgresStore {
52 db: DatabaseConnection,
53}
54
55impl PostgresStore {
56 pub fn new(db: DatabaseConnection) -> Self {
58 Self { db }
59 }
60
61 pub fn db(&self) -> &DatabaseConnection {
63 &self.db
64 }
65}
66
67impl MemoryStore for PostgresStore {
68 async fn remember(&self, new: crate::store::NewMemory) -> Result<Memory, StoreError> {
69 let crate::store::NewMemory {
70 scope,
71 content,
72 metadata,
73 kind,
74 source_pid,
75 event_at,
76 confidence,
77 } = new;
78 scope.validate()?;
79
80 let pid = nanoid::nanoid!(PID_LENGTH);
81
82 let stmt = Statement::from_sql_and_values(
83 sea_orm::DatabaseBackend::Postgres,
84 r#"
85 INSERT INTO memories (pid, agent_id, org_id, user_id, content, metadata, kind, source_pid, event_at, confidence)
86 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
87 RETURNING
88 pid, agent_id, org_id, user_id, content, metadata, kind,
89 qdrant_status, source_pid, superseded_by, created_at, updated_at, event_at,
90 confidence, category, retirement_reason,
91 NULL::TIMESTAMPTZ AS supersession_at
92 "#,
93 [
94 SeaOrmValue::String(Some(pid)),
95 SeaOrmValue::String(Some(scope.agent_id.clone())),
96 SeaOrmValue::String(Some(scope.org_id.clone())),
97 SeaOrmValue::String(Some(scope.user_id.clone())),
98 SeaOrmValue::String(Some(content)),
99 SeaOrmValue::Json(Some(Box::new(metadata))),
100 SeaOrmValue::String(Some(kind.to_string())),
101 SeaOrmValue::String(source_pid),
102 SeaOrmValue::ChronoDateTimeWithTimeZone(event_at),
103 SeaOrmValue::SmallInt(Some(i16::from(confidence.get()))),
105 ],
106 );
107
108 let row = self
109 .db
110 .query_one_raw(stmt)
111 .await?
112 .ok_or_else(|| StoreError::CacheInvariant("insert returned no row".to_string()))?;
113
114 Memory::try_from(&row).map(|mut m| {
115 m.score = None;
116 m
117 })
118 }
119
120 async fn recall(&self, pid: &str) -> Result<Memory, StoreError> {
121 if pid.is_empty() {
122 return Err(StoreError::NotFound(pid.to_string()));
123 }
124
125 let select_sql = format!("SELECT {MEMORY_SELECT_COLUMNS} FROM memories m WHERE m.pid = $1");
126 let stmt = Statement::from_sql_and_values(
127 sea_orm::DatabaseBackend::Postgres,
128 select_sql,
129 [SeaOrmValue::String(Some(pid.to_string()))],
130 );
131
132 let row = self
133 .db
134 .query_one_raw(stmt)
135 .await?
136 .ok_or_else(|| StoreError::NotFound(pid.to_string()))?;
137
138 Memory::try_from(&row)
139 }
140
141 async fn find_by_pids(&self, pids: &[&str]) -> Result<Vec<Memory>, StoreError> {
142 if pids.is_empty() {
143 return Ok(Vec::new());
144 }
145
146 let owned_pids: Vec<String> = pids.iter().map(|p| (*p).to_string()).collect();
147 let select_sql = format!(
148 "SELECT {MEMORY_SELECT_COLUMNS} FROM memories m \
149 WHERE m.pid = ANY($1) AND m.qdrant_status = 'indexed' \
150 AND m.superseded_by IS NULL AND m.retirement_reason IS NULL"
151 );
152 let stmt = Statement::from_sql_and_values(
153 sea_orm::DatabaseBackend::Postgres,
154 select_sql,
155 [SeaOrmValue::Array(
156 sea_orm::sea_query::ArrayType::String,
157 Some(Box::new(
158 owned_pids.into_iter().map(|p| SeaOrmValue::String(Some(p))).collect(),
159 )),
160 )],
161 );
162
163 let rows = self.db.query_all_raw(stmt).await?;
164 let mut memories = Vec::with_capacity(rows.len());
165 for row in &rows {
166 memories.push(Memory::try_from(row)?);
167 }
168 Ok(memories)
169 }
170
171 async fn active_semantics_for_source(&self, source_pid: &str) -> Result<Vec<Memory>, StoreError> {
172 if source_pid.is_empty() {
173 return Ok(Vec::new());
174 }
175
176 let select_sql = format!(
177 "SELECT {MEMORY_SELECT_COLUMNS} FROM memories m \
178 WHERE m.source_pid = $1 AND m.kind = 'semantic' \
179 AND m.superseded_by IS NULL AND m.retirement_reason IS NULL"
180 );
181 let stmt = Statement::from_sql_and_values(
182 sea_orm::DatabaseBackend::Postgres,
183 select_sql,
184 [SeaOrmValue::String(Some(source_pid.to_string()))],
185 );
186
187 let rows = self.db.query_all_raw(stmt).await?;
188 let mut memories = Vec::with_capacity(rows.len());
189 for row in &rows {
190 memories.push(Memory::try_from(row)?);
191 }
192 Ok(memories)
193 }
194
195 async fn extraction_stats(&self, filter: StatsFilter) -> Result<Vec<ExtractionStat>, StoreError> {
196 let mut where_clauses: Vec<String> = vec!["m.kind = 'semantic'".into()];
203 let mut values: Vec<SeaOrmValue> = Vec::new();
204
205 for (column, value) in [
206 ("agent_id", filter.agent_id),
207 ("org_id", filter.org_id),
208 ("user_id", filter.user_id),
209 ] {
210 if let Some(value) = value {
211 values.push(SeaOrmValue::String(Some(value)));
212 where_clauses.push(format!("m.{column} = ${}", values.len()));
213 }
214 }
215
216 let sql = format!(
217 "SELECT \
218 COALESCE(m.metadata ->> 'provider', '') AS provider, \
219 COALESCE(m.metadata ->> 'model', '') AS model, \
220 COUNT(*)::BIGINT AS total, \
221 COUNT(*) FILTER (WHERE m.retirement_reason = 'rejected')::BIGINT AS rejected \
222 FROM memories m \
223 WHERE {} \
224 GROUP BY provider, model \
225 ORDER BY provider ASC, model ASC",
226 where_clauses.join(" AND "),
227 );
228
229 let stmt = Statement::from_sql_and_values(sea_orm::DatabaseBackend::Postgres, sql, values);
230 let rows = self.db.query_all_raw(stmt).await?;
231
232 let mut stats = Vec::with_capacity(rows.len());
233 for row in &rows {
234 stats.push(ExtractionStat {
235 provider: row.try_get::<String>("", "provider")?,
236 model: row.try_get::<String>("", "model")?,
237 total: u64::try_from(row.try_get::<i64>("", "total")?).unwrap_or(0),
238 rejected: u64::try_from(row.try_get::<i64>("", "rejected")?).unwrap_or(0),
239 });
240 }
241 Ok(stats)
242 }
243
244 async fn timeline(&self, scope: Scope, params: TimelineParams) -> Result<Vec<Memory>, StoreError> {
245 scope.validate()?;
246
247 let mut where_clauses: Vec<String> = vec![
248 "m.agent_id = $1".into(),
249 "m.org_id = $2".into(),
250 "m.user_id = $3".into(),
251 ];
252 let mut values: Vec<SeaOrmValue> = vec![
253 SeaOrmValue::String(Some(scope.agent_id)),
254 SeaOrmValue::String(Some(scope.org_id)),
255 SeaOrmValue::String(Some(scope.user_id)),
256 ];
257
258 let included = params.kinds.included_kinds();
259 if included.is_empty() {
260 return Ok(Vec::new());
261 }
262 if !params.kinds.includes_all() {
263 let placeholders: Vec<String> = included
264 .iter()
265 .map(|kind| {
266 values.push(SeaOrmValue::String(Some(kind.to_string())));
267 format!("${}", values.len())
268 })
269 .collect();
270 where_clauses.push(format!("m.kind IN ({})", placeholders.join(", ")));
271 }
272
273 if let Some(t) = params.created_after {
274 values.push(SeaOrmValue::ChronoDateTimeWithTimeZone(Some(t)));
275 where_clauses.push(format!("m.created_at >= ${}", values.len()));
276 }
277 if let Some(t) = params.created_before {
278 values.push(SeaOrmValue::ChronoDateTimeWithTimeZone(Some(t)));
279 where_clauses.push(format!("m.created_at < ${}", values.len()));
280 }
281 if let Some(t) = params.event_at_after {
282 values.push(SeaOrmValue::ChronoDateTimeWithTimeZone(Some(t)));
283 where_clauses.push(format!("m.event_at >= ${}", values.len()));
284 }
285 if let Some(t) = params.event_at_before {
286 values.push(SeaOrmValue::ChronoDateTimeWithTimeZone(Some(t)));
287 where_clauses.push(format!("m.event_at < ${}", values.len()));
288 }
289 if !params.include_superseded {
290 where_clauses.push("m.superseded_by IS NULL".into());
291 }
292 where_clauses.push("m.retirement_reason IS NULL".into());
296
297 let order = match params.direction {
298 TimelineDirection::Descending => "DESC",
299 TimelineDirection::Ascending => "ASC",
300 };
301
302 values.push(SeaOrmValue::BigInt(Some(params.limit as i64)));
303 let limit_placeholder = values.len();
304
305 let sql = format!(
306 "SELECT {MEMORY_SELECT_COLUMNS} FROM memories m \
307 WHERE {where_sql} \
308 ORDER BY m.created_at {order} \
309 LIMIT ${limit_placeholder}",
310 where_sql = where_clauses.join(" AND "),
311 );
312 let stmt = Statement::from_sql_and_values(sea_orm::DatabaseBackend::Postgres, sql, values);
313
314 let rows = self.db.query_all_raw(stmt).await?;
315 let mut memories = Vec::with_capacity(rows.len());
316 for row in &rows {
317 memories.push(Memory::try_from(row)?);
318 }
319 Ok(memories)
320 }
321
322 async fn memories_as_of(&self, scope: Scope, params: AsOfParams) -> Result<Vec<Memory>, StoreError> {
323 scope.validate()?;
324
325 let included = params.kinds.included_kinds();
326 if included.is_empty() {
327 return Ok(Vec::new());
328 }
329
330 let mut where_clauses: Vec<String> = vec![
331 "m.agent_id = $1".into(),
332 "m.org_id = $2".into(),
333 "m.user_id = $3".into(),
334 "m.created_at <= $4".into(),
335 "latest_event.winner_pid IS NULL".into(),
336 "m.retirement_reason IS NULL".into(),
340 ];
341 let mut values: Vec<SeaOrmValue> = vec![
342 SeaOrmValue::String(Some(scope.agent_id)),
343 SeaOrmValue::String(Some(scope.org_id)),
344 SeaOrmValue::String(Some(scope.user_id)),
345 SeaOrmValue::ChronoDateTimeWithTimeZone(Some(params.as_of)),
346 ];
347
348 if !params.kinds.includes_all() {
349 let placeholders: Vec<String> = included
350 .iter()
351 .map(|kind| {
352 values.push(SeaOrmValue::String(Some(kind.to_string())));
353 format!("${}", values.len())
354 })
355 .collect();
356 where_clauses.push(format!("m.kind IN ({})", placeholders.join(", ")));
357 }
358
359 values.push(SeaOrmValue::BigInt(Some(params.limit as i64)));
360 let limit_placeholder = values.len();
361
362 let sql = format!(
363 "SELECT {MEMORY_SELECT_COLUMNS} \
364 FROM memories m \
365 LEFT JOIN LATERAL ( \
366 SELECT loser_pid, winner_pid, decided_at \
367 FROM supersession_events \
368 WHERE loser_pid = m.pid AND decided_at <= $4 \
369 ORDER BY decided_at DESC \
370 LIMIT 1 \
371 ) AS latest_event ON TRUE \
372 WHERE {where_sql} \
373 ORDER BY m.created_at DESC \
374 LIMIT ${limit_placeholder}",
375 where_sql = where_clauses.join(" AND "),
376 );
377 let stmt = Statement::from_sql_and_values(sea_orm::DatabaseBackend::Postgres, sql, values);
378
379 let rows = self.db.query_all_raw(stmt).await?;
380 let mut memories = Vec::with_capacity(rows.len());
381 for row in &rows {
382 memories.push(Memory::try_from(row)?);
383 }
384 Ok(memories)
385 }
386
387 async fn forget(&self, target: ForgetTarget) -> Result<Vec<String>, StoreError> {
388 match target {
389 ForgetTarget::Pid(pid) => self.forget_pid(&pid).await,
390 ForgetTarget::Scope(scope) => self.forget_scope(scope).await,
391 }
392 }
393
394 async fn set_index_status(&self, pid: &str, status: IndexStatus) -> Result<(), StoreError> {
395 let stmt = Statement::from_sql_and_values(
396 sea_orm::DatabaseBackend::Postgres,
397 "UPDATE memories SET qdrant_status = $1 WHERE pid = $2",
398 [
399 SeaOrmValue::String(Some(status.to_string())),
400 SeaOrmValue::String(Some(pid.to_string())),
401 ],
402 );
403
404 let result = self.db.execute_raw(stmt).await?;
405
406 if result.rows_affected() == 0 {
407 return Err(StoreError::NotFound(pid.to_string()));
408 }
409 Ok(())
410 }
411
412 async fn find_failed(&self, limit: usize) -> Result<Vec<Memory>, StoreError> {
413 let select_sql =
414 format!("SELECT {MEMORY_SELECT_COLUMNS} FROM memories m WHERE m.qdrant_status = 'failed' LIMIT $1");
415 let stmt = Statement::from_sql_and_values(
416 sea_orm::DatabaseBackend::Postgres,
417 select_sql,
418 [SeaOrmValue::BigInt(Some(limit as i64))],
419 );
420
421 let rows = self.db.query_all_raw(stmt).await?;
422 let mut memories = Vec::with_capacity(rows.len());
423 for row in &rows {
424 memories.push(Memory::try_from(row)?);
425 }
426 Ok(memories)
427 }
428
429 async fn list_scopes(&self) -> Result<Vec<Scope>, StoreError> {
430 let stmt = Statement::from_string(
431 sea_orm::DatabaseBackend::Postgres,
432 "SELECT DISTINCT agent_id, org_id, user_id FROM memories".to_string(),
433 );
434 let rows = self.db.query_all_raw(stmt).await?;
435
436 let mut scopes = Vec::with_capacity(rows.len());
437 for row in &rows {
438 scopes.push(Scope {
439 agent_id: row.try_get::<String>("", "agent_id")?,
440 org_id: row.try_get::<String>("", "org_id")?,
441 user_id: row.try_get::<String>("", "user_id")?,
442 });
443 }
444 Ok(scopes)
445 }
446
447 async fn list_agent_ids(&self, org_id: &str, user_id: &str) -> Result<Vec<String>, StoreError> {
448 let stmt = Statement::from_sql_and_values(
449 sea_orm::DatabaseBackend::Postgres,
450 r#"
451 SELECT DISTINCT agent_id FROM memories
452 WHERE org_id = $1 AND user_id = $2
453 ORDER BY agent_id ASC
454 "#,
455 [
456 SeaOrmValue::String(Some(org_id.to_owned())),
457 SeaOrmValue::String(Some(user_id.to_owned())),
458 ],
459 );
460
461 let rows = self.db.query_all_raw(stmt).await?;
462 let mut agent_ids = Vec::with_capacity(rows.len());
463 for row in &rows {
464 agent_ids.push(row.try_get::<String>("", "agent_id")?);
465 }
466 Ok(agent_ids)
467 }
468
469 async fn indexed_pids_in_scope(&self, scope: &Scope) -> Result<Vec<String>, StoreError> {
470 scope.validate()?;
471
472 let stmt = Statement::from_sql_and_values(
473 sea_orm::DatabaseBackend::Postgres,
474 r#"
475 SELECT pid FROM memories
476 WHERE agent_id = $1 AND org_id = $2 AND user_id = $3
477 AND qdrant_status = 'indexed'
478 "#,
479 [
480 SeaOrmValue::String(Some(scope.agent_id.clone())),
481 SeaOrmValue::String(Some(scope.org_id.clone())),
482 SeaOrmValue::String(Some(scope.user_id.clone())),
483 ],
484 );
485
486 let rows = self.db.query_all_raw(stmt).await?;
487 let mut pids = Vec::with_capacity(rows.len());
488 for row in &rows {
489 pids.push(row.try_get::<String>("", "pid")?);
490 }
491 Ok(pids)
492 }
493
494 async fn edit(&self, pid: &str, patch: EditPatch) -> Result<Memory, StoreError> {
495 if patch.is_empty() {
496 return self.recall(pid).await;
497 }
498
499 let current = self.recall(pid).await?;
500 if current.kind != MemoryKind::Episodic {
501 return Err(StoreError::UnsupportedEdit {
502 pid: pid.to_string(),
503 kind: current.kind,
504 });
505 }
506
507 let mut set_fragments: Vec<String> = Vec::with_capacity(3);
508 let mut values: Vec<SeaOrmValue> = Vec::with_capacity(4);
509
510 if let Some(content) = patch.content {
511 set_fragments.push(format!("content = ${}", values.len() + 1));
512 values.push(SeaOrmValue::String(Some(content)));
513 }
514 if let Some(metadata) = patch.metadata {
515 set_fragments.push(format!("metadata = ${}", values.len() + 1));
516 values.push(SeaOrmValue::Json(Some(Box::new(metadata))));
517 }
518 if let Some(event_at) = patch.event_at {
519 set_fragments.push(format!("event_at = ${}", values.len() + 1));
520 values.push(SeaOrmValue::ChronoDateTimeWithTimeZone(event_at));
521 }
522
523 let pid_placeholder = values.len() + 1;
524 values.push(SeaOrmValue::String(Some(pid.to_string())));
525
526 let sql = format!(
527 "UPDATE memories SET {set} WHERE pid = ${pid_placeholder}",
528 set = set_fragments.join(", "),
529 );
530 let stmt = Statement::from_sql_and_values(sea_orm::DatabaseBackend::Postgres, sql, values);
531
532 let result = self.db.execute_raw(stmt).await?;
533 if result.rows_affected() == 0 {
534 return Err(StoreError::NotFound(pid.to_string()));
535 }
536
537 self.recall(pid).await
538 }
539
540 async fn set_category(&self, pid: &str, category: &str) -> Result<(), StoreError> {
541 let stmt = Statement::from_sql_and_values(
542 sea_orm::DatabaseBackend::Postgres,
543 "UPDATE memories SET category = $1 WHERE pid = $2",
544 [
545 SeaOrmValue::String(Some(category.to_string())),
546 SeaOrmValue::String(Some(pid.to_string())),
547 ],
548 );
549 let result = self.db.execute_raw(stmt).await?;
550 if result.rows_affected() == 0 {
551 return Err(StoreError::NotFound(pid.to_string()));
552 }
553 Ok(())
554 }
555
556 async fn retire(&self, pid: &str, reason: crate::memory::RetirementReason) -> Result<(), StoreError> {
557 let stmt = Statement::from_sql_and_values(
558 sea_orm::DatabaseBackend::Postgres,
559 "UPDATE memories SET retirement_reason = $1 WHERE pid = $2",
560 [
561 SeaOrmValue::String(Some(reason.to_string())),
562 SeaOrmValue::String(Some(pid.to_string())),
563 ],
564 );
565 let result = self.db.execute_raw(stmt).await?;
566 if result.rows_affected() == 0 {
567 return Err(StoreError::NotFound(pid.to_string()));
568 }
569 Ok(())
570 }
571
572 async fn supersede(&self, pid: &str, by_pid: &str) -> Result<(), StoreError> {
573 let stmt = Statement::from_sql_and_values(
580 sea_orm::DatabaseBackend::Postgres,
581 r#"
582 INSERT INTO supersession_events (loser_pid, winner_pid)
583 SELECT $1, $2
584 WHERE EXISTS (SELECT 1 FROM memories WHERE pid = $1)
585 "#,
586 [
587 SeaOrmValue::String(Some(pid.to_string())),
588 SeaOrmValue::String(Some(by_pid.to_string())),
589 ],
590 );
591
592 let result = self.db.execute_raw(stmt).await?;
593
594 if result.rows_affected() == 0 {
595 return Err(StoreError::NotFound(pid.to_string()));
596 }
597 Ok(())
598 }
599
600 async fn unsupersede(&self, pid: &str) -> Result<(), StoreError> {
601 let stmt = Statement::from_sql_and_values(
606 sea_orm::DatabaseBackend::Postgres,
607 r#"
608 INSERT INTO supersession_events (loser_pid, winner_pid)
609 SELECT $1, NULL
610 WHERE EXISTS (SELECT 1 FROM memories WHERE pid = $1)
611 "#,
612 [SeaOrmValue::String(Some(pid.to_string()))],
613 );
614
615 let result = self.db.execute_raw(stmt).await?;
616
617 if result.rows_affected() == 0 {
618 return Err(StoreError::NotFound(pid.to_string()));
619 }
620 Ok(())
621 }
622
623 async fn supersession_at(&self, pid: &str, as_of: DateTime<FixedOffset>) -> Result<Option<String>, StoreError> {
624 let stmt = Statement::from_sql_and_values(
631 sea_orm::DatabaseBackend::Postgres,
632 r#"
633 SELECT winner_pid
634 FROM supersession_events
635 WHERE loser_pid = $1 AND decided_at <= $2
636 ORDER BY decided_at DESC
637 LIMIT 1
638 "#,
639 [
640 SeaOrmValue::String(Some(pid.to_string())),
641 SeaOrmValue::ChronoDateTimeWithTimeZone(Some(as_of)),
642 ],
643 );
644
645 let row = self.db.query_one_raw(stmt).await?;
646 match row {
647 None => Ok(None),
648 Some(row) => row.try_get("", "winner_pid").map_err(StoreError::from),
649 }
650 }
651
652 async fn supersession_history(&self, pid: &str) -> Result<Vec<SupersessionEvent>, StoreError> {
653 let stmt = Statement::from_sql_and_values(
657 sea_orm::DatabaseBackend::Postgres,
658 r#"
659 SELECT winner_pid, decided_at
660 FROM supersession_events
661 WHERE loser_pid = $1
662 ORDER BY decided_at ASC
663 "#,
664 [SeaOrmValue::String(Some(pid.to_string()))],
665 );
666
667 let rows = self.db.query_all_raw(stmt).await?;
668 let mut trail = Vec::with_capacity(rows.len());
669 for row in &rows {
670 trail.push(SupersessionEvent {
671 winner_pid: row.try_get("", "winner_pid")?,
672 decided_at: row.try_get("", "decided_at")?,
673 });
674 }
675 Ok(trail)
676 }
677}
678
679impl PostgresStore {
680 async fn forget_pid(&self, pid: &str) -> Result<Vec<String>, StoreError> {
681 let stmt = Statement::from_sql_and_values(
690 sea_orm::DatabaseBackend::Postgres,
691 r#"
692 WITH derived AS (
693 DELETE FROM memories WHERE source_pid = $1 RETURNING pid
694 ), root AS (
695 DELETE FROM memories WHERE pid = $1 RETURNING pid
696 )
697 SELECT pid FROM derived
698 UNION ALL
699 SELECT pid FROM root
700 "#,
701 [SeaOrmValue::String(Some(pid.to_string()))],
702 );
703 let rows = self.db.query_all_raw(stmt).await?;
704 let mut deleted = Vec::with_capacity(rows.len());
705 for row in &rows {
706 deleted.push(row.try_get::<String>("", "pid")?);
707 }
708 Ok(deleted)
709 }
710
711 async fn forget_scope(&self, scope: Scope) -> Result<Vec<String>, StoreError> {
712 scope.validate()?;
713
714 let stmt = Statement::from_sql_and_values(
715 sea_orm::DatabaseBackend::Postgres,
716 "DELETE FROM memories WHERE agent_id = $1 AND org_id = $2 AND user_id = $3 RETURNING pid",
717 [
718 SeaOrmValue::String(Some(scope.agent_id)),
719 SeaOrmValue::String(Some(scope.org_id)),
720 SeaOrmValue::String(Some(scope.user_id)),
721 ],
722 );
723 let rows = self.db.query_all_raw(stmt).await?;
724 let mut deleted = Vec::with_capacity(rows.len());
725 for row in &rows {
726 deleted.push(row.try_get::<String>("", "pid")?);
727 }
728 Ok(deleted)
729 }
730}
731
732impl TryFrom<&sea_orm::QueryResult> for Memory {
733 type Error = StoreError;
734
735 fn try_from(row: &sea_orm::QueryResult) -> Result<Self, Self::Error> {
736 let pid: String = row.try_get("", "pid")?;
737 let agent_id: String = row.try_get("", "agent_id")?;
738 let org_id: String = row.try_get("", "org_id")?;
739 let user_id: String = row.try_get("", "user_id")?;
740 let content: String = row.try_get("", "content")?;
741 let metadata: serde_json::Value = row.try_get("", "metadata")?;
742 let kind_str: String = row.try_get("", "kind")?;
743 let status_str: String = row.try_get("", "qdrant_status")?;
744 let source_pid: Option<String> = row.try_get("", "source_pid")?;
745 let superseded_by: Option<String> = row.try_get("", "superseded_by")?;
746 let created_at: DateTime<FixedOffset> = row.try_get("", "created_at")?;
747 let updated_at: DateTime<FixedOffset> = row.try_get("", "updated_at")?;
748 let event_at: Option<DateTime<FixedOffset>> = row.try_get("", "event_at")?;
749 let confidence_raw: i16 = row.try_get("", "confidence")?;
750 let category: Option<String> = row.try_get("", "category")?;
751 let retirement_str: Option<String> = row.try_get("", "retirement_reason")?;
752 let supersession_at: Option<DateTime<FixedOffset>> = row.try_get("", "supersession_at")?;
753
754 let kind: MemoryKind = kind_str
755 .parse()
756 .map_err(|_| StoreError::CacheInvariant(format!("unknown memory kind: {kind_str}")))?;
757
758 let status: IndexStatus = status_str
759 .parse()
760 .map_err(|_| StoreError::CacheInvariant(format!("unknown qdrant status: {status_str}")))?;
761
762 let retirement = retirement_str
763 .map(|s| {
764 s.parse::<crate::memory::RetirementReason>()
765 .map_err(|_| StoreError::CacheInvariant(format!("unknown retirement reason: {s}")))
766 })
767 .transpose()?;
768
769 let confidence = crate::memory::Confidence::new(confidence_raw.clamp(0, 100) as i8);
773
774 let supersession = match (superseded_by, supersession_at) {
775 (Some(winner_pid), Some(at)) => Some(crate::memory::SupersessionInfo { winner_pid, at }),
776 (None, None) => None,
777 (Some(winner_pid), None) => {
778 return Err(StoreError::CacheInvariant(format!(
779 "row {pid}: superseded_by={winner_pid} but no supersession_events row found"
780 )));
781 }
782 (None, Some(_)) => {
783 return Err(StoreError::CacheInvariant(format!(
784 "row {pid}: supersession_at populated but superseded_by is NULL"
785 )));
786 }
787 };
788
789 Ok(Memory {
790 pid,
791 scope: Scope {
792 agent_id,
793 org_id,
794 user_id,
795 },
796 content,
797 metadata,
798 kind,
799 source_pid,
800 supersession,
801 created_at,
802 updated_at,
803 event_at,
804 score: None,
805 status,
806 confidence,
807 category,
808 retirement,
809 })
810 }
811}