bamboo_server/app_state/
session_loader.rs1use super::*;
14
15#[async_trait::async_trait]
16impl crate::session_app::repository::SessionAccess for AppState {
17 async fn load_session(
18 &self,
19 id: &str,
20 ) -> Result<Option<bamboo_agent_core::Session>, crate::session_app::errors::SessionLoadError>
21 {
22 match AppState::load_session(self, id).await {
23 Some(session) => Ok(Some(session)),
24 None => Err(crate::session_app::errors::SessionLoadError::NotFound(
25 id.to_string(),
26 )),
27 }
28 }
29
30 async fn load_or_create(
31 &self,
32 id: &str,
33 model: &str,
34 ) -> Result<bamboo_agent_core::Session, crate::session_app::errors::SessionLoadError> {
35 Ok(AppState::load_or_create_session(self, id, model).await)
36 }
37
38 async fn save_session(
39 &self,
40 session: &mut bamboo_agent_core::Session,
41 ) -> Result<(), crate::session_app::errors::SessionSaveError> {
42 self.persistence
43 .merge_save_runtime(session)
44 .await
45 .map_err(|e| crate::session_app::errors::SessionSaveError::StorageError(e.to_string()))
46 }
47
48 async fn save_and_cache(
49 &self,
50 session: &mut bamboo_agent_core::Session,
51 ) -> Result<(), crate::session_app::errors::SessionSaveError> {
52 AppState::save_and_cache_session(self, session).await;
53 Ok(())
54 }
55
56 async fn load_merged(
57 &self,
58 id: &str,
59 ) -> Result<Option<bamboo_agent_core::Session>, crate::session_app::errors::SessionLoadError>
60 {
61 Ok(AppState::load_session_merged(self, id).await)
62 }
63}
64
65impl AppState {
66 pub async fn load_session(&self, session_id: &str) -> Option<bamboo_agent_core::Session> {
70 let memory_session = {
71 let sessions = self.sessions.read().await;
72 sessions.get(session_id).cloned()
73 };
74
75 if let Some(session) = memory_session {
76 return Some(session);
77 }
78
79 match self.storage.load_session(session_id).await {
80 Ok(Some(session)) => {
81 let mut sessions = self.sessions.write().await;
82 sessions.insert(session_id.to_string(), session.clone());
83 Some(session)
84 }
85 _ => None,
86 }
87 }
88
89 pub async fn load_or_create_session(
93 &self,
94 session_id: &str,
95 model: &str,
96 ) -> bamboo_agent_core::Session {
97 if let Some(session) = self.load_session(session_id).await {
98 return session;
99 }
100 bamboo_agent_core::Session::new(session_id.to_string(), model.to_string())
101 }
102
103 pub async fn load_session_merged(
109 &self,
110 session_id: &str,
111 ) -> Option<bamboo_agent_core::Session> {
112 let memory_session = {
113 let sessions = self.sessions.read().await;
114 sessions.get(session_id).cloned()
115 };
116
117 let storage_session = self
118 .storage
119 .load_session(session_id)
120 .await
121 .unwrap_or_default();
122
123 match (memory_session, storage_session) {
124 (Some(memory), Some(storage)) => {
125 let prefer_storage = should_prefer_storage(&memory, &storage);
126 let diverged =
131 prefer_storage || memory.messages.len() != storage.messages.len();
132 let chosen_len = if prefer_storage {
133 storage.messages.len()
134 } else {
135 memory.messages.len()
136 };
137 macro_rules! merged_log {
138 ($level:ident) => {
139 tracing::$level!(
140 "[{}] load_session_merged: memory={} msgs (updated_at={}), storage={} msgs (updated_at={}), prefer_storage={} -> chose {} msgs",
141 session_id,
142 memory.messages.len(),
143 memory.updated_at,
144 storage.messages.len(),
145 storage.updated_at,
146 prefer_storage,
147 chosen_len,
148 )
149 };
150 }
151 if diverged {
152 merged_log!(debug);
153 } else {
154 merged_log!(trace);
155 }
156 let chosen = if prefer_storage { storage } else { memory };
157 let mut sessions = self.sessions.write().await;
158 sessions.insert(session_id.to_string(), chosen.clone());
159 Some(chosen)
160 }
161 (Some(memory), None) => Some(memory),
162 (None, Some(storage)) => {
163 let mut sessions = self.sessions.write().await;
164 sessions.insert(session_id.to_string(), storage.clone());
165 Some(storage)
166 }
167 (None, None) => None,
168 }
169 }
170
171 pub async fn save_and_cache_session(&self, session: &mut bamboo_agent_core::Session) {
177 if let Err(error) = self.persistence.merge_save_runtime(session).await {
178 tracing::warn!("[{}] Failed to save session: {}", session.id, error);
179 }
180 let mut sessions = self.sessions.write().await;
181 sessions.insert(session.id.clone(), session.clone());
182 }
183}
184
185fn should_prefer_storage(
186 memory_session: &bamboo_agent_core::Session,
187 storage_session: &bamboo_agent_core::Session,
188) -> bool {
189 if memory_session.pending_question.is_none() && storage_session.pending_question.is_some() {
190 return true;
191 }
192 storage_session.updated_at > memory_session.updated_at
193}
194
#[cfg(test)]
mod tests {
    use super::*;

