Skip to main content

bamboo_storage/
session_merge.rs

1//! Merge-aware session save helper.
2//!
3//! Provides [`merge_save_session`], which preserves any concurrent UI edits to
4//! the authoritative metadata group (`title`, `title_version`, `pinned`,
5//! `metadata_version`) before writing the runtime-modified session to storage.
6//! Re-reads the latest persisted copy and only takes in-memory values when the
7//! caller's `metadata_version` strictly exceeds disk's.
8//!
//! ## Field-by-field merge policy
//!
//! All authoritative metadata fields are grouped under `metadata_version`:
//! when `disk.metadata_version >= session.metadata_version`, the on-disk
//! `title`, `title_version`, `pinned`, `metadata_version`, and the
//! `gold_config` entry of `metadata` overwrite the in-memory values before
//! writing (a `gold_config` entry that is absent on disk is removed from the
//! in-memory copy). Authoritative writers bump `metadata_version` (and
//! `title_version` for title edits) before calling so their values survive the
//! merge; non-authoritative writers don't bump and so are overwritten by any
//! later disk changes.
18//!
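//! ## Save primitives
//!
//! - **`merge_save_session`** — stateless merge+save. Still works for
//!   non-authoritative writers that hold `Arc<dyn Storage>` directly.
//! - **`LockedSessionStore::merge_save_runtime`** — per-session-locked variant
//!   that additionally serializes writes for the same session. Prefer this for
//!   server-side paths where an authoritative writer may race with a runtime
//!   save.
//! - **`LockedSessionStore::save_runtime_only`** — per-session-locked
//!   merge+save that persists through `Storage::save_runtime_state` instead of
//!   a full session save. Not for callers that have appended messages.
//! - **`LockedSessionStore::update_runtime_config`** — loads the latest
//!   session, applies a caller-supplied mutation and saves, all inside the
//!   per-session lock, so a stale snapshot can never revert concurrent appends.
//! - **`LockedSessionStore::commit_metadata`** — plain save that takes the
//!   per-session lock itself. For authoritative writers that have already
//!   loaded the latest session, mutated it and bumped the versions; no merge is
//!   performed. The lock covers only the save, so the caller must not already
//!   hold the guard for the same session.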
30//!
31//! Bare [`Storage::save_session`] is reserved for first-write paths (e.g. new
32//! session creation) where there is no prior on-disk copy to merge against.
33
34use std::sync::Arc;
35
36use bamboo_domain::session::types::Session;
37use bamboo_domain::storage::Storage;
38use bamboo_domain::RuntimeSessionPersistence;
39use dashmap::DashMap;
40use tokio::sync::{Mutex, OwnedMutexGuard};
41
/// Keys of the session `metadata` map that are merged together with the
/// authoritative metadata group.
///
/// `apply_authoritative_metadata` iterates this list: for each key the on-disk
/// value replaces the in-memory one, and a key missing on disk is removed from
/// the in-memory map.
const AUTHORITATIVE_METADATA_KEYS: &[&str] = &["gold_config"];
43
44// ── LockedSessionStore ────────────────────────────────────────────────
45
/// Wraps a [`Storage`] implementation with per-session write serialization.
///
/// Under the hood it maintains a `DashMap<String, Arc<Mutex<()>>>` so that
/// only writes targeting the *same* session are serialised; different
/// sessions proceed concurrently.
pub struct LockedSessionStore {
    /// Backend that every load and save in this type is delegated to.
    storage: Arc<dyn Storage>,
    /// Session id → mutex guarding writes for that session. Entries are
    /// created on first use by `acquire_lock`.
    locks: Arc<DashMap<String, Arc<Mutex<()>>>>,
}
55
56impl LockedSessionStore {
57    /// Wrap an existing storage backend.
58    pub fn new(storage: Arc<dyn Storage>) -> Self {
59        Self {
60            storage,
61            locks: Arc::new(DashMap::new()),
62        }
63    }
64
65    /// Borrow the inner storage for read-only access.
66    pub fn storage(&self) -> &Arc<dyn Storage> {
67        &self.storage
68    }
69
70    /// Acquire a per-session serialization guard.
71    ///
72    /// Only writes for the **same** session are serialised; writes for
73    /// different sessions can proceed concurrently.
74    pub async fn acquire_lock(&self, session_id: &str) -> OwnedMutexGuard<()> {
75        let lock = self
76            .locks
77            .entry(session_id.to_string())
78            .or_insert_with(|| Arc::new(Mutex::new(())))
79            .clone();
80        lock.lock_owned().await
81    }
82
83    /// Runtime-only save: persist the control-plane (`agent_runtime_state`,
84    /// metadata, …) without rewriting the message history.
85    ///
86    /// This is the fast path for runtime-state mutations that do NOT change
87    /// `messages` — e.g. registering a parent's wait for spawned children. It
88    /// delegates to [`Storage::save_runtime_state`], which writes a small
89    /// sidecar (or falls back to a full save on backends without one).
90    ///
91    /// Like [`Self::merge_save_runtime`], it merges newer authoritative metadata
92    /// from disk so a concurrent UI title/pin edit is never clobbered — but it
93    /// reads only the lightweight control-plane snapshot (no message history) to
94    /// do so.
95    ///
96    /// Callers MUST NOT use this when they have appended messages: the in-memory
97    /// `messages` are ignored by the sidecar and would not be persisted.
98    pub async fn save_runtime_only(&self, session: &mut Session) -> std::io::Result<()> {
99        let _guard = self.acquire_lock(&session.id).await;
100        if let Ok(Some(latest)) = self.storage.load_runtime_control_plane(&session.id).await {
101            apply_authoritative_metadata(session, &latest);
102        }
103        self.storage.save_runtime_state(session).await
104    }
105
106    /// Authoritative metadata commit.
107    ///
108    /// The caller must have already loaded the latest session, mutated the
109    /// metadata fields, and bumped `metadata_version` (and `title_version` if
110    /// applicable).  This method simply acquires the per-session lock and
111    /// performs a plain `storage.save_session`.
112    ///
113    /// The lock guarantees that no other write for this session interleaves
114    /// between the caller's load and this save, so merge is unnecessary.
115    pub async fn commit_metadata(&self, session: &Session) -> std::io::Result<()> {
116        let _guard = self.acquire_lock(&session.id).await;
117        self.storage.save_session(session).await
118    }
119
120    /// Runtime / non-authoritative save with per-session lock.
121    ///
122    /// Inside the lock: reload disk, merge the authoritative metadata group
123    /// (`title`, `title_version`, `pinned`, `metadata_version`) from disk into
124    /// the in-memory copy if disk's `metadata_version >= session.metadata_version`,
125    /// then save.
126    ///
127    /// This is the locked equivalent of [`merge_save_session`]; prefer it for
128    /// server-side paths where an authoritative write may race with this save.
129    pub async fn merge_save_runtime(&self, session: &mut Session) -> std::io::Result<()> {
130        let _guard = self.acquire_lock(&session.id).await;
131
132        // Single disk read serves BOTH the SHRINK diagnostic and the
133        // authoritative-metadata merge below. Previously this path loaded the
134        // session twice (once here, once inside the merge helper); on a parent
135        // session carrying the full conversation history that doubled the
136        // deserialization cost of every runtime save, which is the hot path
137        // during sub-agent spawn.
138        let latest = self.storage.load_session(&session.id).await.ok().flatten();
139
140        // DIAGNOSTIC: merge_save_runtime overwrites the whole `messages` array
141        // (it only merges authoritative metadata, not messages). If the incoming
142        // session is stale (fewer messages than what is already on disk), this save
143        // silently reverts a concurrent append (e.g. a just-persisted user message).
144        // Log a SHRINK warning so we can identify the stale writer.
145        let existing_message_count = latest.as_ref().map(|s| s.messages.len());
146        let incoming_message_count = session.messages.len();
147        if existing_message_count.is_some_and(|existing| existing > incoming_message_count) {
148            tracing::warn!(
149                "[{}] merge_save_runtime SHRINK: disk has {:?} messages, saving {} (last_role={:?}, updated_at={}); a stale writer is reverting a concurrent append",
150                session.id,
151                existing_message_count,
152                incoming_message_count,
153                session.messages.last().map(|m| format!("{:?}", m.role)),
154                session.updated_at,
155            );
156        } else {
157            tracing::debug!(
158                "[{}] merge_save_runtime: disk={:?} messages, saving {} (updated_at={})",
159                session.id,
160                existing_message_count,
161                incoming_message_count,
162                session.updated_at,
163            );
164        }
165
166        if let Some(latest) = latest.as_ref() {
167            apply_authoritative_metadata(session, latest);
168        }
169        self.storage.save_session(session).await
170    }
171
172    /// Apply a config-only mutation to a session without ever clobbering its
173    /// `messages` (or other concurrently-written state).
174    ///
175    /// Unlike [`Self::merge_save_runtime`], the caller does NOT pass a session
176    /// snapshot. Instead this loads the **latest** session from storage *inside*
177    /// the per-session lock, applies `mutate` (intended for small config fields
178    /// like `model_ref` / `reasoning_effort`), and saves. Because the load and
179    /// save both happen under the lock, a concurrent append (e.g. `POST /chat`
180    /// adding a user message) can never be reverted by this write.
181    ///
182    /// Returns the saved session, or `None` if it does not exist.
183    pub async fn update_runtime_config<F>(
184        &self,
185        session_id: &str,
186        mutate: F,
187    ) -> std::io::Result<Option<Session>>
188    where
189        F: FnOnce(&mut Session),
190    {
191        let _guard = self.acquire_lock(session_id).await;
192        let Some(mut session) = self.storage.load_session(session_id).await? else {
193            return Ok(None);
194        };
195        mutate(&mut session);
196        self.storage.save_session(&session).await?;
197        Ok(Some(session))
198    }
199}
200
201/// Infrastructure implementation of the domain runtime-persistence port.
202/// Server should assemble this as `Arc<dyn RuntimeSessionPersistence>` and must
203/// not define a separate adapter layer for the same behavior.
204#[async_trait::async_trait]
205impl RuntimeSessionPersistence for LockedSessionStore {
206    async fn save_runtime_session(&self, session: &mut Session) -> std::io::Result<()> {
207        self.merge_save_runtime(session).await
208    }
209}
210
211// ── Internal merge helper ─────────────────────────────────────────────
212
213/// Re-read the on-disk session and, when the disk copy carries a
214/// `metadata_version >= session.metadata_version`, overwrite the in-memory
215/// authoritative metadata fields with the disk values.
216///
217/// This is the core staleness-correction: non-authoritative writers call it
218/// before saving so they don't accidentally revert a concurrent UI edit.
219async fn merge_authoritative_metadata_into_stale(
220    storage: &Arc<dyn Storage>,
221    session: &mut Session,
222) {
223    if let Ok(Some(latest)) = storage.load_session(&session.id).await {
224        apply_authoritative_metadata(session, &latest);
225    }
226}
227
228/// Pure merge step: given a freshly-loaded on-disk copy, overwrite the
229/// in-memory authoritative metadata group when disk's `metadata_version` is at
230/// least the in-memory one. Split out so callers that have already loaded the
231/// disk copy (e.g. [`LockedSessionStore::merge_save_runtime`]) don't pay for a
232/// second read.
233fn apply_authoritative_metadata(session: &mut Session, latest: &Session) {
234    if latest.metadata_version >= session.metadata_version {
235        session.title = latest.title.clone();
236        session.title_version = latest.title_version;
237        session.pinned = latest.pinned;
238        for key in AUTHORITATIVE_METADATA_KEYS {
239            if let Some(value) = latest.metadata.get(*key) {
240                session.metadata.insert((*key).to_string(), value.clone());
241            } else {
242                session.metadata.remove(*key);
243            }
244        }
245        session.metadata_version = latest.metadata_version;
246    }
247}
248
249// ── Free merge-save function ──────────────────────────────────────────
250
251/// Save a session while preserving any concurrent UI edits to the
252/// authoritative metadata group.
253///
254/// Behaviour: if the on-disk session has `metadata_version >=
255/// session.metadata_version`, the on-disk `title`, `title_version`, `pinned`
256/// and `metadata_version` overwrite the in-memory values before writing.
257///
258/// This is the stateless variant (no per-session lock). Prefer
259/// [`LockedSessionStore::merge_save_runtime`] for server-side paths where an
260/// authoritative writer may race with this save.
261pub async fn merge_save_session(
262    storage: &Arc<dyn Storage>,
263    session: &mut Session,
264) -> std::io::Result<()> {
265    merge_authoritative_metadata_into_stale(storage, session).await;
266    storage.save_session(session).await
267}
268
269// ── Tests ─────────────────────────────────────────────────────────────
270
#[cfg(test)]
mod tests {
    use super::*;
    use crate::v2::SessionStoreV2;
    use bamboo_domain::session::types::Session;

    /// Create a `SessionStoreV2` rooted in a fresh temporary directory.
    ///
    /// The `TempDir` is returned alongside the storage so the directory stays
    /// alive for the duration of the test.
    async fn make_storage() -> (tempfile::TempDir, Arc<dyn Storage>) {
        let temp = tempfile::tempdir().unwrap();
        let storage = SessionStoreV2::new(temp.path().to_path_buf())
            .await
            .expect("storage init");
        (temp, Arc::new(storage) as Arc<dyn Storage>)
    }

    /// Build a brand-new in-memory session with the given id.
    fn fresh(id: &str) -> Session {
        Session::new(id.to_string(), "test-model".to_string())
    }

    // ── update_runtime_config: config patches must never clobber messages ──

    #[tokio::test]
    async fn update_runtime_config_preserves_concurrently_appended_messages() {
        use bamboo_domain::session::types::Message;
        use bamboo_domain::ReasoningEffort;

        let (_temp, storage) = make_storage().await;
        let store = LockedSessionStore::new(storage.clone());
        let session_id = "cfg-preserve";

        // Persisted baseline: one user + one assistant turn.
        let mut initial = fresh(session_id);
        initial.add_message(Message::user("hello"));
        initial.add_message(Message::assistant("hi", None));
        storage.save_session(&initial).await.unwrap();

        // Simulate `POST /chat` appending a new user message to disk.
        let mut after_chat = storage.load_session(session_id).await.unwrap().unwrap();
        after_chat.add_message(Message::user("second question"));
        storage.save_session(&after_chat).await.unwrap();
        assert_eq!(after_chat.messages.len(), 3);

        // A config-only patch must load the freshest session and preserve the
        // appended message (this is the regression that broke message sending on
        // existing sessions).
        let updated = store
            .update_runtime_config(session_id, |s| {
                s.reasoning_effort = Some(ReasoningEffort::Max);
            })
            .await
            .unwrap()
            .expect("session exists");

        assert_eq!(updated.reasoning_effort, Some(ReasoningEffort::Max));
        assert_eq!(
            updated.messages.len(),
            3,
            "config patch must not revert a concurrently-appended message"
        );

        let on_disk = storage.load_session(session_id).await.unwrap().unwrap();
        assert_eq!(on_disk.messages.len(), 3);
        assert_eq!(on_disk.reasoning_effort, Some(ReasoningEffort::Max));
    }

    #[tokio::test]
    async fn update_runtime_config_returns_none_for_missing_session() {
        use bamboo_domain::ReasoningEffort;

        let (_temp, storage) = make_storage().await;
        let store = LockedSessionStore::new(storage);
        let result = store
            .update_runtime_config("does-not-exist", |s| {
                s.reasoning_effort = Some(ReasoningEffort::Low);
            })
            .await
            .unwrap();
        assert!(result.is_none());
    }

    #[tokio::test]
    async fn merge_save_runtime_overwrites_messages_from_stale_snapshot() {
        // Characterization of the bug that motivated `update_runtime_config`:
        // `merge_save_runtime` writes the caller's `messages` verbatim, so a
        // stale snapshot reverts a concurrent append. Config-only writers must
        // therefore use `update_runtime_config`, never `merge_save_runtime`.
        use bamboo_domain::session::types::Message;

        let (_temp, storage) = make_storage().await;
        let store = LockedSessionStore::new(storage.clone());
        let session_id = "stale-clobber";

        // A handler loads the session (1 message) …
        let mut baseline = fresh(session_id);
        baseline.add_message(Message::user("hello"));
        storage.save_session(&baseline).await.unwrap();
        let mut stale_snapshot = storage.load_session(session_id).await.unwrap().unwrap();

        // … then `POST /chat` appends a second message to disk …
        let mut after_chat = storage.load_session(session_id).await.unwrap().unwrap();
        after_chat.add_message(Message::user("second"));
        storage.save_session(&after_chat).await.unwrap();
        assert_eq!(
            storage
                .load_session(session_id)
                .await
                .unwrap()
                .unwrap()
                .messages
                .len(),
            2
        );

        // … and the stale handler saves via merge_save_runtime -> append reverted.
        store.merge_save_runtime(&mut stale_snapshot).await.unwrap();
        let after = storage.load_session(session_id).await.unwrap().unwrap();
        assert_eq!(
            after.messages.len(),
            1,
            "merge_save_runtime clobbers concurrent appends — this is why config patches must use update_runtime_config"
        );
    }

    #[tokio::test]
    async fn merge_save_runtime_preserves_disk_authoritative_metadata_with_single_load() {
        // Regression guard for the single-load refactor of `merge_save_runtime`:
        // it must STILL pull the authoritative metadata group (title / pinned /
        // metadata_version) from the freshest on-disk copy when disk's
        // metadata_version >= the in-memory one, even though it now reads disk
        // only once.
        let (_temp, storage) = make_storage().await;
        let store = LockedSessionStore::new(storage.clone());
        let session_id = "runtime-merge-meta";

        // Baseline persisted by a runtime writer (metadata_version 0).
        let mut baseline = fresh(session_id);
        baseline.title = "Auto Title".to_string();
        baseline.metadata_version = 0;
        storage.save_session(&baseline).await.unwrap();

        // A stale runtime snapshot (still metadata_version 0, old title).
        let mut stale_snapshot = storage.load_session(session_id).await.unwrap().unwrap();

        // An authoritative UI rename bumps metadata_version on disk.
        let mut renamed = storage.load_session(session_id).await.unwrap().unwrap();
        renamed.title = "User Renamed".to_string();
        renamed.title_version = 1;
        renamed.pinned = true;
        renamed.metadata_version = 1;
        store.commit_metadata(&renamed).await.unwrap();

        // The stale runtime writer saves: it must adopt the disk title/pinned.
        stale_snapshot.title = "Auto Title".to_string();
        store.merge_save_runtime(&mut stale_snapshot).await.unwrap();

        let after = storage.load_session(session_id).await.unwrap().unwrap();
        assert_eq!(after.title, "User Renamed");
        assert!(after.pinned);
        assert_eq!(after.metadata_version, 1);
        // And the in-memory copy was corrected by the merge too.
        assert_eq!(stale_snapshot.title, "User Renamed");
        assert_eq!(stale_snapshot.metadata_version, 1);
    }

    // ── Free-function merge tests (updated for metadata-group) ──────

    #[tokio::test]
    async fn merge_preserves_disk_title_when_versions_equal() {
        let (_temp, storage) = make_storage().await;
        let session_id = "merge-equal";

        let mut on_disk = fresh(session_id);
        on_disk.title = "User Set This".to_string();
        on_disk.title_version = 0;
        on_disk.metadata_version = 0;
        storage.save_session(&on_disk).await.unwrap();

        let mut runtime_copy = fresh(session_id);
        runtime_copy.title = "Stale Default".to_string();
        runtime_copy.title_version = 0;
        runtime_copy.metadata_version = 0;
        runtime_copy.messages = vec![];

        // Equal versions: the tie goes to disk.
        merge_save_session(&storage, &mut runtime_copy)
            .await
            .unwrap();

        let after = storage.load_session(session_id).await.unwrap().unwrap();
        assert_eq!(after.title, "User Set This");
        assert_eq!(after.title_version, 0);
        assert_eq!(runtime_copy.title, "User Set This");
    }

    #[tokio::test]
    async fn merge_preserves_disk_when_disk_version_higher() {
        let (_temp, storage) = make_storage().await;
        let session_id = "merge-higher";

        let mut on_disk = fresh(session_id);
        on_disk.title = "User Title v3".to_string();
        on_disk.title_version = 3;
        on_disk.metadata_version = 5;
        storage.save_session(&on_disk).await.unwrap();

        let mut runtime_copy = fresh(session_id);
        runtime_copy.title = "Stale".to_string();
        runtime_copy.title_version = 1;
        runtime_copy.metadata_version = 0;

        merge_save_session(&storage, &mut runtime_copy)
            .await
            .unwrap();

        let after = storage.load_session(session_id).await.unwrap().unwrap();
        assert_eq!(after.title, "User Title v3");
        assert_eq!(after.title_version, 3);
        assert_eq!(after.metadata_version, 5);
    }

    #[tokio::test]
    async fn merge_now_preserves_disk_pinned_in_metadata_group() {
        let (_temp, storage) = make_storage().await;
        let session_id = "pinned-merge";

        let mut on_disk = fresh(session_id);
        on_disk.pinned = true;
        on_disk.metadata_version = 2;
        storage.save_session(&on_disk).await.unwrap();

        let mut runtime_copy = fresh(session_id);
        runtime_copy.pinned = false;
        runtime_copy.metadata_version = 0;

        merge_save_session(&storage, &mut runtime_copy)
            .await
            .unwrap();

        let after = storage.load_session(session_id).await.unwrap().unwrap();
        assert!(
            after.pinned,
            "disk pinned=true should win over runtime false"
        );
        assert_eq!(after.metadata_version, 2);
    }

    #[tokio::test]
    async fn merge_keeps_in_memory_when_session_version_higher() {
        let (_temp, storage) = make_storage().await;
        let session_id = "merge-bumped";

        let mut on_disk = fresh(session_id);
        on_disk.title = "Old".to_string();
        on_disk.title_version = 1;
        on_disk.metadata_version = 3;
        storage.save_session(&on_disk).await.unwrap();

        // The in-memory copy carries a strictly newer metadata_version, so
        // its values must survive the merge.
        let mut authoritative_copy = fresh(session_id);
        authoritative_copy.title = "New Authoritative".to_string();
        authoritative_copy.title_version = 2;
        authoritative_copy.metadata_version = 4;
        authoritative_copy.pinned = true;

        merge_save_session(&storage, &mut authoritative_copy)
            .await
            .unwrap();

        let after = storage.load_session(session_id).await.unwrap().unwrap();
        assert_eq!(after.title, "New Authoritative");
        assert_eq!(after.title_version, 2);
        assert_eq!(after.metadata_version, 4);
        assert!(after.pinned);
    }

    #[tokio::test]
    async fn merge_keeps_runtime_messages_when_disk_only_changed_metadata() {
        let (_temp, storage) = make_storage().await;
        let session_id = "merge-messages";

        let mut on_disk = fresh(session_id);
        on_disk.title = "Fresh Title".to_string();
        on_disk.title_version = 2;
        on_disk.metadata_version = 5;
        storage.save_session(&on_disk).await.unwrap();

        let mut runtime_copy = fresh(session_id);
        runtime_copy.title = "Stale".to_string();
        runtime_copy.metadata_version = 0;
        runtime_copy.messages = vec![bamboo_domain::session::types::Message {
            role: bamboo_domain::session::types::Role::User,
            content: "keep me".to_string(),
            id: "msg-1".to_string(),
            created_at: chrono::Utc::now(),
            reasoning: None,
            content_parts: None,
            image_ocr: None,
            phase: None,
            tool_calls: None,
            tool_call_id: None,
            tool_success: None,
            compressed: false,
            compressed_by_event_id: None,
            never_compress: false,
            compression_level: 0,
            metadata: None,
        }];

        merge_save_session(&storage, &mut runtime_copy)
            .await
            .unwrap();

        // Metadata comes from disk, messages come from the runtime copy.
        let after = storage.load_session(session_id).await.unwrap().unwrap();
        assert_eq!(after.title, "Fresh Title");
        assert_eq!(after.metadata_version, 5);
        assert_eq!(after.messages.len(), 1);
        assert_eq!(after.messages[0].content, "keep me");
    }

    // ── LockedSessionStore tests ────────────────────────────────────

    #[tokio::test]
    async fn locked_merge_save_runtime_serialises_concurrent_writes() {
        let (_temp, storage) = make_storage().await;
        let store = Arc::new(LockedSessionStore::new(storage));
        let session_id = "lock-serial".to_string();

        // Seed with base version.
        let base = fresh(&session_id);
        store.storage().save_session(&base).await.unwrap();

        // Two concurrent authorised writers each do load→bump→save while
        // holding the per-session guard from `acquire_lock`.
        let store_a = store.clone();
        let store_b = store.clone();
        let sid_a = session_id.clone();
        let sid_b = session_id.clone();

        let a = tokio::spawn(async move {
            let _guard = store_a.acquire_lock(&sid_a).await;
            let mut s = store_a
                .storage()
                .load_session(&sid_a)
                .await
                .unwrap()
                .unwrap();
            s.title = "Writer A".to_string();
            s.title_version = s.title_version.saturating_add(1);
            s.metadata_version = s.metadata_version.saturating_add(1);
            s.updated_at = chrono::Utc::now();
            store_a.storage().save_session(&s).await.unwrap();
            s.title_version
        });

        // No delay between the spawns on purpose: the writers must genuinely
        // contend for the lock. Letting A finish before B starts would make
        // this test pass even without any serialization. The assertions below
        // hold whichever writer wins the lock first.
        let b = tokio::spawn(async move {
            let _guard = store_b.acquire_lock(&sid_b).await;
            let mut s = store_b
                .storage()
                .load_session(&sid_b)
                .await
                .unwrap()
                .unwrap();
            s.title = "Writer B".to_string();
            s.title_version = s.title_version.saturating_add(1);
            s.metadata_version = s.metadata_version.saturating_add(1);
            s.updated_at = chrono::Utc::now();
            store_b.storage().save_session(&s).await.unwrap();
            s.title_version
        });

        let (ver_a, ver_b) = tokio::join!(a, b);
        let final_s = store
            .storage()
            .load_session(&session_id)
            .await
            .unwrap()
            .unwrap();
        // A lost update would make both writers observe the same base version.
        assert_ne!(
            ver_a.unwrap(),
            ver_b.unwrap(),
            "concurrent writers must produce distinct versions"
        );
        assert_eq!(final_s.metadata_version, 2);
    }

    #[tokio::test]
    async fn commit_metadata_is_plain_save_inside_lock() {
        let (_temp, storage) = make_storage().await;
        let store = LockedSessionStore::new(storage);
        let session_id = "commit-plain";

        let mut s = fresh(session_id);
        s.title = "Committed".to_string();
        s.metadata_version = 1;
        s.title_version = 2;

        store.commit_metadata(&s).await.unwrap();

        // The values are written as-is: no merge against disk takes place.
        let after = store
            .storage()
            .load_session(session_id)
            .await
            .unwrap()
            .unwrap();
        assert_eq!(after.title, "Committed");
        assert_eq!(after.metadata_version, 1);
        assert_eq!(after.title_version, 2);
    }
}