Skip to main content

potato_agent/agents/store/
persistent_memory.rs

1use super::{memory_store::StoredMemoryTurn, MemoryStore, StoreError};
2use crate::agents::memory::{Memory, MemoryTurn};
3use potato_type::prompt::MessageNum;
4use potato_util::create_uuid7;
5use std::fmt::Debug;
6use std::sync::Arc;
7use tracing::warn;
8
9/// Write-through memory that persists turns to a `MemoryStore` and caches them in-process.
10pub struct PersistentMemory {
11    session_id: String,
12    app_name: String,
13    user_id: String,
14    invocation_id: String,
15    store: Arc<dyn MemoryStore>,
16    cache: Vec<MemoryTurn>,
17    /// True once the cache has been hydrated from the backing store.
18    loaded: bool,
19    /// If `Some(n)`, only the last `n` turns are kept in the cache.
20    max_turns: Option<usize>,
21}
22
23impl Debug for PersistentMemory {
24    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
25        f.debug_struct("PersistentMemory")
26            .field("session_id", &self.session_id)
27            .field("app_name", &self.app_name)
28            .field("user_id", &self.user_id)
29            .field("loaded", &self.loaded)
30            .field("cached_turns", &self.cache.len())
31            .field("max_turns", &self.max_turns)
32            .finish()
33    }
34}
35
36impl PersistentMemory {
37    /// Unbounded persistent memory.
38    pub fn new(
39        session_id: impl Into<String>,
40        app_name: impl Into<String>,
41        user_id: impl Into<String>,
42        store: Arc<dyn MemoryStore>,
43    ) -> Self {
44        Self {
45            session_id: session_id.into(),
46            app_name: app_name.into(),
47            user_id: user_id.into(),
48            invocation_id: create_uuid7(),
49            store,
50            cache: Vec::new(),
51            loaded: false,
52            max_turns: None,
53        }
54    }
55
56    /// Windowed persistent memory — only the last `n` turns are kept in the in-process cache.
57    /// Older turns are still in the backing store but won't be injected into prompts.
58    pub fn windowed(
59        session_id: impl Into<String>,
60        app_name: impl Into<String>,
61        user_id: impl Into<String>,
62        store: Arc<dyn MemoryStore>,
63        n: usize,
64    ) -> Self {
65        Self {
66            session_id: session_id.into(),
67            app_name: app_name.into(),
68            user_id: user_id.into(),
69            invocation_id: create_uuid7(),
70            store,
71            cache: Vec::new(),
72            loaded: false,
73            max_turns: Some(n),
74        }
75    }
76
77    /// Load all turns from the backing store into the in-process cache.
78    /// Idempotent — subsequent calls are no-ops once hydrated.
79    pub async fn hydrate(&mut self) -> Result<(), StoreError> {
80        if self.loaded {
81            return Ok(());
82        }
83        let stored = self
84            .store
85            .load_turns(&self.app_name, &self.user_id, &self.session_id)
86            .await?;
87        let turns: Vec<MemoryTurn> = stored.into_iter().map(|t| t.into_memory_turn()).collect();
88        self.cache = if let Some(n) = self.max_turns {
89            turns.into_iter().rev().take(n).rev().collect()
90        } else {
91            turns
92        };
93        self.loaded = true;
94        Ok(())
95    }
96
97    /// Append a turn to the cache and persist it to the backing store.
98    pub async fn push_turn_async(&mut self, turn: MemoryTurn) -> Result<(), StoreError> {
99        let stored = StoredMemoryTurn::new(
100            &self.session_id,
101            &self.app_name,
102            &self.user_id,
103            &self.invocation_id,
104            turn.user.clone(),
105            turn.assistant.clone(),
106        );
107        self.store.save_turn(&stored).await?;
108
109        self.cache.push(turn);
110        if let Some(n) = self.max_turns {
111            if self.cache.len() > n {
112                self.cache.remove(0);
113            }
114        }
115        Ok(())
116    }
117
118    /// Clear the in-process cache and delete all turns from the backing store.
119    pub async fn clear_store(&mut self) -> Result<(), StoreError> {
120        self.store
121            .clear(&self.app_name, &self.user_id, &self.session_id)
122            .await?;
123        self.cache.clear();
124        Ok(())
125    }
126}
127
128impl Memory for PersistentMemory {
129    fn as_any_mut(&mut self) -> Option<&mut dyn std::any::Any> {
130        Some(self)
131    }
132
133    /// Synchronous push — appends to the in-process cache only.
134    /// **Note**: this does not persist to the backing store. Use `push_turn_async` in async
135    /// contexts to ensure write-through persistence.
136    fn push_turn(&mut self, turn: MemoryTurn) {
137        warn!("PersistentMemory::push_turn called synchronously — turn will not be persisted to the backing store. Use push_turn_async in async contexts.");
138        self.cache.push(turn);
139        if let Some(n) = self.max_turns {
140            if self.cache.len() > n {
141                self.cache.remove(0);
142            }
143        }
144    }
145
146    fn messages(&self) -> Vec<MessageNum> {
147        let mut msgs = Vec::with_capacity(self.cache.len() * 2);
148        for turn in &self.cache {
149            msgs.push(turn.user.clone());
150            msgs.push(turn.assistant.clone());
151        }
152        msgs
153    }
154
155    fn clear(&mut self) {
156        self.cache.clear();
157    }
158
159    fn len(&self) -> usize {
160        self.cache.len()
161    }
162}