Skip to main content

bamboo_server/session_app/
metadata.rs

//! Authoritative writer for session metadata fields (`title`, `pinned`, …).
//!
//! All callers that mutate session metadata MUST go through this service.
//! Each method follows a fixed pipeline so that title/pinned writes never
//! diverge in subtle ways (load order, version bump, save semantics, event
//! shape):
//!
//! 1. Trim / validate input (fail-fast before acquiring the lock).
//! 2. `persistence.acquire_lock(session_id)` — serialise all writes for this
//!    session so that commit order == publish order.
//! 3. `storage.load_session(session_id)` — pick up the latest authoritative
//!    copy from disk (not a runner-held session that may have stale metadata).
//! 4. Re-check preconditions inside the lock (e.g. `is_untitled` for
//!    `apply_generated_title` when not forced; equality short-circuit for
//!    setters that would be a no-op).
//! 5. Mutate the field, bump `title_version` (title writes only), always bump
//!    `metadata_version`, and set `updated_at`.
//! 6. Plain `storage.save_session(&session)` — no merge needed because we
//!    loaded the latest copy inside the lock and no other writer for this
//!    session could have interleaved.
//! 7. Refresh the in-memory cache (`state.sessions`).
//! 8. Build the corresponding [`AgentEvent`] from the **final persisted
//!    session** and publish via [`publish_replayable_session_event`].
//!
//! ## Authority rules
//!
//! - `set_title` / `apply_generated_title`: the only authoritative writers
//!   for `title` and `title_version`. The runtime engine, scheduler, and
//!   tool execution paths are non-authoritative and must stay on
//!   `merge_save_session` / `merge_save_runtime` without touching `title`
//!   directly.
//! - `set_pinned`: the only authoritative writer for `pinned`.
//! - Only these methods bump `metadata_version`. Runtime paths never bump it,
//!   so `merge_save_session` / `merge_save_runtime` can use it as a staleness
//!   signal for the entire authoritative metadata group.
36
37use bamboo_agent_core::{AgentEvent, Session, TitleSource};
38use chrono::Utc;
39
40use crate::app_state::AppState;
41use crate::events::publish_replayable_session_event;
42use crate::title_gen::is_untitled;
43
44/// Errors returned by [`SessionMetadataService`].
45#[derive(Debug, thiserror::Error)]
46pub enum MetadataError {
47    #[error("session not found: {0}")]
48    NotFound(String),
49    #[error("storage error: {0}")]
50    Storage(String),
51}
52
/// Result payload of a metadata write.
///
/// - `Some(applied)`: the new value was saved and the matching event was
///   published; `applied` describes what was written.
/// - `None`: nothing was written and no event was emitted, either because the
///   field already held the requested value or because a guard turned the
///   change down.
pub type MetadataChange<T> = Option<T>;
59
60pub struct SessionMetadataService;
61
62impl SessionMetadataService {
63    /// Manual rename via PATCH. Always authoritative; always bumps
64    /// `title_version` and `metadata_version`. Returns `Ok(None)` when the
65    /// trimmed input equals the existing title (no event emitted).
66    pub async fn set_title(
67        state: &AppState,
68        session_id: &str,
69        new_title: &str,
70    ) -> Result<MetadataChange<(String, u64)>, MetadataError> {
71        let trimmed = new_title.trim();
72        if trimmed.is_empty() {
73            return Err(MetadataError::Storage("title cannot be empty".into()));
74        }
75
76        // Lock: serialise all writes for this session.
77        let _guard = state.persistence.acquire_lock(session_id).await;
78
79        let mut session = load_latest(state, session_id).await?;
80        if session.title == trimmed {
81            return Ok(None);
82        }
83
84        session.title = trimmed.to_string();
85        session.title_version = session.title_version.saturating_add(1);
86        session.metadata_version = session.metadata_version.saturating_add(1);
87        session.updated_at = Utc::now();
88
89        state
90            .persistence
91            .storage()
92            .save_session(&session)
93            .await
94            .map_err(|e| MetadataError::Storage(format!("save_session: {e}")))?;
95        refresh_in_memory_cache(state, session_id, session.clone()).await;
96
97        let event = AgentEvent::SessionTitleUpdated {
98            session_id: session.id.clone(),
99            title: session.title.clone(),
100            title_version: session.title_version,
101            source: TitleSource::Manual,
102            updated_at: session.updated_at,
103        };
104        publish_replayable_session_event(state, session_id, event).await;
105
106        Ok(Some((session.title, session.title_version)))
107    }
108
109    /// Auto/fallback rename produced by the title generator. Aborts (returns
110    /// `Ok(None)`) if the on-disk session is no longer untitled and `force`
111    /// is false — this guards against races where the user renames mid-LLM.
112    /// On success bumps `title_version` and `metadata_version`, emits with
113    /// the supplied [`TitleSource`].
114    pub async fn apply_generated_title(
115        state: &AppState,
116        session_id: &str,
117        candidate: &str,
118        source: TitleSource,
119        force: bool,
120    ) -> Result<MetadataChange<(String, u64)>, MetadataError> {
121        let trimmed = candidate.trim();
122        if trimmed.is_empty() {
123            return Ok(None);
124        }
125
126        // Lock: serialise with any concurrent manual rename.
127        let _guard = state.persistence.acquire_lock(session_id).await;
128
129        let mut session = load_latest(state, session_id).await?;
130        if !force && !is_untitled(&session.title) {
131            return Ok(None);
132        }
133        if session.title == trimmed {
134            return Ok(None);
135        }
136
137        session.title = trimmed.to_string();
138        session.title_version = session.title_version.saturating_add(1);
139        session.metadata_version = session.metadata_version.saturating_add(1);
140        session.updated_at = Utc::now();
141
142        state
143            .persistence
144            .storage()
145            .save_session(&session)
146            .await
147            .map_err(|e| MetadataError::Storage(format!("save_session: {e}")))?;
148        refresh_in_memory_cache(state, session_id, session.clone()).await;
149
150        let event = AgentEvent::SessionTitleUpdated {
151            session_id: session.id.clone(),
152            title: session.title.clone(),
153            title_version: session.title_version,
154            source,
155            updated_at: session.updated_at,
156        };
157        publish_replayable_session_event(state, session_id, event).await;
158
159        Ok(Some((session.title, session.title_version)))
160    }
161
162    /// Toggle the `pinned` flag. Returns `Ok(None)` if the requested value
163    /// matches the current state (no event emitted). Bumps `metadata_version`.
164    pub async fn set_pinned(
165        state: &AppState,
166        session_id: &str,
167        pinned: bool,
168    ) -> Result<MetadataChange<bool>, MetadataError> {
169        // Lock: serialise with runtime saves and other metadata writes.
170        let _guard = state.persistence.acquire_lock(session_id).await;
171
172        let mut session = load_latest(state, session_id).await?;
173        if session.pinned == pinned {
174            return Ok(None);
175        }
176
177        session.pinned = pinned;
178        session.metadata_version = session.metadata_version.saturating_add(1);
179        session.updated_at = Utc::now();
180
181        state
182            .persistence
183            .storage()
184            .save_session(&session)
185            .await
186            .map_err(|e| MetadataError::Storage(format!("save_session: {e}")))?;
187        refresh_in_memory_cache(state, session_id, session.clone()).await;
188
189        let event = AgentEvent::SessionPinnedUpdated {
190            session_id: session.id.clone(),
191            pinned: session.pinned,
192            updated_at: session.updated_at,
193        };
194        publish_replayable_session_event(state, session_id, event).await;
195
196        Ok(Some(pinned))
197    }
198}
199
200/// Load the latest session from persistent storage (bypasses the in-memory
201/// cache). Called while the per-session lock is held.
202async fn load_latest(state: &AppState, session_id: &str) -> Result<Session, MetadataError> {
203    state
204        .persistence
205        .storage()
206        .load_session(session_id)
207        .await
208        .map_err(|e| MetadataError::Storage(format!("load_session: {e}")))?
209        .ok_or_else(|| MetadataError::NotFound(session_id.to_string()))
210}
211
212/// Replace the in-memory cache entry with the freshly persisted session.
213async fn refresh_in_memory_cache(state: &AppState, session_id: &str, session: Session) {
214    let mut cache = state.sessions.write().await;
215    cache.insert(session_id.to_string(), session);
216}
217
#[cfg(test)]
mod tests {
    use super::*;
    use bamboo_agent_core::Session;
    use std::sync::Arc;
    use std::time::Duration;

