1use std::sync::Arc;
35
36use bamboo_domain::session::types::Session;
37use bamboo_domain::storage::Storage;
38use bamboo_domain::RuntimeSessionPersistence;
39use dashmap::DashMap;
40use tokio::sync::{Mutex, OwnedMutexGuard};
41
/// Keys of `Session::metadata` that `apply_authoritative_metadata` copies from the stored
/// session into the in-memory one, or removes from it when the stored session lacks them.
const AUTHORITATIVE_METADATA_KEYS: &[&str] = &["gold_config"];
43
/// A [`Storage`] backend paired with a table of per-session async mutexes.
///
/// The persistence methods on this type take the mutex for the session id they operate
/// on before they touch the backend.
pub struct LockedSessionStore {
    /// Backend used for every load and save.
    storage: Arc<dyn Storage>,
    /// One `tokio` mutex per session id, inserted lazily by `acquire_lock`.
    locks: Arc<DashMap<String, Arc<Mutex<()>>>>,
}
55
56impl LockedSessionStore {
57 pub fn new(storage: Arc<dyn Storage>) -> Self {
59 Self {
60 storage,
61 locks: Arc::new(DashMap::new()),
62 }
63 }
64
65 pub fn storage(&self) -> &Arc<dyn Storage> {
67 &self.storage
68 }
69
70 pub async fn acquire_lock(&self, session_id: &str) -> OwnedMutexGuard<()> {
75 let lock = self
76 .locks
77 .entry(session_id.to_string())
78 .or_insert_with(|| Arc::new(Mutex::new(())))
79 .clone();
80 lock.lock_owned().await
81 }
82
83 pub async fn save_runtime_only(&self, session: &mut Session) -> std::io::Result<()> {
99 let _guard = self.acquire_lock(&session.id).await;
100 if let Ok(Some(latest)) = self.storage.load_runtime_control_plane(&session.id).await {
101 apply_authoritative_metadata(session, &latest);
102 }
103 self.storage.save_runtime_state(session).await
104 }
105
106 pub async fn commit_metadata(&self, session: &Session) -> std::io::Result<()> {
116 let _guard = self.acquire_lock(&session.id).await;
117 self.storage.save_session(session).await
118 }
119
120 pub async fn merge_save_runtime(&self, session: &mut Session) -> std::io::Result<()> {
130 let _guard = self.acquire_lock(&session.id).await;
131
132 let latest = self.storage.load_session(&session.id).await.ok().flatten();
139
140 let existing_message_count = latest.as_ref().map(|s| s.messages.len());
146 let incoming_message_count = session.messages.len();
147 if existing_message_count.is_some_and(|existing| existing > incoming_message_count) {
148 tracing::warn!(
149 "[{}] merge_save_runtime SHRINK: disk has {:?} messages, saving {} (last_role={:?}, updated_at={}); a stale writer is reverting a concurrent append",
150 session.id,
151 existing_message_count,
152 incoming_message_count,
153 session.messages.last().map(|m| format!("{:?}", m.role)),
154 session.updated_at,
155 );
156 } else {
157 tracing::debug!(
158 "[{}] merge_save_runtime: disk={:?} messages, saving {} (updated_at={})",
159 session.id,
160 existing_message_count,
161 incoming_message_count,
162 session.updated_at,
163 );
164 }
165
166 if let Some(latest) = latest.as_ref() {
167 apply_authoritative_metadata(session, latest);
168 }
169 self.storage.save_session(session).await
170 }
171
172 pub async fn update_runtime_config<F>(
184 &self,
185 session_id: &str,
186 mutate: F,
187 ) -> std::io::Result<Option<Session>>
188 where
189 F: FnOnce(&mut Session),
190 {
191 let _guard = self.acquire_lock(session_id).await;
192 let Some(mut session) = self.storage.load_session(session_id).await? else {
193 return Ok(None);
194 };
195 mutate(&mut session);
196 self.storage.save_session(&session).await?;
197 Ok(Some(session))
198 }
199}
200
/// Exposes the store through [`RuntimeSessionPersistence`] by delegating to
/// [`LockedSessionStore::merge_save_runtime`].
#[async_trait::async_trait]
impl RuntimeSessionPersistence for LockedSessionStore {
    /// Saves `session` via `merge_save_runtime` and returns its result unchanged.
    async fn save_runtime_session(&self, session: &mut Session) -> std::io::Result<()> {
        self.merge_save_runtime(session).await
    }
}
210
211async fn merge_authoritative_metadata_into_stale(
220 storage: &Arc<dyn Storage>,
221 session: &mut Session,
222) {
223 if let Ok(Some(latest)) = storage.load_session(&session.id).await {
224 apply_authoritative_metadata(session, &latest);
225 }
226}
227
228fn apply_authoritative_metadata(session: &mut Session, latest: &Session) {
234 if latest.metadata_version >= session.metadata_version {
235 session.title = latest.title.clone();
236 session.title_version = latest.title_version;
237 session.pinned = latest.pinned;
238 for key in AUTHORITATIVE_METADATA_KEYS {
239 if let Some(value) = latest.metadata.get(*key) {
240 session.metadata.insert((*key).to_string(), value.clone());
241 } else {
242 session.metadata.remove(*key);
243 }
244 }
245 session.metadata_version = latest.metadata_version;
246 }
247}
248
/// Applies authoritative metadata from the stored copy of `session` (when one loads
/// successfully) and then saves `session` through `storage`.
///
/// This function works on a bare `Storage` handle and takes no per-session lock itself;
/// the load and the save are two separate storage calls.
///
/// # Errors
///
/// Returns the error produced by `Storage::save_session`.
pub async fn merge_save_session(
    storage: &Arc<dyn Storage>,
    session: &mut Session,
) -> std::io::Result<()> {
    merge_authoritative_metadata_into_stale(storage, session).await;
    storage.save_session(session).await
}
268
269#[cfg(test)]
272mod tests {
273 use super::*;
274 use crate::v2::SessionStoreV2;
275 use bamboo_domain::session::types::Session;
276
277 async fn make_storage() -> (tempfile::TempDir, Arc<dyn Storage>) {
278 let temp = tempfile::tempdir().unwrap();
279 let storage = SessionStoreV2::new(temp.path().to_path_buf())
280 .await
281 .expect("storage init");
282 (temp, Arc::new(storage) as Arc<dyn Storage>)
283 }
284
285 fn fresh(id: &str) -> Session {
286 Session::new(id.to_string(), "test-model".to_string())
287 }
288
289 #[tokio::test]
292 async fn update_runtime_config_preserves_concurrently_appended_messages() {
293 use bamboo_domain::session::types::Message;
294 use bamboo_domain::ReasoningEffort;
295
296 let (_temp, storage) = make_storage().await;
297 let store = LockedSessionStore::new(storage.clone());
298 let session_id = "cfg-preserve";
299
300 let mut initial = fresh(session_id);
302 initial.add_message(Message::user("hello"));
303 initial.add_message(Message::assistant("hi", None));
304 storage.save_session(&initial).await.unwrap();
305
306 let mut after_chat = storage.load_session(session_id).await.unwrap().unwrap();
308 after_chat.add_message(Message::user("second question"));
309 storage.save_session(&after_chat).await.unwrap();
310 assert_eq!(after_chat.messages.len(), 3);
311
312 let updated = store
316 .update_runtime_config(session_id, |s| {
317 s.reasoning_effort = Some(ReasoningEffort::Max);
318 })
319 .await
320 .unwrap()
321 .expect("session exists");
322
323 assert_eq!(updated.reasoning_effort, Some(ReasoningEffort::Max));
324 assert_eq!(
325 updated.messages.len(),
326 3,
327 "config patch must not revert a concurrently-appended message"
328 );
329
330 let on_disk = storage.load_session(session_id).await.unwrap().unwrap();
331 assert_eq!(on_disk.messages.len(), 3);
332 assert_eq!(on_disk.reasoning_effort, Some(ReasoningEffort::Max));
333 }
334
335 #[tokio::test]
336 async fn update_runtime_config_returns_none_for_missing_session() {
337 use bamboo_domain::ReasoningEffort;
338
339 let (_temp, storage) = make_storage().await;
340 let store = LockedSessionStore::new(storage);
341 let result = store
342 .update_runtime_config("does-not-exist", |s| {
343 s.reasoning_effort = Some(ReasoningEffort::Low);
344 })
345 .await
346 .unwrap();
347 assert!(result.is_none());
348 }
349
350 #[tokio::test]
351 async fn merge_save_runtime_overwrites_messages_from_stale_snapshot() {
352 use bamboo_domain::session::types::Message;
357
358 let (_temp, storage) = make_storage().await;
359 let store = LockedSessionStore::new(storage.clone());
360 let session_id = "stale-clobber";
361
362 let mut baseline = fresh(session_id);
364 baseline.add_message(Message::user("hello"));
365 storage.save_session(&baseline).await.unwrap();
366 let mut stale_snapshot = storage.load_session(session_id).await.unwrap().unwrap();
367
368 let mut after_chat = storage.load_session(session_id).await.unwrap().unwrap();
370 after_chat.add_message(Message::user("second"));
371 storage.save_session(&after_chat).await.unwrap();
372 assert_eq!(
373 storage
374 .load_session(session_id)
375 .await
376 .unwrap()
377 .unwrap()
378 .messages
379 .len(),
380 2
381 );
382
383 store.merge_save_runtime(&mut stale_snapshot).await.unwrap();
385 let after = storage.load_session(session_id).await.unwrap().unwrap();
386 assert_eq!(
387 after.messages.len(),
388 1,
389 "merge_save_runtime clobbers concurrent appends — this is why config patches must use update_runtime_config"
390 );
391 }
392
393 #[tokio::test]
394 async fn merge_save_runtime_preserves_disk_authoritative_metadata_with_single_load() {
395 let (_temp, storage) = make_storage().await;
401 let store = LockedSessionStore::new(storage.clone());
402 let session_id = "runtime-merge-meta";
403
404 let mut baseline = fresh(session_id);
406 baseline.title = "Auto Title".to_string();
407 baseline.metadata_version = 0;
408 storage.save_session(&baseline).await.unwrap();
409
410 let mut stale_snapshot = storage.load_session(session_id).await.unwrap().unwrap();
412
413 let mut renamed = storage.load_session(session_id).await.unwrap().unwrap();
415 renamed.title = "User Renamed".to_string();
416 renamed.title_version = 1;
417 renamed.pinned = true;
418 renamed.metadata_version = 1;
419 store.commit_metadata(&renamed).await.unwrap();
420
421 stale_snapshot.title = "Auto Title".to_string();
423 store.merge_save_runtime(&mut stale_snapshot).await.unwrap();
424
425 let after = storage.load_session(session_id).await.unwrap().unwrap();
426 assert_eq!(after.title, "User Renamed");
427 assert!(after.pinned);
428 assert_eq!(after.metadata_version, 1);
429 assert_eq!(stale_snapshot.title, "User Renamed");
431 assert_eq!(stale_snapshot.metadata_version, 1);
432 }
433
434 #[tokio::test]
437 async fn merge_preserves_disk_title_when_versions_equal() {
438 let (_temp, storage) = make_storage().await;
439 let session_id = "merge-equal";
440
441 let mut on_disk = fresh(session_id);
442 on_disk.title = "User Set This".to_string();
443 on_disk.title_version = 0;
444 on_disk.metadata_version = 0;
445 storage.save_session(&on_disk).await.unwrap();
446
447 let mut runtime_copy = fresh(session_id);
448 runtime_copy.title = "Stale Default".to_string();
449 runtime_copy.title_version = 0;
450 runtime_copy.metadata_version = 0;
451 runtime_copy.messages = vec![];
452
453 merge_save_session(&storage, &mut runtime_copy)
454 .await
455 .unwrap();
456
457 let after = storage.load_session(session_id).await.unwrap().unwrap();
458 assert_eq!(after.title, "User Set This");
459 assert_eq!(after.title_version, 0);
460 assert_eq!(runtime_copy.title, "User Set This");
461 }
462
463 #[tokio::test]
464 async fn merge_preserves_disk_when_disk_version_higher() {
465 let (_temp, storage) = make_storage().await;
466 let session_id = "merge-higher";
467
468 let mut on_disk = fresh(session_id);
469 on_disk.title = "User Title v3".to_string();
470 on_disk.title_version = 3;
471 on_disk.metadata_version = 5;
472 storage.save_session(&on_disk).await.unwrap();
473
474 let mut runtime_copy = fresh(session_id);
475 runtime_copy.title = "Stale".to_string();
476 runtime_copy.title_version = 1;
477 runtime_copy.metadata_version = 0;
478
479 merge_save_session(&storage, &mut runtime_copy)
480 .await
481 .unwrap();
482
483 let after = storage.load_session(session_id).await.unwrap().unwrap();
484 assert_eq!(after.title, "User Title v3");
485 assert_eq!(after.title_version, 3);
486 assert_eq!(after.metadata_version, 5);
487 }
488
489 #[tokio::test]
490 async fn merge_now_preserves_disk_pinned_in_metadata_group() {
491 let (_temp, storage) = make_storage().await;
492 let session_id = "pinned-merge";
493
494 let mut on_disk = fresh(session_id);
495 on_disk.pinned = true;
496 on_disk.metadata_version = 2;
497 storage.save_session(&on_disk).await.unwrap();
498
499 let mut runtime_copy = fresh(session_id);
500 runtime_copy.pinned = false;
501 runtime_copy.metadata_version = 0;
502
503 merge_save_session(&storage, &mut runtime_copy)
504 .await
505 .unwrap();
506
507 let after = storage.load_session(session_id).await.unwrap().unwrap();
508 assert!(
509 after.pinned,
510 "disk pinned=true should win over runtime false"
511 );
512 assert_eq!(after.metadata_version, 2);
513 }
514
515 #[tokio::test]
516 async fn merge_keeps_in_memory_when_session_version_higher() {
517 let (_temp, storage) = make_storage().await;
518 let session_id = "merge-bumped";
519
520 let mut on_disk = fresh(session_id);
521 on_disk.title = "Old".to_string();
522 on_disk.title_version = 1;
523 on_disk.metadata_version = 3;
524 storage.save_session(&on_disk).await.unwrap();
525
526 let mut authoritative_copy = fresh(session_id);
527 authoritative_copy.title = "New Authoritative".to_string();
528 authoritative_copy.title_version = 2;
529 authoritative_copy.metadata_version = 4;
530 authoritative_copy.pinned = true;
531
532 merge_save_session(&storage, &mut authoritative_copy)
533 .await
534 .unwrap();
535
536 let after = storage.load_session(session_id).await.unwrap().unwrap();
537 assert_eq!(after.title, "New Authoritative");
538 assert_eq!(after.title_version, 2);
539 assert_eq!(after.metadata_version, 4);
540 assert!(after.pinned);
541 }
542
543 #[tokio::test]
544 async fn merge_keeps_runtime_messages_when_disk_only_changed_metadata() {
545 let (_temp, storage) = make_storage().await;
546 let session_id = "merge-messages";
547
548 let mut on_disk = fresh(session_id);
549 on_disk.title = "Fresh Title".to_string();
550 on_disk.title_version = 2;
551 on_disk.metadata_version = 5;
552 storage.save_session(&on_disk).await.unwrap();
553
554 let mut runtime_copy = fresh(session_id);
555 runtime_copy.title = "Stale".to_string();
556 runtime_copy.metadata_version = 0;
557 runtime_copy.messages = vec![bamboo_domain::session::types::Message {
558 role: bamboo_domain::session::types::Role::User,
559 content: "keep me".to_string(),
560 id: "msg-1".to_string(),
561 created_at: chrono::Utc::now(),
562 reasoning: None,
563 content_parts: None,
564 image_ocr: None,
565 phase: None,
566 tool_calls: None,
567 tool_call_id: None,
568 tool_success: None,
569 compressed: false,
570 compressed_by_event_id: None,
571 never_compress: false,
572 compression_level: 0,
573 metadata: None,
574 }];
575
576 merge_save_session(&storage, &mut runtime_copy)
577 .await
578 .unwrap();
579
580 let after = storage.load_session(session_id).await.unwrap().unwrap();
581 assert_eq!(after.title, "Fresh Title");
582 assert_eq!(after.metadata_version, 5);
583 assert_eq!(after.messages.len(), 1);
584 assert_eq!(after.messages[0].content, "keep me");
585 }
586
    /// Two spawned tasks each take the session lock via `acquire_lock`, then load, bump
    /// the versions and save directly through the backend. Because each read-modify-write
    /// runs under the lock, the second task sees the first task's bump.
    ///
    /// Note that despite the name, the tasks call `acquire_lock` and the backend directly
    /// rather than `merge_save_runtime`.
    #[tokio::test]
    async fn locked_merge_save_runtime_serialises_concurrent_writes() {
        let (_temp, storage) = make_storage().await;
        let store = Arc::new(LockedSessionStore::new(storage));
        let session_id = "lock-serial".to_string();

        // Seed the session that both writers will load.
        let base = fresh(&session_id);
        store.storage().save_session(&base).await.unwrap();

        // Each task needs its own owned handle and id because it is moved into `spawn`.
        let store_a = store.clone();
        let store_b = store.clone();
        let sid_a = session_id.clone();
        let sid_b = session_id.clone();

        let a = tokio::spawn(async move {
            // The guard is held until the end of the task, covering load and save.
            let _guard = store_a.acquire_lock(&sid_a).await;
            let mut s = store_a
                .storage()
                .load_session(&sid_a)
                .await
                .unwrap()
                .unwrap();
            s.title = "Writer A".to_string();
            s.title_version = s.title_version.saturating_add(1);
            s.metadata_version = s.metadata_version.saturating_add(1);
            s.updated_at = chrono::Utc::now();
            store_a.storage().save_session(&s).await.unwrap();
            s.title_version
        });

        // Short pause between the two spawns.
        tokio::time::sleep(std::time::Duration::from_millis(10)).await;

        let b = tokio::spawn(async move {
            let _guard = store_b.acquire_lock(&sid_b).await;
            let mut s = store_b
                .storage()
                .load_session(&sid_b)
                .await
                .unwrap()
                .unwrap();
            s.title = "Writer B".to_string();
            s.title_version = s.title_version.saturating_add(1);
            s.metadata_version = s.metadata_version.saturating_add(1);
            s.updated_at = chrono::Utc::now();
            store_b.storage().save_session(&s).await.unwrap();
            s.title_version
        });

        let (ver_a, ver_b) = tokio::join!(a, b);
        let final_s = store
            .storage()
            .load_session(&session_id)
            .await
            .unwrap()
            .unwrap();
        // If the writers had overlapped, both would have bumped from the same base and
        // returned the same version.
        assert!(
            ver_a.unwrap() != ver_b.unwrap(),
            "concurrent writers must produce distinct versions"
        );
        // Two serialised increments; this expects a fresh session to start at version 0.
        assert_eq!(final_s.metadata_version, 2);
    }
654
655 #[tokio::test]
656 async fn commit_metadata_is_plain_save_inside_lock() {
657 let (_temp, storage) = make_storage().await;
658 let store = LockedSessionStore::new(storage);
659 let session_id = "commit-plain";
660
661 let mut s = fresh(session_id);
662 s.title = "Committed".to_string();
663 s.metadata_version = 1;
664 s.title_version = 2;
665
666 store.commit_metadata(&s).await.unwrap();
667
668 let after = store
669 .storage()
670 .load_session(session_id)
671 .await
672 .unwrap()
673 .unwrap();
674 assert_eq!(after.title, "Committed");
675 assert_eq!(after.metadata_version, 1);
676 assert_eq!(after.title_version, 2);
677 }
678}