Skip to main content

bamboo_engine/
session_repository.rs

1//! Canonical session coordinator owned by the framework.
2//!
3//! [`SessionRepository`] bundles the three tiers a Bamboo session lives in — the
4//! in-memory [`SessionCache`], the durable [`Storage`], and the
5//! merge-on-write [`LockedSessionStore`] — and provides the one canonical
6//! load/save coordination (cache → storage → backfill, and dual-write).
7//!
8//! This is a *framework* capability, not a server one: previously the
9//! coordination lived only as inherent methods on the server's `AppState`,
10//! which meant anything outside the HTTP server (the SDK, in-process embedders)
11//! could not load or persist sessions consistently. `SessionRepository` lets any
12//! caller that holds the three tiers share the exact same behaviour; the
13//! server's `AppState` now delegates to it.
14
15use std::sync::Arc;
16
17use bamboo_agent_core::storage::Storage;
18use bamboo_agent_core::Session;
19use bamboo_storage::LockedSessionStore;
20
21use crate::{read_cached_session, SessionCache};
22
23/// Framework-owned coordinator over a session's cache / storage / persistence
24/// tiers. Cheap to clone (all fields are `Arc`).
25#[derive(Clone)]
26pub struct SessionRepository {
27    cache: SessionCache,
28    storage: Arc<dyn Storage>,
29    persistence: Arc<LockedSessionStore>,
30}
31
32impl SessionRepository {
33    pub fn new(
34        cache: SessionCache,
35        storage: Arc<dyn Storage>,
36        persistence: Arc<LockedSessionStore>,
37    ) -> Self {
38        Self {
39            cache,
40            storage,
41            persistence,
42        }
43    }
44
45    pub fn cache(&self) -> &SessionCache {
46        &self.cache
47    }
48
49    pub fn storage(&self) -> &Arc<dyn Storage> {
50        &self.storage
51    }
52
53    pub fn persistence(&self) -> &Arc<LockedSessionStore> {
54        &self.persistence
55    }
56
57    /// Load a session from the memory cache, falling back to durable storage
58    /// (and back-filling the cache on a storage hit). `None` if absent in both.
59    pub async fn load(&self, session_id: &str) -> Option<Session> {
60        if let Some(session) = read_cached_session(&self.cache, session_id) {
61            return Some(session);
62        }
63
64        match self.storage.load_session(session_id).await {
65            Ok(Some(session)) => {
66                self.cache.insert(
67                    session_id.to_string(),
68                    Arc::new(parking_lot::RwLock::new(session.clone())),
69                );
70                Some(session)
71            }
72            _ => None,
73        }
74    }
75
76    /// Like [`load`](Self::load), but surfaces storage errors instead of
77    /// swallowing them to `None`. Cache hit short-circuits; a storage hit
78    /// back-fills the cache.
79    pub async fn try_load(&self, session_id: &str) -> std::io::Result<Option<Session>> {
80        if let Some(session) = read_cached_session(&self.cache, session_id) {
81            return Ok(Some(session));
82        }
83        let loaded = self.storage.load_session(session_id).await?;
84        if let Some(ref session) = loaded {
85            self.cache.insert(
86                session_id.to_string(),
87                Arc::new(parking_lot::RwLock::new(session.clone())),
88            );
89        }
90        Ok(loaded)
91    }
92
93    /// Persist the session (merge-on-write) and refresh the cache, surfacing
94    /// storage errors. Use [`save_and_cache`](Self::save_and_cache) for the
95    /// fire-and-forget variant that logs and continues on failure.
96    pub async fn save(&self, session: &mut Session) -> std::io::Result<()> {
97        self.persistence.merge_save_runtime(session).await?;
98        self.cache.insert(
99            session.id.clone(),
100            Arc::new(parking_lot::RwLock::new(session.clone())),
101        );
102        Ok(())
103    }
104
105    /// Load a session, creating a fresh `Session::new(id, model)` if absent.
106    pub async fn load_or_create(&self, session_id: &str, model: &str) -> Session {
107        if let Some(session) = self.load(session_id).await {
108            return session;
109        }
110        Session::new(session_id.to_string(), model.to_string())
111    }
112
113    /// Load a session, reconciling the memory and storage copies via a
114    /// preference heuristic: storage wins when it is strictly newer, or when it
115    /// is the same age but still carries a pending question memory lost. Storage
116    /// is **never** preferred when it is strictly older than memory.
117    ///
118    /// The cache is refreshed cache-aside but with a no-regression guarantee:
119    /// `load_merged` never overwrites a newer cached session with an older
120    /// storage copy, so it is safe to call from hot read paths.
121    pub async fn load_merged(&self, session_id: &str) -> Option<Session> {
122        let memory_session = read_cached_session(&self.cache, session_id);
123        let storage_session = self
124            .storage
125            .load_session(session_id)
126            .await
127            .unwrap_or_default();
128
129        match (memory_session, storage_session) {
130            (Some(memory), Some(storage)) => {
131                let prefer_storage = should_prefer_storage(&memory, &storage);
132                let diverged = prefer_storage || memory.messages.len() != storage.messages.len();
133                let chosen_len = if prefer_storage {
134                    storage.messages.len()
135                } else {
136                    memory.messages.len()
137                };
138                macro_rules! merged_log {
139                    ($level:ident) => {
140                        tracing::$level!(
141                            "[{}] load_session_merged: memory={} msgs (updated_at={}), storage={} msgs (updated_at={}), prefer_storage={} -> chose {} msgs",
142                            session_id,
143                            memory.messages.len(),
144                            memory.updated_at,
145                            storage.messages.len(),
146                            storage.updated_at,
147                            prefer_storage,
148                            chosen_len,
149                        )
150                    };
151                }
152                if diverged {
153                    merged_log!(debug);
154                } else {
155                    merged_log!(trace);
156                }
157                let memory_updated_at = memory.updated_at;
158                let chosen = if prefer_storage { storage } else { memory };
159                // Cache-aside refresh with a hard no-regression invariant: only
160                // write back when we actually reconciled *to storage* (a memory
161                // win is already the cached copy; re-inserting it would needlessly
162                // replace a possibly-live Arc) AND the reconciled copy is not
163                // older than what memory already holds. This is what makes
164                // `load_merged` safe on hot read paths — it can never clobber a
165                // freshly-updated session with a stale storage copy.
166                if prefer_storage && chosen.updated_at >= memory_updated_at {
167                    self.cache.insert(
168                        session_id.to_string(),
169                        Arc::new(parking_lot::RwLock::new(chosen.clone())),
170                    );
171                }
172                Some(chosen)
173            }
174            (Some(memory), None) => Some(memory),
175            (None, Some(storage)) => {
176                self.cache.insert(
177                    session_id.to_string(),
178                    Arc::new(parking_lot::RwLock::new(storage.clone())),
179                );
180                Some(storage)
181            }
182            (None, None) => None,
183        }
184    }
185
186    /// Persist the session (merge-on-write, preserving concurrent UI edits to
187    /// the authoritative metadata group) and refresh the in-memory cache.
188    pub async fn save_and_cache(&self, session: &mut Session) {
189        if let Err(error) = self.persistence.merge_save_runtime(session).await {
190            tracing::warn!("[{}] Failed to save session: {}", session.id, error);
191        }
192        self.cache.insert(
193            session.id.clone(),
194            Arc::new(parking_lot::RwLock::new(session.clone())),
195        );
196    }
197}
198
199fn should_prefer_storage(memory_session: &Session, storage_session: &Session) -> bool {
200    // Never reconcile *backwards* to a strictly-older storage copy: if memory is
201    // newer it is authoritative (e.g. it just answered and cleared a pending
202    // question while storage still holds the stale one). Respecting `updated_at`
203    // here is what stops `load_merged` from returning — and caching — stale data.
204    if storage_session.updated_at < memory_session.updated_at {
205        return false;
206    }
207    // Storage is same-age or newer: prefer it when strictly newer, or when it
208    // still carries a pending question that the (same-age) memory copy lost, so
209    // a genuine clarification is never dropped.
210    storage_session.updated_at > memory_session.updated_at
211        || (memory_session.pending_question.is_none() && storage_session.pending_question.is_some())
212}
213
214/// `SessionRepository` is the canonical `RuntimeSessionPersistence`: the runtime
215/// can persist a session through the same coordinator (merge-on-write + cache
216/// refresh) instead of a bespoke adapter.
217#[async_trait::async_trait]
218impl bamboo_domain::RuntimeSessionPersistence for SessionRepository {
219    async fn save_runtime_session(&self, session: &mut Session) -> std::io::Result<()> {
220        self.save(session).await
221    }
222
223    async fn append_token_usage_record(
224        &self,
225        session_id: &str,
226        json_line: &str,
227    ) -> std::io::Result<()> {
228        self.storage
229            .append_token_usage_record(session_id, json_line)
230            .await
231    }
232}
233
#[cfg(test)]
mod tests {
    use super::*;
    use bamboo_agent_core::storage::Storage;
    use chrono::Utc;
    use std::collections::HashMap;
    use std::sync::Mutex;