    /// Build an [`AppState`] rooted in a fresh temporary directory.
    ///
    /// The [`tempfile::TempDir`] guard is returned together with the state
    /// because dropping it deletes the directory. Callers must keep it alive
    /// for the whole test: bind it to a named variable such as `_tmp`, not to
    /// `_`, which would drop it immediately.
    async fn make_state() -> (tempfile::TempDir, AppState) {
        let temp_dir = tempfile::tempdir().expect("create temp dir");
        let state = AppState::new(temp_dir.path().to_path_buf())
            .await
            .expect("app state init");
        (temp_dir, state)
    }

    /// Persist a new session with the given id and title, and return it.
    async fn seed_session(state: &AppState, session_id: &str, title: &str) -> Session {
        let mut session = Session::new(session_id.to_string(), "test-model".to_string());
        session.title = title.to_string();
        state
            .storage
            .save_session(&session)
            .await
            .expect("seed save");
        session
    }

    #[tokio::test]
    async fn set_title_bumps_version_and_emits_event() {
        let (_tmp, state) = make_state().await;
        seed_session(&state, "s1", "New Session").await;

        let sender = state.get_session_event_sender("s1").await;
        let mut subscriber = sender.subscribe();

        let result = SessionMetadataService::set_title(&state, "s1", "  Hello  ")
            .await
            .expect("set_title ok");
        let (applied_title, version) = result.expect("change applied");
        assert_eq!(applied_title, "Hello");
        assert_eq!(version, 1);

        let persisted = state.storage.load_session("s1").await.unwrap().unwrap();
        assert_eq!(persisted.title, "Hello");
        assert_eq!(persisted.title_version, 1);
        assert_eq!(persisted.metadata_version, 1); // bumped

        let event = tokio::time::timeout(Duration::from_millis(100), subscriber.recv())
            .await
            .expect("event before timeout")
            .expect("event received");
        match event {
            AgentEvent::SessionTitleUpdated {
                session_id,
                title,
                title_version,
                source,
                ..
            } => {
                assert_eq!(session_id, "s1");
                assert_eq!(title, "Hello");
                assert_eq!(title_version, 1);
                assert_eq!(source, TitleSource::Manual);
            }
            other => panic!("unexpected event: {other:?}"),
        }
    }

