Skip to main content

memoir_core/store/
mod.rs

1//! Postgres-backed source-of-truth storage for memories.
2//!
3//! Defines [`MemoryStore`], implemented by [`PostgresStore`] (the default) and
4//! by callers who want to plug in a different backend or a test mock.
5//! Vector search is a separate concern handled by [`crate::vector::VectorIndex`];
6//! this trait covers only the source-of-truth row operations.
7
8mod error;
9pub mod postgres;
10
11pub use error::StoreError;
12pub use postgres::PostgresStore;
13
14use std::future::Future;
15
16use chrono::{DateTime, FixedOffset};
17
18use crate::memory::{ExtractionStat, ForgetTarget, Memory, MemoryKind, Scope, StatsFilter, SupersessionEvent};
19
20/// Lifecycle state of a memory's vector index.
21///
22/// Persisted as the `qdrant_status` column on the memories table. The column
23/// name is historical; the state is generic over which vector backend an
24/// implementation uses.
25#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, strum::Display, strum::EnumString, strum::AsRefStr)]
26#[strum(serialize_all = "lowercase")]
27pub enum IndexStatus {
28    /// Row written to Postgres; embedding + vector upsert in flight.
29    Pending,
30
31    /// Vector upserted; memory is searchable.
32    Indexed,
33
34    /// Embedding or vector upsert failed; reconciliation will retry.
35    Failed,
36}
37
38/// The attributes of a new memory row for [`MemoryStore::remember`].
39///
40/// Groups the row's write-time attributes into one value so the insert path
41/// has a single self-documenting parameter rather than a long positional list
42/// (M-INIT-CASCADED). Every field is stated explicitly by the caller — there
43/// are no silent defaults at this layer; the episodic and extract write paths
44/// each supply their own `kind`, `confidence`, etc.
45#[derive(Debug, Clone)]
46pub struct NewMemory {
47    /// Tenant + agent + user partition.
48    pub scope: Scope,
49
50    /// Raw text of the memory.
51    pub content: String,
52
53    /// Arbitrary JSON attached at write time; round-trips unchanged.
54    pub metadata: serde_json::Value,
55
56    /// Episodic (raw utterance) or semantic (extracted fact).
57    pub kind: MemoryKind,
58
59    /// Originating episodic pid for semantic rows; `None` for episodic.
60    pub source_pid: Option<String>,
61
62    /// Event-time of the remembered thing; `None` when unknown.
63    pub event_at: Option<DateTime<FixedOffset>>,
64
65    /// How sure memoir is of this memory: `MAX` for episodic, the scaled
66    /// extraction score for semantic.
67    pub confidence: crate::memory::Confidence,
68}
69
70/// Field-level patch for [`MemoryStore::edit`].
71///
72/// Each field is `Option`-tracked so callers update only what they pass.
73/// `None` means "leave this field untouched"; `Some(value)` means "overwrite
74/// with this value." `event_at = Some(None)` is reachable via the nested
75/// `Option` and means "clear the event-time"; the outer wrapper distinguishes
76/// "untouched" from "explicitly cleared."
77#[derive(Debug, Clone, Default)]
78pub struct EditPatch {
79    /// New content. `None` leaves it unchanged.
80    pub content: Option<String>,
81
82    /// New metadata blob. `None` leaves it unchanged.
83    pub metadata: Option<serde_json::Value>,
84
85    /// New event-time. Outer `None` means "untouched"; `Some(None)` clears.
86    pub event_at: Option<Option<DateTime<FixedOffset>>>,
87}
88
89impl EditPatch {
90    /// Returns `true` when no field is set — the patch is a no-op.
91    #[must_use]
92    pub fn is_empty(&self) -> bool {
93        self.content.is_none() && self.metadata.is_none() && self.event_at.is_none()
94    }
95}
96
97/// Direction in which [`MemoryStore::timeline`] orders rows by `created_at`.
98#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)]
99pub enum TimelineDirection {
100    /// Newest-first — `ORDER BY created_at DESC`.
101    #[default]
102    Descending,
103
104    /// Oldest-first — `ORDER BY created_at ASC`.
105    Ascending,
106}
107
108/// Default page size for [`MemoryStore::timeline`].
109pub const DEFAULT_TIMELINE_LIMIT: usize = 50;
110
111/// Parameters for [`MemoryStore::timeline`].
112///
113/// `None` on a window bound means "no bound on that side." `include_superseded`
114/// defaults to `true` because timeline is the audit view; consumers wanting
115/// "current truth only" pass `false`.
116#[derive(Debug, Clone)]
117pub struct TimelineParams {
118    pub kinds: crate::memory::KindSelector,
119    pub created_after: Option<DateTime<FixedOffset>>,
120    pub created_before: Option<DateTime<FixedOffset>>,
121    pub event_at_after: Option<DateTime<FixedOffset>>,
122    pub event_at_before: Option<DateTime<FixedOffset>>,
123    pub include_superseded: bool,
124    pub limit: usize,
125    pub direction: TimelineDirection,
126}
127
128impl Default for TimelineParams {
129    fn default() -> Self {
130        Self {
131            kinds: crate::memory::KindSelector::default(),
132            created_after: None,
133            created_before: None,
134            event_at_after: None,
135            event_at_before: None,
136            include_superseded: true,
137            limit: DEFAULT_TIMELINE_LIMIT,
138            direction: TimelineDirection::Descending,
139        }
140    }
141}
142
143/// Parameters for [`MemoryStore::memories_as_of`].
144///
145/// Returns memories that *existed* (`created_at <= as_of`) and were *active*
146/// (not yet superseded as of `as_of`). `kinds` filter mirrors timeline's.
147#[derive(Debug, Clone)]
148pub struct AsOfParams {
149    pub as_of: DateTime<FixedOffset>,
150    pub kinds: crate::memory::KindSelector,
151    pub limit: usize,
152}
153
154impl AsOfParams {
155    /// Builds a default `AsOfParams` for `as_of` — all kinds, default limit.
156    pub fn new(as_of: DateTime<FixedOffset>) -> Self {
157        Self {
158            as_of,
159            kinds: crate::memory::KindSelector::default(),
160            limit: DEFAULT_TIMELINE_LIMIT,
161        }
162    }
163}
164
165/// Persists and retrieves memory rows from the source-of-truth store.
166///
167/// Implementations own the database connection. The trait methods are async
168/// and `Send`-bound so callers can drive them from any tokio runtime,
169/// including across `spawn` boundaries.
170pub trait MemoryStore: Send + Sync + 'static {
171    /// Inserts a new memory and returns the persisted row.
172    ///
173    /// The returned [`Memory`] carries the server-generated `pid`,
174    /// `created_at`, `updated_at` (equal to `created_at` on insert), and a
175    /// `score` of `None`. See [`NewMemory`] for the write-time attributes;
176    /// `source_pid` is `None` for episodic rows and `Some(pid)` for semantic
177    /// rows, and `confidence` is [`crate::memory::Confidence::MAX`] for
178    /// episodic rows (the user said it) or the scaled extraction score for
179    /// semantic rows.
180    ///
181    /// # Errors
182    ///
183    /// Returns [`StoreError::InvalidScope`] if any scope field is empty,
184    /// [`StoreError::Database`] for database failures.
185    fn remember(&self, new: NewMemory) -> impl Future<Output = Result<Memory, StoreError>> + Send;
186
187    /// Looks up a single memory by pid, returning all lifecycle states.
188    ///
189    /// # Errors
190    ///
191    /// Returns [`StoreError::NotFound`] when no memory matches `pid`,
192    /// [`StoreError::Database`] for database failures.
193    fn recall(&self, pid: &str) -> impl Future<Output = Result<Memory, StoreError>> + Send;
194
195    /// Returns memories in `scope` ordered by `created_at`, with optional filters.
196    ///
197    /// Postgres-only read; does not consult the vector index. Includes
198    /// superseded rows by default — pass [`TimelineParams::include_superseded`]
199    /// = `false` to filter them out. The `kinds` selector mirrors search's
200    /// kind toggles. Hydrated rows carry `score = None` (no similarity was
201    /// computed).
202    ///
203    /// # Errors
204    ///
205    /// Returns [`StoreError::InvalidScope`] if any scope field is empty,
206    /// [`StoreError::Database`] for database failures.
207    fn timeline(
208        &self,
209        scope: Scope,
210        params: TimelineParams,
211    ) -> impl Future<Output = Result<Vec<Memory>, StoreError>> + Send;
212
213    /// Returns memories that were active in `scope` as of `params.as_of`.
214    ///
215    /// A memory is included when `created_at <= as_of` AND, considering only
216    /// `supersession_events` rows with `decided_at <= as_of`, the memory is
217    /// not currently superseded (either no such events exist or the most
218    /// recent one was an unsupersede with `winner_pid IS NULL`). Ordering
219    /// is newest-first by `created_at`. Hydrated rows carry `score = None`.
220    ///
221    /// # Errors
222    ///
223    /// Returns [`StoreError::InvalidScope`] if any scope field is empty,
224    /// [`StoreError::Database`] for database failures.
225    fn memories_as_of(
226        &self,
227        scope: Scope,
228        params: AsOfParams,
229    ) -> impl Future<Output = Result<Vec<Memory>, StoreError>> + Send;
230
231    /// Fetches multiple memories by pid, returning only indexed rows.
232    ///
233    /// Used by the client facade to hydrate vector-search hits into full
234    /// [`Memory`] values. Pids whose rows are in non-indexed lifecycle states
235    /// (`pending`, `failed`) are silently omitted.
236    ///
237    /// # Errors
238    ///
239    /// Returns [`StoreError::Database`] for database failures.
240    fn find_by_pids(&self, pids: &[&str]) -> impl Future<Output = Result<Vec<Memory>, StoreError>> + Send;
241
242    /// Returns the active semantic rows derived from `source_pid` (epic 0011 Track B).
243    ///
244    /// "Active" means not yet superseded and not yet retired: the rows the
245    /// reprocess engine must retire before re-deriving fresh ones. Episodic
246    /// sources own zero or more semantic rows via `source_pid`; this is that
247    /// set, filtered to the live ones. An unknown or episodic-only source
248    /// yields an empty vector, not an error. Index lifecycle is ignored —
249    /// a still-`pending` derived row is just as much in need of retirement.
250    ///
251    /// # Errors
252    ///
253    /// Returns [`StoreError::Database`] for database failures.
254    fn active_semantics_for_source(
255        &self,
256        source_pid: &str,
257    ) -> impl Future<Output = Result<Vec<Memory>, StoreError>> + Send;
258
259    /// Tallies extraction accuracy per `(provider, model)` over a scope slice.
260    ///
261    /// Groups every semantic row matching `filter` by its producing provider
262    /// and model (read from the row's `metadata` blob), counting the total
263    /// produced and the subset retired as [`crate::memory::RetirementReason::Rejected`].
264    /// Rows retired as `Stale` and superseded rows count toward the total but
265    /// never the rejected tally — only a corrected wrong extraction is a model
266    /// error. An empty [`StatsFilter`] aggregates the whole store. A slice with
267    /// no semantic rows yields an empty vector, not an error. Results are
268    /// ordered by `(provider, model)` ascending for stable output.
269    ///
270    /// # Errors
271    ///
272    /// Returns [`StoreError::Database`] for database failures.
273    fn extraction_stats(
274        &self,
275        filter: StatsFilter,
276    ) -> impl Future<Output = Result<Vec<ExtractionStat>, StoreError>> + Send;
277
278    /// Deletes one memory or every memory in a scope, returning deleted pids.
279    ///
280    /// The returned pids let callers issue follow-up deletes against the
281    /// vector index or graph store. A single-pid target returns the named pid
282    /// *and* the pids of its derived semantic rows (cascade-deleted via the
283    /// `source_pid` foreign key) — callers must evict every returned pid's
284    /// vector, or the cascade leaves orphaned points behind.
285    ///
286    /// # Errors
287    ///
288    /// Returns [`StoreError::InvalidScope`] if a scope target has empty
289    /// fields, [`StoreError::Database`] for database failures.
290    fn forget(&self, target: ForgetTarget) -> impl Future<Output = Result<Vec<String>, StoreError>> + Send;
291
292    /// Updates a memory's index lifecycle state.
293    ///
294    /// Called by the async embed substrate after vector upsert succeeds or
295    /// fails, and by the reconciliation sweep.
296    ///
297    /// # Errors
298    ///
299    /// Returns [`StoreError::NotFound`] when no memory matches `pid`,
300    /// [`StoreError::Database`] for database failures.
301    fn set_index_status(&self, pid: &str, status: IndexStatus) -> impl Future<Output = Result<(), StoreError>> + Send;
302
303    /// Sets a memory's category label (epic 0011 ticket 0005).
304    ///
305    /// Called by the categorize worker after NLI classification. Overwrites
306    /// any prior category. The value set is the caller's responsibility; the
307    /// column is unconstrained `TEXT` (the taxonomy lives in the worker).
308    ///
309    /// # Errors
310    ///
311    /// Returns [`StoreError::NotFound`] when no memory matches `pid`,
312    /// [`StoreError::Database`] for database failures.
313    fn set_category(&self, pid: &str, category: &str) -> impl Future<Output = Result<(), StoreError>> + Send;
314
315    /// Retires a memory with the given reason (epic 0011 Track B).
316    ///
317    /// Sets `retirement_reason`, hiding the row from all active-row reads.
318    /// The row is NOT deleted — it stays recall-reachable by pid for audit
319    /// and is the reprocess "don't re-derive this" guard + accuracy-metric
320    /// record. The caller is responsible for evicting the row's vector (the
321    /// store has no vector index); see [`crate::client::Client::reject`] /
322    /// `mark_stale`, which orchestrate both.
323    ///
324    /// # Errors
325    ///
326    /// Returns [`StoreError::NotFound`] when no memory matches `pid`,
327    /// [`StoreError::Database`] for database failures.
328    fn retire(
329        &self,
330        pid: &str,
331        reason: crate::memory::RetirementReason,
332    ) -> impl Future<Output = Result<(), StoreError>> + Send;
333
334    /// Returns up to `limit` memories whose index lifecycle is `failed`.
335    ///
336    /// Used by the reconciliation sweep to retry embed + upsert. Returned in
337    /// no specific order; the caller drives retry concurrency.
338    ///
339    /// # Errors
340    ///
341    /// Returns [`StoreError::Database`] for database failures.
342    fn find_failed(&self, limit: usize) -> impl Future<Output = Result<Vec<Memory>, StoreError>> + Send;
343
344    /// Returns every distinct scope tuple present in the store.
345    ///
346    /// Used by the reconciliation sweep's orphan-cleanup pass to know which
347    /// scopes need a vector-index scroll. Expected to be cheap for typical
348    /// tenant counts; very large deployments may need pagination later.
349    ///
350    /// # Errors
351    ///
352    /// Returns [`StoreError::Database`] for database failures.
353    fn list_scopes(&self) -> impl Future<Output = Result<Vec<Scope>, StoreError>> + Send;
354
355    /// Returns the distinct agent ids that have memories in the given
356    /// org + user scope, sorted ascending.
357    ///
358    /// Powers caller-scoped agent discovery (e.g. the UI's agent picker): a
359    /// user sees only the agents under their own org and user, never another
360    /// tenant's. Returns an empty vec when the scope has no memories yet.
361    ///
362    /// # Errors
363    ///
364    /// Returns [`StoreError::Database`] for database failures.
365    fn list_agent_ids(
366        &self,
367        org_id: &str,
368        user_id: &str,
369    ) -> impl Future<Output = Result<Vec<String>, StoreError>> + Send;
370
371    /// Returns every indexed pid for the given scope.
372    ///
373    /// Used by the reconciliation sweep's orphan-cleanup pass to compare
374    /// against the vector index's scope contents. Only `indexed` rows are
375    /// returned; `pending`/`failed` rows are not yet expected to have a
376    /// vector index entry.
377    ///
378    /// # Errors
379    ///
380    /// Returns [`StoreError::InvalidScope`] if any scope field is empty,
381    /// [`StoreError::Database`] for database failures.
382    fn indexed_pids_in_scope(&self, scope: &Scope) -> impl Future<Output = Result<Vec<String>, StoreError>> + Send;
383
384    /// Mutates a memory in place. See [`EditPatch`] for the field semantics.
385    ///
386    /// Distinct from [`Self::supersede`]: edit *overwrites* the original row
387    /// because it was wrong (a correction), while supersede preserves it
388    /// because new information obsoletes — but does not invalidate — old.
389    /// `created_at` is unchanged; `updated_at` is bumped by the database
390    /// trigger. The caller is responsible for re-embedding the row after a
391    /// content change (enqueue a `JobKind::Embed` job; the worker handles
392    /// the upsert) and for flipping `qdrant_status` back to `pending` so
393    /// the row falls out of search until re-embedding completes.
394    ///
395    /// # Errors
396    ///
397    /// Returns [`StoreError::NotFound`] when no memory matches `pid`,
398    /// [`StoreError::UnsupportedEdit`] when the target row's kind does not
399    /// support in-place edits (currently every kind except `Episodic`),
400    /// [`StoreError::Database`] for database failures.
401    fn edit(&self, pid: &str, patch: EditPatch) -> impl Future<Output = Result<Memory, StoreError>> + Send;
402
403    /// Marks `pid` as superseded by `by_pid`.
404    ///
405    /// Records a row in the `supersession_events` audit table; a database
406    /// trigger maintains the cached `memories.superseded_by` column so
407    /// search paths continue to filter superseded rows out. Idempotent in
408    /// effect (the cache reflects the latest event), but every call is
409    /// recorded in history. Internal API — callers must come from the
410    /// contradiction-detection engine, not user code.
411    ///
412    /// # Errors
413    ///
414    /// Returns [`StoreError::NotFound`] when no memory matches `pid`,
415    /// [`StoreError::Database`] for database failures (including FK
416    /// violations when `by_pid` does not exist).
417    fn supersede(&self, pid: &str, by_pid: &str) -> impl Future<Output = Result<(), StoreError>> + Send;
418
419    /// Clears the supersession marker on `pid`, restoring it to active state.
420    ///
421    /// Used by the admin surface when an operator decides a supersession was
422    /// wrong. Records an unsupersede event in the audit log; the trigger
423    /// clears the cache. The audit row is always recorded, even when the
424    /// row was already active — operator intent is preserved in history.
425    ///
426    /// # Errors
427    ///
428    /// Returns [`StoreError::NotFound`] when no memory matches `pid`,
429    /// [`StoreError::Database`] for database failures.
430    fn unsupersede(&self, pid: &str) -> impl Future<Output = Result<(), StoreError>> + Send;
431
432    /// Returns the winner pid `pid` was superseded by as of `as_of`, or `None`.
433    ///
434    /// Walks the `supersession_events` audit table for `pid`, returning the
435    /// `winner_pid` of the most recent event whose `decided_at <= as_of`.
436    /// `None` covers three cases: the pid has no supersession events at all,
437    /// the events all occurred after `as_of`, or the latest event before
438    /// `as_of` was an unsupersede (a row with `winner_pid IS NULL`).
439    ///
440    /// Used by point-in-time reads (`Client::recall_as_of`, ticket 0009) to
441    /// answer "was this memory active at T?" The cached
442    /// `memories.superseded_by` column is the present-time answer; this
443    /// method answers the same question for arbitrary past timestamps.
444    ///
445    /// # Errors
446    ///
447    /// Returns [`StoreError::Database`] for database failures.
448    fn supersession_at(
449        &self,
450        pid: &str,
451        as_of: DateTime<FixedOffset>,
452    ) -> impl Future<Output = Result<Option<String>, StoreError>> + Send;
453
454    /// Returns every supersession decision against `pid` in chronological order.
455    ///
456    /// Reads the `supersession_events` audit table for `pid` and returns
457    /// each event ascending by `decided_at`. An event with `winner_pid =
458    /// None` is an unsupersede. Used by the supersession-audit view to
459    /// render the full trail (supersede → unsupersede → re-supersede), in
460    /// contrast to [`Self::supersession_at`] which collapses the trail to
461    /// a single point-in-time answer.
462    ///
463    /// A `pid` with no events — whether it was never superseded or does
464    /// not exist — returns an empty vec, not an error. The events table is
465    /// the source of truth here.
466    ///
467    /// # Errors
468    ///
469    /// Returns [`StoreError::Database`] for database failures.
470    fn supersession_history(
471        &self,
472        pid: &str,
473    ) -> impl Future<Output = Result<Vec<SupersessionEvent>, StoreError>> + Send;
474}
475
476#[cfg(test)]
477mod tests {
478    use super::*;
479    use chrono::Utc;
480    use std::collections::BTreeMap;
481    use std::sync::Mutex;
482
483    /// One row of the in-memory supersession event log used by `StubStore`.
484    ///
485    /// Mirrors the Postgres `supersession_events` table shape. `winner_pid`
486    /// is `None` for unsupersede events, matching the SQL `NULL` semantics.
487    #[derive(Debug, Clone)]
488    struct StubEvent {
489        loser_pid: String,
490        winner_pid: Option<String>,
491        decided_at: DateTime<FixedOffset>,
492    }
493
494    #[derive(Default)]
495    struct StubStore {
496        memories: Mutex<Vec<Memory>>,
497        events: Mutex<Vec<StubEvent>>,
498    }
499
500    impl StubStore {
501        /// Recomputes a memory's `supersession` field from the event log.
502        ///
503        /// Replicates the Postgres trigger: latest event wins, `winner_pid IS
504        /// NULL` clears the cache. Called after every supersede/unsupersede
505        /// so reads see a consistent cached view without consulting the log.
506        fn refresh_cache(&self, pid: &str) {
507            let events = self.events.lock().unwrap();
508            let latest = events
509                .iter()
510                .filter(|e| e.loser_pid == pid)
511                .max_by_key(|e| e.decided_at);
512            let supersession = latest.and_then(|e| {
513                e.winner_pid.clone().map(|winner_pid| crate::memory::SupersessionInfo {
514                    winner_pid,
515                    at: e.decided_at,
516                })
517            });
518            drop(events);
519            let mut memories = self.memories.lock().unwrap();
520            if let Some(m) = memories.iter_mut().find(|m| m.pid == pid) {
521                m.supersession = supersession;
522            }
523        }
524    }
525
526    impl MemoryStore for StubStore {
527        async fn remember(&self, new: NewMemory) -> Result<Memory, StoreError> {
528            let NewMemory {
529                scope,
530                content,
531                metadata,
532                kind,
533                source_pid,
534                event_at,
535                confidence,
536            } = new;
537            let now: chrono::DateTime<chrono::FixedOffset> = Utc::now().into();
538            let memory = Memory {
539                pid: format!("test-{}", self.memories.lock().unwrap().len()),
540                scope,
541                content,
542                metadata,
543                kind,
544                source_pid,
545                supersession: None,
546                created_at: now,
547                updated_at: now,
548                event_at,
549                score: None,
550                status: IndexStatus::Pending,
551                confidence,
552                category: None,
553                retirement: None,
554            };
555            self.memories.lock().unwrap().push(memory.clone());
556            Ok(memory)
557        }
558
559        async fn recall(&self, pid: &str) -> Result<Memory, StoreError> {
560            self.memories
561                .lock()
562                .unwrap()
563                .iter()
564                .find(|m| m.pid == pid)
565                .cloned()
566                .ok_or_else(|| StoreError::NotFound(pid.to_string()))
567        }
568
569        async fn find_by_pids(&self, pids: &[&str]) -> Result<Vec<Memory>, StoreError> {
570            let memories = self.memories.lock().unwrap();
571            Ok(pids
572                .iter()
573                .filter_map(|pid| memories.iter().find(|m| m.pid == *pid).cloned())
574                .collect())
575        }
576
577        async fn active_semantics_for_source(&self, source_pid: &str) -> Result<Vec<Memory>, StoreError> {
578            let memories = self.memories.lock().unwrap();
579            Ok(memories
580                .iter()
581                .filter(|m| m.kind == MemoryKind::Semantic)
582                .filter(|m| m.source_pid.as_deref() == Some(source_pid))
583                .filter(|m| m.supersession.is_none() && m.retirement.is_none())
584                .cloned()
585                .collect())
586        }
587
588        async fn extraction_stats(&self, filter: StatsFilter) -> Result<Vec<ExtractionStat>, StoreError> {
589            let memories = self.memories.lock().unwrap();
590            let mut tallies: BTreeMap<(String, String), (u64, u64)> = BTreeMap::new();
591            for m in memories.iter() {
592                if m.kind != MemoryKind::Semantic {
593                    continue;
594                }
595                if filter.agent_id.as_ref().is_some_and(|a| a != &m.scope.agent_id)
596                    || filter.org_id.as_ref().is_some_and(|o| o != &m.scope.org_id)
597                    || filter.user_id.as_ref().is_some_and(|u| u != &m.scope.user_id)
598                {
599                    continue;
600                }
601                let provider = m.metadata.get("provider").and_then(|v| v.as_str()).unwrap_or("").to_string();
602                let model = m.metadata.get("model").and_then(|v| v.as_str()).unwrap_or("").to_string();
603                let entry = tallies.entry((provider, model)).or_insert((0, 0));
604                entry.0 += 1;
605                if m.retirement == Some(crate::memory::RetirementReason::Rejected) {
606                    entry.1 += 1;
607                }
608            }
609
610            Ok(tallies
611                .into_iter()
612                .map(|((provider, model), (total, rejected))| ExtractionStat {
613                    provider,
614                    model,
615                    total,
616                    rejected,
617                })
618                .collect())
619        }
620
621        async fn timeline(&self, scope: Scope, params: TimelineParams) -> Result<Vec<Memory>, StoreError> {
622            scope.validate()?;
623            let memories = self.memories.lock().unwrap();
624
625            let mut filtered: Vec<Memory> = memories
626                .iter()
627                .filter(|m| m.scope == scope)
628                .filter(|m| match m.kind {
629                    MemoryKind::Episodic => params.kinds.episodic,
630                    MemoryKind::Semantic => params.kinds.semantic,
631                })
632                .filter(|m| params.created_after.is_none_or(|t| m.created_at >= t))
633                .filter(|m| params.created_before.is_none_or(|t| m.created_at < t))
634                .filter(|m| {
635                    params
636                        .event_at_after
637                        .is_none_or(|t| m.event_at.is_some_and(|ev| ev >= t))
638                })
639                .filter(|m| {
640                    params
641                        .event_at_before
642                        .is_none_or(|t| m.event_at.is_some_and(|ev| ev < t))
643                })
644                .filter(|m| params.include_superseded || m.supersession.is_none())
645                .cloned()
646                .collect();
647
648            filtered.sort_by(|a, b| match params.direction {
649                TimelineDirection::Descending => b.created_at.cmp(&a.created_at),
650                TimelineDirection::Ascending => a.created_at.cmp(&b.created_at),
651            });
652            filtered.truncate(params.limit);
653            Ok(filtered)
654        }
655
656        async fn memories_as_of(&self, scope: Scope, params: AsOfParams) -> Result<Vec<Memory>, StoreError> {
657            scope.validate()?;
658            let memories = self.memories.lock().unwrap();
659            let events = self.events.lock().unwrap();
660
661            let mut filtered: Vec<Memory> = memories
662                .iter()
663                .filter(|m| m.scope == scope)
664                .filter(|m| m.created_at <= params.as_of)
665                .filter(|m| match m.kind {
666                    MemoryKind::Episodic => params.kinds.episodic,
667                    MemoryKind::Semantic => params.kinds.semantic,
668                })
669                .filter(|m| {
670                    let latest = events
671                        .iter()
672                        .filter(|e| e.loser_pid == m.pid && e.decided_at <= params.as_of)
673                        .max_by_key(|e| e.decided_at);
674                    match latest {
675                        None => true,
676                        Some(e) => e.winner_pid.is_none(),
677                    }
678                })
679                .cloned()
680                .collect();
681
682            filtered.sort_by(|a, b| b.created_at.cmp(&a.created_at));
683            filtered.truncate(params.limit);
684            Ok(filtered)
685        }
686
687        async fn forget(&self, target: ForgetTarget) -> Result<Vec<String>, StoreError> {
688            let mut memories = self.memories.lock().unwrap();
689            let mut deleted = Vec::new();
690            match target {
691                ForgetTarget::Pid(pid) => {
692                    // Mirror the Postgres `source_pid` ON DELETE CASCADE: remove
693                    // the named row and its derived semantic rows, returning all.
694                    memories.retain(|m| {
695                        if m.pid == pid || m.source_pid.as_deref() == Some(pid.as_str()) {
696                            deleted.push(m.pid.clone());
697                            false
698                        } else {
699                            true
700                        }
701                    });
702                }
703                ForgetTarget::Scope(scope) => {
704                    memories.retain(|m| {
705                        if m.scope == scope {
706                            deleted.push(m.pid.clone());
707                            false
708                        } else {
709                            true
710                        }
711                    });
712                }
713            }
714            Ok(deleted)
715        }
716
717        async fn set_index_status(&self, _pid: &str, _status: IndexStatus) -> Result<(), StoreError> {
718            Ok(())
719        }
720
721        async fn set_category(&self, pid: &str, category: &str) -> Result<(), StoreError> {
722            let mut memories = self.memories.lock().unwrap();
723            let memory = memories
724                .iter_mut()
725                .find(|m| m.pid == pid)
726                .ok_or_else(|| StoreError::NotFound(pid.to_string()))?;
727            memory.category = Some(category.to_string());
728            Ok(())
729        }
730
731        async fn retire(&self, pid: &str, reason: crate::memory::RetirementReason) -> Result<(), StoreError> {
732            let mut memories = self.memories.lock().unwrap();
733            let memory = memories
734                .iter_mut()
735                .find(|m| m.pid == pid)
736                .ok_or_else(|| StoreError::NotFound(pid.to_string()))?;
737            memory.retirement = Some(reason);
738            Ok(())
739        }
740
741        async fn find_failed(&self, _limit: usize) -> Result<Vec<Memory>, StoreError> {
742            Ok(Vec::new())
743        }
744
745        async fn list_scopes(&self) -> Result<Vec<Scope>, StoreError> {
746            let scopes: std::collections::HashSet<Scope> =
747                self.memories.lock().unwrap().iter().map(|m| m.scope.clone()).collect();
748            Ok(scopes.into_iter().collect())
749        }
750
751        async fn list_agent_ids(&self, org_id: &str, user_id: &str) -> Result<Vec<String>, StoreError> {
752            let agent_ids: std::collections::BTreeSet<String> = self
753                .memories
754                .lock()
755                .unwrap()
756                .iter()
757                .filter(|m| m.scope.org_id == org_id && m.scope.user_id == user_id)
758                .map(|m| m.scope.agent_id.clone())
759                .collect();
760            Ok(agent_ids.into_iter().collect())
761        }
762
763        async fn indexed_pids_in_scope(&self, scope: &Scope) -> Result<Vec<String>, StoreError> {
764            Ok(self
765                .memories
766                .lock()
767                .unwrap()
768                .iter()
769                .filter(|m| &m.scope == scope)
770                .map(|m| m.pid.clone())
771                .collect())
772        }
773
774        async fn edit(&self, pid: &str, patch: EditPatch) -> Result<Memory, StoreError> {
775            let mut memories = self.memories.lock().unwrap();
776            let memory = memories
777                .iter_mut()
778                .find(|m| m.pid == pid)
779                .ok_or_else(|| StoreError::NotFound(pid.to_string()))?;
780            if memory.kind != MemoryKind::Episodic {
781                return Err(StoreError::UnsupportedEdit {
782                    pid: pid.to_string(),
783                    kind: memory.kind,
784                });
785            }
786            if let Some(content) = patch.content {
787                memory.content = content;
788            }
789            if let Some(metadata) = patch.metadata {
790                memory.metadata = metadata;
791            }
792            if let Some(event_at) = patch.event_at {
793                memory.event_at = event_at;
794            }
795            memory.updated_at = Utc::now().into();
796            Ok(memory.clone())
797        }
798
799        async fn supersede(&self, pid: &str, by_pid: &str) -> Result<(), StoreError> {
800            // EXISTS-guarded behavior mirrored from Postgres: if the loser
801            // pid doesn't exist, return NotFound without writing anything.
802            {
803                let memories = self.memories.lock().unwrap();
804                if !memories.iter().any(|m| m.pid == pid) {
805                    return Err(StoreError::NotFound(pid.to_string()));
806                }
807            }
808            self.events.lock().unwrap().push(StubEvent {
809                loser_pid: pid.to_string(),
810                winner_pid: Some(by_pid.to_string()),
811                decided_at: Utc::now().into(),
812            });
813            self.refresh_cache(pid);
814            Ok(())
815        }
816
817        async fn unsupersede(&self, pid: &str) -> Result<(), StoreError> {
818            {
819                let memories = self.memories.lock().unwrap();
820                if !memories.iter().any(|m| m.pid == pid) {
821                    return Err(StoreError::NotFound(pid.to_string()));
822                }
823            }
824            // Per DP2: always insert, even when already active.
825            self.events.lock().unwrap().push(StubEvent {
826                loser_pid: pid.to_string(),
827                winner_pid: None,
828                decided_at: Utc::now().into(),
829            });
830            self.refresh_cache(pid);
831            Ok(())
832        }
833
834        async fn supersession_at(&self, pid: &str, as_of: DateTime<FixedOffset>) -> Result<Option<String>, StoreError> {
835            let events = self.events.lock().unwrap();
836            let latest = events
837                .iter()
838                .filter(|e| e.loser_pid == pid && e.decided_at <= as_of)
839                .max_by_key(|e| e.decided_at);
840            Ok(latest.and_then(|e| e.winner_pid.clone()))
841        }
842
843        async fn supersession_history(&self, pid: &str) -> Result<Vec<SupersessionEvent>, StoreError> {
844            let events = self.events.lock().unwrap();
845            let mut trail: Vec<SupersessionEvent> = events
846                .iter()
847                .filter(|e| e.loser_pid == pid)
848                .map(|e| SupersessionEvent {
849                    winner_pid: e.winner_pid.clone(),
850                    decided_at: e.decided_at,
851                })
852                .collect();
853            trail.sort_by_key(|e| e.decided_at);
854            Ok(trail)
855        }
856    }
857
858    #[tokio::test(flavor = "current_thread")]
859    async fn should_implement_trait_with_in_test_stub() {
860        let store = StubStore::default();
861        let scope = Scope {
862            agent_id: "a".to_string(),
863            org_id: "o".to_string(),
864            user_id: "u".to_string(),
865        };
866
867        let memory = store
868            .remember(NewMemory {
869                scope: scope.clone(),
870                content: "content".to_string(),
871                metadata: serde_json::json!({}),
872                kind: MemoryKind::Episodic,
873                source_pid: None,
874                event_at: None,
875                confidence: crate::memory::Confidence::MAX,
876            })
877            .await
878            .unwrap();
879        assert_eq!(memory.content, "content");
880
881        let recalled = store.recall(&memory.pid).await.unwrap();
882        assert_eq!(recalled.pid, memory.pid);
883
884        let deleted = store.forget(ForgetTarget::Pid(memory.pid.clone())).await.unwrap();
885        assert_eq!(deleted, vec![memory.pid.clone()]);
886
887        let not_found = store.recall(&memory.pid).await;
888        assert!(matches!(not_found, Err(StoreError::NotFound(_))));
889    }
890
891    #[tokio::test]
892    async fn should_list_distinct_sorted_agent_ids_for_matching_org_and_user() {
893        let store = StubStore::default();
894        let remember = async |agent: &str, org: &str, user: &str| {
895            store
896                .remember(NewMemory {
897                    scope: Scope {
898                        agent_id: agent.to_string(),
899                        org_id: org.to_string(),
900                        user_id: user.to_string(),
901                    },
902                    content: "c".to_string(),
903                    metadata: serde_json::json!({}),
904                    kind: MemoryKind::Episodic,
905                    source_pid: None,
906                    event_at: None,
907                    confidence: crate::memory::Confidence::MAX,
908                })
909                .await
910                .unwrap();
911        };
912
913        remember("zeta", "o", "u").await;
914        remember("alpha", "o", "u").await;
915        remember("alpha", "o", "u").await; // duplicate agent — should collapse
916        remember("other-org", "o2", "u").await; // wrong org — excluded
917        remember("other-user", "o", "u2").await; // wrong user — excluded
918
919        let agents = store.list_agent_ids("o", "u").await.unwrap();
920
921        assert_eq!(agents, vec!["alpha".to_string(), "zeta".to_string()]);
922    }
923
924    #[tokio::test]
925    async fn should_return_empty_agent_ids_when_scope_has_no_memories() {
926        let store = StubStore::default();
927        let agents = store.list_agent_ids("empty-org", "empty-user").await.unwrap();
928        assert!(agents.is_empty());
929    }
930
931    /// Writes a semantic row tagged with `provider`/`model` and returns its pid.
932    async fn write_semantic(store: &StubStore, scope: Scope, provider: &str, model: &str) -> String {
933        store
934            .remember(NewMemory {
935                scope,
936                content: "a derived fact".to_string(),
937                metadata: serde_json::json!({ "provider": provider, "model": model }),
938                kind: MemoryKind::Semantic,
939                source_pid: Some("src".to_string()),
940                event_at: None,
941                confidence: crate::memory::Confidence::new(80),
942            })
943            .await
944            .unwrap()
945            .pid
946    }
947
948    #[tokio::test]
949    async fn should_count_only_rejected_rows_in_extraction_numerator() {
950        let store = StubStore::default();
951        let scope = Scope {
952            agent_id: "a".to_string(),
953            org_id: "o".to_string(),
954            user_id: "u".to_string(),
955        };
956
957        // Four extractions from one model: one rejected, one stale, one
958        // superseded, one untouched. Only the rejected one is a model error.
959        let rejected = write_semantic(&store, scope.clone(), "ollama", "qwen3:14b").await;
960        let stale = write_semantic(&store, scope.clone(), "ollama", "qwen3:14b").await;
961        let superseded = write_semantic(&store, scope.clone(), "ollama", "qwen3:14b").await;
962        let winner = write_semantic(&store, scope.clone(), "ollama", "qwen3:14b").await;
963
964        store.retire(&rejected, crate::memory::RetirementReason::Rejected).await.unwrap();
965        store.retire(&stale, crate::memory::RetirementReason::Stale).await.unwrap();
966        store.supersede(&superseded, &winner).await.unwrap();
967
968        let stats = store.extraction_stats(StatsFilter::default()).await.unwrap();
969
970        assert_eq!(stats.len(), 1, "one (provider, model) pair");
971        let stat = &stats[0];
972        assert_eq!(stat.total, 4, "every semantic row counts in the denominator, regardless of retirement");
973        assert_eq!(
974            stat.rejected, 1,
975            "only Rejected counts; Stale (source changed) and Superseded (newer fact won) are not model errors",
976        );
977    }
978
979    #[tokio::test]
980    async fn should_break_extraction_stats_down_per_provider_and_model() {
981        let store = StubStore::default();
982        let scope = Scope {
983            agent_id: "a".to_string(),
984            org_id: "o".to_string(),
985            user_id: "u".to_string(),
986        };
987
988        let weak = write_semantic(&store, scope.clone(), "ollama", "llama3.2:1b").await;
989        write_semantic(&store, scope.clone(), "ollama", "llama3.2:1b").await;
990        write_semantic(&store, scope.clone(), "ollama", "qwen3:14b").await;
991        store.retire(&weak, crate::memory::RetirementReason::Rejected).await.unwrap();
992
993        let stats = store.extraction_stats(StatsFilter::default()).await.unwrap();
994
995        // Ordered by (provider, model) ascending: llama before qwen.
996        assert_eq!(stats.len(), 2, "one row per distinct (provider, model)");
997        assert_eq!(stats[0].model, "llama3.2:1b");
998        assert_eq!((stats[0].total, stats[0].rejected), (2, 1));
999        assert_eq!(stats[1].model, "qwen3:14b");
1000        assert_eq!((stats[1].total, stats[1].rejected), (1, 0), "rejecting one model must not touch the other");
1001    }
1002
1003    #[tokio::test]
1004    async fn should_scope_extraction_stats_to_the_filtered_subset() {
1005        let store = StubStore::default();
1006        let mine = Scope {
1007            agent_id: "a".to_string(),
1008            org_id: "acme".to_string(),
1009            user_id: "u".to_string(),
1010        };
1011        let theirs = Scope {
1012            agent_id: "a".to_string(),
1013            org_id: "other".to_string(),
1014            user_id: "u".to_string(),
1015        };
1016
1017        write_semantic(&store, mine.clone(), "ollama", "m").await;
1018        write_semantic(&store, theirs.clone(), "ollama", "m").await;
1019
1020        let filter = StatsFilter {
1021            org_id: Some("acme".to_string()),
1022            ..StatsFilter::default()
1023        };
1024        let stats = store.extraction_stats(filter).await.unwrap();
1025
1026        assert_eq!(stats.len(), 1);
1027        assert_eq!(stats[0].total, 1, "the org filter must exclude another org's extractions");
1028    }
1029
1030    #[test]
1031    fn should_render_index_status_as_lowercase_string() {
1032        assert_eq!(IndexStatus::Pending.as_ref(), "pending");
1033        assert_eq!(IndexStatus::Indexed.as_ref(), "indexed");
1034        assert_eq!(IndexStatus::Failed.as_ref(), "failed");
1035    }
1036
1037    async fn write(store: &StubStore, content: &str) -> Memory {
1038        let scope = Scope {
1039            agent_id: "a".to_string(),
1040            org_id: "o".to_string(),
1041            user_id: "u".to_string(),
1042        };
1043        store
1044            .remember(NewMemory {
1045                scope,
1046                content: content.to_string(),
1047                metadata: serde_json::json!({}),
1048                kind: MemoryKind::Semantic,
1049                source_pid: None,
1050                event_at: None,
1051                confidence: crate::memory::Confidence::MAX,
1052            })
1053            .await
1054            .unwrap()
1055    }
1056
1057    #[tokio::test(flavor = "current_thread")]
1058    async fn should_set_superseded_by_when_supersede_called() {
1059        let store = StubStore::default();
1060        let loser = write(&store, "old fact").await;
1061        let winner = write(&store, "new fact").await;
1062
1063        store.supersede(&loser.pid, &winner.pid).await.unwrap();
1064
1065        let after = store.recall(&loser.pid).await.unwrap();
1066        let supersession = after.supersession.as_ref().expect("supersession set");
1067        assert_eq!(supersession.winner_pid, winner.pid);
1068    }
1069
1070    #[tokio::test(flavor = "current_thread")]
1071    async fn should_clear_superseded_by_when_unsupersede_called() {
1072        let store = StubStore::default();
1073        let loser = write(&store, "old fact").await;
1074        let winner = write(&store, "new fact").await;
1075        store.supersede(&loser.pid, &winner.pid).await.unwrap();
1076
1077        store.unsupersede(&loser.pid).await.unwrap();
1078
1079        let after = store.recall(&loser.pid).await.unwrap();
1080        assert_eq!(after.supersession, None);
1081    }
1082
1083    #[tokio::test(flavor = "current_thread")]
1084    async fn should_return_not_found_when_supersede_targets_missing_pid() {
1085        let store = StubStore::default();
1086        let winner = write(&store, "fact").await;
1087
1088        let result = store.supersede("does-not-exist", &winner.pid).await;
1089
1090        assert!(matches!(result, Err(StoreError::NotFound(_))));
1091    }
1092
1093    #[tokio::test(flavor = "current_thread")]
1094    async fn should_return_not_found_when_unsupersede_targets_missing_pid() {
1095        let store = StubStore::default();
1096
1097        let result = store.unsupersede("does-not-exist").await;
1098
1099        assert!(matches!(result, Err(StoreError::NotFound(_))));
1100    }
1101
1102    #[tokio::test(flavor = "current_thread")]
1103    async fn should_resolve_to_latest_winner_when_resuperseded() {
1104        let store = StubStore::default();
1105        let loser = write(&store, "old").await;
1106        let first_winner = write(&store, "first").await;
1107        let second_winner = write(&store, "second").await;
1108
1109        store.supersede(&loser.pid, &first_winner.pid).await.unwrap();
1110        store.supersede(&loser.pid, &second_winner.pid).await.unwrap();
1111
1112        let after = store.recall(&loser.pid).await.unwrap();
1113        let supersession = after.supersession.as_ref().expect("supersession set");
1114        assert_eq!(
1115            supersession.winner_pid, second_winner.pid,
1116            "latest event wins the cache"
1117        );
1118    }
1119
1120    #[tokio::test(flavor = "current_thread")]
1121    async fn should_return_winner_pid_from_supersession_at_for_past_timestamp() {
1122        let store = StubStore::default();
1123        let loser = write(&store, "loser").await;
1124        let winner = write(&store, "winner").await;
1125        store.supersede(&loser.pid, &winner.pid).await.unwrap();
1126        let now: DateTime<FixedOffset> = Utc::now().into();
1127
1128        let result = store.supersession_at(&loser.pid, now).await.unwrap();
1129
1130        assert_eq!(result.as_deref(), Some(winner.pid.as_str()));
1131    }
1132
1133    #[tokio::test(flavor = "current_thread")]
1134    async fn should_return_none_from_supersession_at_when_as_of_predates_event() {
1135        let store = StubStore::default();
1136        let loser = write(&store, "loser").await;
1137        let winner = write(&store, "winner").await;
1138        let before: DateTime<FixedOffset> = Utc::now().into();
1139        // Sleep just enough that the event's decided_at is strictly after `before`.
1140        // current_thread runtime + this short sleep is reliable in CI.
1141        tokio::time::sleep(std::time::Duration::from_millis(5)).await;
1142        store.supersede(&loser.pid, &winner.pid).await.unwrap();
1143
1144        let result = store.supersession_at(&loser.pid, before).await.unwrap();
1145
1146        assert!(result.is_none(), "events after as_of must not count");
1147    }
1148
1149    #[tokio::test(flavor = "current_thread")]
1150    async fn should_return_none_from_supersession_at_when_latest_event_was_unsupersede() {
1151        let store = StubStore::default();
1152        let loser = write(&store, "loser").await;
1153        let winner = write(&store, "winner").await;
1154        store.supersede(&loser.pid, &winner.pid).await.unwrap();
1155        store.unsupersede(&loser.pid).await.unwrap();
1156        let now: DateTime<FixedOffset> = Utc::now().into();
1157
1158        let result = store.supersession_at(&loser.pid, now).await.unwrap();
1159
1160        assert!(result.is_none(), "unsupersede event clears the as-of answer");
1161    }
1162
1163    #[tokio::test(flavor = "current_thread")]
1164    async fn should_return_empty_supersession_history_when_pid_has_no_events() {
1165        let store = StubStore::default();
1166        let solo = write(&store, "never superseded").await;
1167
1168        let trail = store.supersession_history(&solo.pid).await.unwrap();
1169
1170        assert!(trail.is_empty(), "no events = empty trail, not NotFound");
1171    }
1172
1173    #[tokio::test(flavor = "current_thread")]
1174    async fn should_return_supersession_history_in_ascending_order() {
1175        let store = StubStore::default();
1176        let loser = write(&store, "old").await;
1177        let first = write(&store, "first").await;
1178        let second = write(&store, "second").await;
1179        store.supersede(&loser.pid, &first.pid).await.unwrap();
1180        tokio::time::sleep(std::time::Duration::from_millis(5)).await;
1181        store.supersede(&loser.pid, &second.pid).await.unwrap();
1182
1183        let trail = store.supersession_history(&loser.pid).await.unwrap();
1184
1185        assert_eq!(trail.len(), 2, "both events present");
1186        assert_eq!(trail[0].winner_pid.as_deref(), Some(first.pid.as_str()));
1187        assert_eq!(trail[1].winner_pid.as_deref(), Some(second.pid.as_str()));
1188        assert!(trail[0].decided_at <= trail[1].decided_at, "ascending by decided_at");
1189    }
1190
1191    #[tokio::test(flavor = "current_thread")]
1192    async fn should_include_unsupersede_events_in_supersession_history() {
1193        let store = StubStore::default();
1194        let loser = write(&store, "old").await;
1195        let winner = write(&store, "winner").await;
1196        store.supersede(&loser.pid, &winner.pid).await.unwrap();
1197        tokio::time::sleep(std::time::Duration::from_millis(5)).await;
1198        store.unsupersede(&loser.pid).await.unwrap();
1199
1200        let trail = store.supersession_history(&loser.pid).await.unwrap();
1201
1202        assert_eq!(trail.len(), 2);
1203        assert_eq!(
1204            trail[0].winner_pid.as_deref(),
1205            Some(winner.pid.as_str()),
1206            "supersede first"
1207        );
1208        assert!(
1209            trail[1].winner_pid.is_none(),
1210            "unsupersede represented as winner_pid=None"
1211        );
1212    }
1213
1214    #[tokio::test(flavor = "current_thread")]
1215    async fn should_reject_edit_when_kind_is_not_episodic() {
1216        let store = StubStore::default();
1217        let semantic = write(&store, "derived fact").await;
1218
1219        let result = store
1220            .edit(
1221                &semantic.pid,
1222                EditPatch {
1223                    content: Some("hand edit".to_string()),
1224                    ..EditPatch::default()
1225                },
1226            )
1227            .await;
1228
1229        match result {
1230            Err(StoreError::UnsupportedEdit { pid, kind }) => {
1231                assert_eq!(pid, semantic.pid);
1232                assert_eq!(kind, MemoryKind::Semantic);
1233            }
1234            other => panic!("expected UnsupportedEdit for semantic kind; got {other:?}"),
1235        }
1236    }
1237
1238    #[tokio::test(flavor = "current_thread")]
1239    async fn should_return_not_found_when_editing_missing_pid() {
1240        let store = StubStore::default();
1241
1242        let result = store
1243            .edit(
1244                "no-such-pid",
1245                EditPatch {
1246                    content: Some("anything".to_string()),
1247                    ..EditPatch::default()
1248                },
1249            )
1250            .await;
1251
1252        match result {
1253            Err(StoreError::NotFound(pid)) => assert_eq!(pid, "no-such-pid"),
1254            other => panic!("expected NotFound for missing pid; got {other:?}"),
1255        }
1256    }
1257}