    /// In-memory [`Storage`] double: a mutex-guarded map keyed by session id.
    #[derive(Default)]
    struct MapStorage {
        sessions: Mutex<HashMap<String, Session>>,
    }

    #[async_trait::async_trait]
    impl Storage for MapStorage {
        async fn save_session(&self, session: &Session) -> std::io::Result<()> {
            let mut sessions = self.sessions.lock().unwrap();
            sessions.insert(session.id.clone(), session.clone());
            Ok(())
        }

        async fn load_session(&self, session_id: &str) -> std::io::Result<Option<Session>> {
            let sessions = self.sessions.lock().unwrap();
            Ok(sessions.get(session_id).cloned())
        }

        async fn delete_session(&self, session_id: &str) -> std::io::Result<bool> {
            let mut sessions = self.sessions.lock().unwrap();
            Ok(sessions.remove(session_id).is_some())
        }
    }

    /// Repository over `storage` with a fresh, empty cache.
    fn test_repo(storage: Arc<dyn Storage>) -> SessionRepository {
        let persistence = Arc::new(LockedSessionStore::new(Arc::clone(&storage)));
        let cache: SessionCache = Arc::new(dashmap::DashMap::new());
        SessionRepository::new(cache, storage, persistence)
    }

    /// Seed the repository's cache directly, bypassing storage.
    fn cache_put(repo: &SessionRepository, session: &Session) {
        let entry = Arc::new(parking_lot::RwLock::new(session.clone()));
        repo.cache().insert(session.id.clone(), entry);
    }