    #[tokio::test]
    async fn set_title_short_circuits_when_unchanged() {
        let (_tmp, state) = make_state().await;
        seed_session(&state, "s1", "Same").await;

        let sender = state.get_session_event_sender("s1").await;
        let mut subscriber = sender.subscribe();

        let result = SessionMetadataService::set_title(&state, "s1", "Same")
            .await
            .expect("ok");
        assert!(result.is_none());

        let persisted = state.storage.load_session("s1").await.unwrap().unwrap();
        assert_eq!(persisted.title_version, 0);
        assert_eq!(persisted.metadata_version, 0); // unchanged

        let event_or_timeout =
            tokio::time::timeout(Duration::from_millis(50), subscriber.recv()).await;
        assert!(event_or_timeout.is_err(), "no event should be broadcast");
    }

    #[tokio::test]
    async fn apply_generated_title_aborts_on_concurrent_rename() {
        // B7: between the LLM call and commit, the user PATCH wins.
        // Simulated by seeding a non-untitled title: the load inside the
        // service sees it, so apply_generated_title returns None.
        let (_tmp, state) = make_state().await;
        seed_session(&state, "s1", "User Picked This").await;

        let result = SessionMetadataService::apply_generated_title(
            &state,
            "s1",
            "Auto Title",
            TitleSource::Auto,
            false,
        )
        .await
        .expect("ok");
        assert!(
            result.is_none(),
            "should abort because title is no longer untitled"
        );

        let persisted = state.storage.load_session("s1").await.unwrap().unwrap();
        assert_eq!(persisted.title, "User Picked This");
    }

    #[tokio::test]
    async fn apply_generated_title_force_overrides_existing() {
        let (_tmp, state) = make_state().await;
        seed_session(&state, "s1", "User Picked This").await;

        let result = SessionMetadataService::apply_generated_title(
            &state,
            "s1",
            "Forced Auto",
            TitleSource::Auto,
            true,
        )
        .await
        .expect("ok");
        let (applied, version) = result.expect("force applied");
        assert_eq!(applied, "Forced Auto");
        assert_eq!(version, 1);

        let persisted = state.storage.load_session("s1").await.unwrap().unwrap();
        assert_eq!(persisted.title, "Forced Auto");
        assert_eq!(persisted.metadata_version, 1);
    }