    /// A session present only in the cache is returned by `load_session`.
    #[tokio::test]
    async fn load_session_returns_from_memory_first() {
        let workspace = tempfile::tempdir().expect("temp dir");
        let app = AppState::new(workspace.path().to_path_buf())
            .await
            .expect("app state");

        let session_id = "session-memory-first";
        // Seed the cache only; nothing is written to storage.
        app.sessions.write().await.insert(
            session_id.to_string(),
            bamboo_agent_core::Session::new(session_id.to_string(), "test-model"),
        );

        let loaded_id = app
            .load_session(session_id)
            .await
            .map(|session| session.id);
        assert_eq!(loaded_id.as_deref(), Some(session_id));
    }

    /// A session present only in storage is still found by `load_session`.
    #[tokio::test]
    async fn load_session_falls_back_to_storage() {
        let workspace = tempfile::tempdir().expect("temp dir");
        let app = AppState::new(workspace.path().to_path_buf())
            .await
            .expect("app state");

        let session_id = "session-storage-fallback";
        let stored = bamboo_agent_core::Session::new(session_id.to_string(), "test-model");
        // Write to storage directly so the cache stays empty.
        app.storage
            .save_session(&stored)
            .await
            .expect("save session");

        let loaded_id = app
            .load_session(session_id)
            .await
            .map(|session| session.id);
        assert_eq!(loaded_id.as_deref(), Some(session_id));
    }

    /// An unknown id yields `None`.
    #[tokio::test]
    async fn load_session_returns_none_when_missing() {
        let workspace = tempfile::tempdir().expect("temp dir");
        let app = AppState::new(workspace.path().to_path_buf())
            .await
            .expect("app state");

        assert!(app.load_session("nonexistent").await.is_none());
    }

    /// `load_or_create_session` builds a session with the requested id and model.
    #[tokio::test]
    async fn load_or_create_creates_new_when_missing() {
        let workspace = tempfile::tempdir().expect("temp dir");
        let app = AppState::new(workspace.path().to_path_buf())
            .await
            .expect("app state");

        let created = app.load_or_create_session("new-session", "gpt-4").await;
        assert_eq!(created.id, "new-session");
        assert_eq!(created.model, "gpt-4");
    }

    /// The stored copy wins the merge when only it carries a pending question.
    #[tokio::test]
    async fn load_session_merged_prefers_storage_with_pending_question() {
        let workspace = tempfile::tempdir().expect("temp dir");
        let app = AppState::new(workspace.path().to_path_buf())
            .await
            .expect("app state");

        let session_id = "session-merge-pending";
        let cached_copy = bamboo_agent_core::Session::new(session_id.to_string(), "test-model");
        let mut stored_copy = cached_copy.clone();
        stored_copy.set_pending_question(
            "tool-call-1".to_string(),
            "ConclusionWithOptions".to_string(),
            "Need confirmation?".to_string(),
            vec!["OK".to_string()],
            true,
        );

        app.sessions
            .write()
            .await
            .insert(session_id.to_string(), cached_copy);
        app.storage
            .save_session(&stored_copy)
            .await
            .expect("save session");

        let has_pending_question = app
            .load_session_merged(session_id)
            .await
            .map(|session| session.pending_question.is_some());
        assert_eq!(has_pending_question, Some(true));
    }

    /// `save_and_cache_session` updates both the cache and storage.
    #[tokio::test]
    async fn save_and_cache_session_writes_both() {
        let workspace = tempfile::tempdir().expect("temp dir");
        let app = AppState::new(workspace.path().to_path_buf())
            .await
            .expect("app state");

        let session_id = "session-save-cache";
        let mut session = bamboo_agent_core::Session::new(session_id.to_string(), "test-model");
        session.title = "test-title".to_string();

        app.save_and_cache_session(&mut session).await;

        // Cache side: read the title out while the guard is held.
        let cached_title = app
            .sessions
            .read()
            .await
            .get(session_id)
            .map(|cached| cached.title.clone());
        assert_eq!(cached_title.as_deref(), Some("test-title"));

        // Storage side: read back through the storage handle directly.
        let stored = app.storage.load_session(session_id).await;
        assert!(stored.is_ok());
        let stored_title = stored.unwrap().map(|persisted| persisted.title);
        assert_eq!(stored_title.as_deref(), Some("test-title"));
    }
}