    /// A new session for `id` (model "m") with a pending question set via
    /// `set_pending_question`, offering the single option "OK".
    fn with_pending_question(
        id: &str,
        call: &'static str,
        kind: &'static str,
        question: &'static str,
    ) -> Session {
        let mut session = Session::new(id.to_string(), "m");
        session.set_pending_question(
            call.into(),
            kind.into(),
            question.into(),
            vec!["OK".into()],
            true,
        );
        session
    }

    /// Regression guard: a strictly-newer in-memory session (e.g. one that just
    /// answered and cleared its pending question) must win over a strictly-older
    /// storage copy that still carries the pending question — both in the value
    /// returned AND in the cache (no clobber).
    #[tokio::test]
    async fn load_merged_does_not_regress_to_older_storage() {
        let id = "s1";
        let storage: Arc<dyn Storage> = Arc::new(MapStorage::default());
        let repo = test_repo(Arc::clone(&storage));

        // Storage: an older copy that is still waiting on a question.
        let mut stale = with_pending_question(id, "tc1", "kind", "q?");
        stale.updated_at = Utc::now() - chrono::Duration::seconds(10);
        storage.save_session(&stale).await.unwrap();

        // Memory: a newer copy with no pending question.
        let mut fresh = Session::new(id.to_string(), "m");
        fresh.updated_at = Utc::now();
        cache_put(&repo, &fresh);

        let merged = repo.load_merged(id).await.expect("session exists");
        assert!(
            merged.pending_question.is_none(),
            "must return the newer answered memory copy, not the stale storage one"
        );

        let cached = read_cached_session(repo.cache(), id).expect("cached");
        assert!(
            cached.pending_question.is_none(),
            "load_merged must never regress the cache to a stale storage copy"
        );
    }

    /// The pending-question recovery still works when storage is the same age:
    /// if memory lost a pending question that same-age storage retains, prefer
    /// storage so a genuine clarification is not dropped.
    #[tokio::test]
    async fn load_merged_recovers_pending_question_from_same_age_storage() {
        let id = "s2";
        let storage: Arc<dyn Storage> = Arc::new(MapStorage::default());
        let repo = test_repo(Arc::clone(&storage));
        let stamp = Utc::now();

        // Storage keeps the pending question...
        let mut retained = with_pending_question(id, "tc", "k", "q");
        retained.updated_at = stamp;
        storage.save_session(&retained).await.unwrap();

        // ...while the same-age memory copy has lost it.
        let mut lost = retained.clone();
        lost.clear_pending_question();
        lost.updated_at = stamp;
        cache_put(&repo, &lost);

        let merged = repo.load_merged(id).await.expect("session exists");
        assert!(
            merged.pending_question.is_some(),
            "same-age storage carrying a pending question must still be recovered"
        );
    }
}