    #[tokio::test]
    async fn apply_generated_title_accepts_prompt_scoped_default_placeholder() {
        let (_tmp, state) = make_state().await;
        seed_session(&state, "s1", "New session with Bodhi").await;

        let result = SessionMetadataService::apply_generated_title(
            &state,
            "s1",
            "Real Generated Title",
            TitleSource::Auto,
            false,
        )
        .await
        .expect("ok");

        let (applied, version) = result.expect("applied");
        assert_eq!(applied, "Real Generated Title");
        assert_eq!(version, 1);

        let persisted = state.storage.load_session("s1").await.unwrap().unwrap();
        assert_eq!(persisted.title, "Real Generated Title");
        assert_eq!(persisted.title_version, 1);
    }

    #[tokio::test]
    async fn apply_generated_title_uses_correct_source_label() {
        let (_tmp, state) = make_state().await;
        seed_session(&state, "s1", "New Session").await;

        let sender = state.get_session_event_sender("s1").await;
        let mut subscriber = sender.subscribe();

        SessionMetadataService::apply_generated_title(
            &state,
            "s1",
            "Heuristic Title",
            TitleSource::Fallback,
            false,
        )
        .await
        .expect("ok")
        .expect("applied");

        let event = tokio::time::timeout(Duration::from_millis(100), subscriber.recv())
            .await
            .expect("event")
            .expect("not closed");
        match event {
            AgentEvent::SessionTitleUpdated { source, .. } => {
                assert_eq!(source, TitleSource::Fallback);
            }
            other => panic!("unexpected event: {other:?}"),
        }
    }

    #[tokio::test]
    async fn set_pinned_emits_event_and_updates_disk() {
        let (_tmp, state) = make_state().await;
        seed_session(&state, "s1", "Title").await;

        let sender = state.get_session_event_sender("s1").await;
        let mut subscriber = sender.subscribe();

        let result = SessionMetadataService::set_pinned(&state, "s1", true)
            .await
            .expect("ok");
        assert_eq!(result, Some(true));

        let persisted = state.storage.load_session("s1").await.unwrap().unwrap();
        assert!(persisted.pinned);
        assert_eq!(persisted.metadata_version, 1); // bumped

        let event = tokio::time::timeout(Duration::from_millis(100), subscriber.recv())
            .await
            .expect("event")
            .expect("not closed");
        match event {
            AgentEvent::SessionPinnedUpdated {
                session_id, pinned, ..
            } => {
                assert_eq!(session_id, "s1");
                assert!(pinned);
            }
            other => panic!("unexpected event: {other:?}"),
        }
    }

    #[tokio::test]
    async fn set_pinned_short_circuits_when_unchanged() {
        let (_tmp, state) = make_state().await;
        seed_session(&state, "s1", "Title").await;

        let result = SessionMetadataService::set_pinned(&state, "s1", false)
            .await
            .expect("ok");
        assert!(result.is_none());

        // A no-op must leave the persisted copy untouched.
        let persisted = state.storage.load_session("s1").await.unwrap().unwrap();
        assert!(!persisted.pinned);
        assert_eq!(persisted.metadata_version, 0); // unchanged
    }

    #[tokio::test]
    async fn set_title_returns_not_found_for_unknown_session() {
        let (_tmp, state) = make_state().await;
        let err = SessionMetadataService::set_title(&state, "missing", "x")
            .await
            .unwrap_err();
        assert!(matches!(err, MetadataError::NotFound(_)));
    }

    /// A4: concurrent authoritative writes must serialise, with monotonically
    /// increasing versions and event order equal to commit order.
    #[tokio::test]
    async fn concurrent_authoritative_title_writes_serialize() {
        let (_tmp, state) = make_state().await;
        let state = Arc::new(state);
        seed_session(&state, "c1", "New Session").await;

        let sender = state.get_session_event_sender("c1").await;
        let mut subscriber = sender.subscribe();

        let state_a = state.clone();
        let state_b = state.clone();

        let (a, b) = tokio::join!(
            SessionMetadataService::set_title(&state_a, "c1", "Title A"),
            SessionMetadataService::set_title(&state_b, "c1", "Title B"),
        );

        let a = a.expect("A ok").expect("A applied");
        let b = b.expect("B ok").expect("B applied");

        // Whichever write won the lock first, the two returned versions must
        // be exactly 1 and 2 (distinct, no gaps).
        let mut returned_versions = [a.1, b.1];
        returned_versions.sort_unstable();
        assert_eq!(
            returned_versions,
            [1, 2],
            "concurrent writes must produce title_versions 1 and 2"
        );

        let persisted = state.storage.load_session("c1").await.unwrap().unwrap();
        assert!(
            persisted.title == "Title A" || persisted.title == "Title B",
            "final title must be one of the two writes"
        );
        assert_eq!(persisted.title_version, 2);
        assert_eq!(persisted.metadata_version, 2);

        let event1 = tokio::time::timeout(Duration::from_millis(200), subscriber.recv())
            .await
            .expect("event1")
            .expect("not closed");
        let event2 = tokio::time::timeout(Duration::from_millis(200), subscriber.recv())
            .await
            .expect("event2")
            .expect("not closed");

        let versions: Vec<u64> = vec![
            match &event1 {
                AgentEvent::SessionTitleUpdated { title_version, .. } => *title_version,
                _ => panic!("unexpected event: {event1:?}"),
            },
            match &event2 {
                AgentEvent::SessionTitleUpdated { title_version, .. } => *title_version,
                _ => panic!("unexpected event: {event2:?}"),
            },
        ];
        assert_eq!(versions, vec![1, 2], "event order must match commit order");
    }

    /// A5: manual title beats generated title.
    ///
    /// Both writers serialise on the per-session lock, so there are only two
    /// interleavings: the generator commits first and the manual rename then
    /// overwrites it, or the manual rename commits first and the generator
    /// aborts because the title is no longer untitled. Either way the final
    /// title and the last title event are the manual ones.
    #[tokio::test]
    async fn manual_title_beats_generated_title_without_lying_event() {
        let (_tmp, state) = make_state().await;
        let state = Arc::new(state);
        seed_session(&state, "m1", "New Session").await;

        let sender = state.get_session_event_sender("m1").await;
        let mut subscriber = sender.subscribe();

        let state_gen = state.clone();
        let state_manual = state.clone();

        let manual = tokio::spawn(async move {
            tokio::time::sleep(Duration::from_millis(10)).await;
            SessionMetadataService::set_title(&state_manual, "m1", "Manual Override").await
        });

        // Not named `gen`: that identifier is a reserved keyword in edition 2024.
        let generated = tokio::spawn(async move {
            SessionMetadataService::apply_generated_title(
                &state_gen,
                "m1",
                "Auto Generated",
                TitleSource::Auto,
                false,
            )
            .await
        });

        let manual_result = manual.await.expect("manual task").expect("manual ok");
        let _generated_result = generated.await.expect("gen task").expect("gen ok");

        // The manual rename differs from every title the session can hold at
        // that point, so it must always be applied.
        let _manual_changed = manual_result.expect("manual applied");

        let persisted = state.storage.load_session("m1").await.unwrap().unwrap();
        assert_eq!(
            persisted.title, "Manual Override",
            "manual rename must win in every interleaving"
        );

        // Drain the title events in publish order.
        let mut title_events: Vec<(String, TitleSource)> = Vec::new();
        while let Ok(Ok(event)) =
            tokio::time::timeout(Duration::from_millis(100), subscriber.recv()).await
        {
            if let AgentEvent::SessionTitleUpdated { title, source, .. } = event {
                title_events.push((title, source));
            }
        }

        let (last_title, last_source) = title_events
            .last()
            .expect("must emit at least the manual title event");
        assert_eq!(
            *last_source,
            TitleSource::Manual,
            "last title event must be the manual one"
        );
        assert_eq!(
            last_title.as_str(),
            "Manual Override",
            "last title event must match the persisted title"
        );
    }

    /// A6: authoritative set_pinned must not be clobbered by a subsequent
    /// non-authoritative (runtime) save.
    #[tokio::test]
    async fn set_pinned_then_runtime_save_does_not_clobber() {
        let (_tmp, state) = make_state().await;
        seed_session(&state, "p1", "Title").await;

        SessionMetadataService::set_pinned(&state, "p1", true)
            .await
            .expect("ok")
            .expect("applied");

        let after_pin = state.storage.load_session("p1").await.unwrap().unwrap();
        assert!(after_pin.pinned);
        assert_eq!(after_pin.metadata_version, 1);

        // Stale runtime-held copy: still unpinned, still at metadata_version 0.
        let mut runtime_copy = Session::new("p1".to_string(), "test-model");
        runtime_copy.pinned = false;
        runtime_copy.metadata_version = 0;
        runtime_copy.title = "Title".to_string();

        state
            .persistence
            .merge_save_runtime(&mut runtime_copy)
            .await
            .expect("runtime save ok");

        let after_runtime = state.storage.load_session("p1").await.unwrap().unwrap();
        assert!(after_runtime.pinned, "runtime save must not clobber pinned");
        assert_eq!(
            after_runtime.metadata_version, 1,
            "runtime save must preserve metadata_version"
        );
    }
}