Skip to main content

bamboo_engine/session_app/metadata/
mod.rs

1//! Authoritative writer for session metadata fields (`title`, `pinned`, …).
2//!
3//! All callers that mutate session metadata MUST go through this service.
4//! Each method follows a fixed pipeline so that title/pinned writes never
5//! diverge in subtle ways (load order, version bump, save semantics, event
6//! shape):
7//!
8//! 1. Trim / validate input (fail-fast before acquiring the lock).
9//! 2. `persistence.acquire_lock(session_id)` — serialise all writes for this
10//!    session so that commit order == publish order.
11//! 3. `storage.load_session(session_id)` — pick up the latest authoritative
12//!    copy from disk (not a runner-held session that may have stale metadata).
13//! 4. Re-check preconditions inside the lock (e.g. `is_untitled` for
14//!    `apply_generated_title` when not forced; equality short-circuit for
15//!    setters that would be a no-op).
16//! 5. Mutate the field, bump `title_version` (for title) and always bump
17//!    `metadata_version`, set `updated_at`.
18//! 6. Plain `storage.save_session(&session)` — no merge needed because we
19//!    loaded the latest copy inside the lock and no other writer for this
20//!    session could have interleaved.
21//! 7. Refresh the in-memory cache (`state.sessions`).
22//! 8. Build the corresponding [`AgentEvent`] from the **final persisted
23//!    session** and publish via [`publish_replayable_session_event`].
24//!
25//! ## Authority rules
26//!
27//! - `set_title` / `apply_generated_title`: the only authoritative writers
28//!   for `title` and `title_version`. The runtime engine, scheduler, and
29//!   tool execution paths are non-authoritative and must stay on
30//!   `merge_save_session` / `merge_save_runtime` without touching `title`
31//!   directly.
32//! - `set_pinned`: the only authoritative writer for `pinned`.
33//! - Only these methods bump `metadata_version`.  Runtime paths never bump it,
34//!   so `merge_save_session` / `merge_save_runtime` can use it as a staleness
35//!   signal for the entire authoritative metadata group.
36
37use bamboo_agent_core::{AgentEvent, Session, TitleSource};
38use chrono::Utc;
39
40use crate::app_context::AgentSessionContext;
41use crate::events::publish_replayable_session_event;
42use crate::model_config_helper::GOLD_CONFIG_METADATA_KEY;
43use crate::title_gen::is_untitled;
44
45/// Errors returned by [`SessionMetadataService`].
46#[derive(Debug, thiserror::Error)]
47pub enum MetadataError {
48    #[error("session not found: {0}")]
49    NotFound(String),
50    #[error("storage error: {0}")]
51    Storage(String),
52    /// The caller's `If-Match` precondition (expected `metadata_version`) did
53    /// not match the current persisted version — a concurrent write won.
54    #[error("version conflict: expected {expected}, current {current}")]
55    VersionConflict { expected: u64, current: u64 },
56}
57
58/// Enforce an optional `If-Match` precondition against the freshly-loaded
59/// session, inside the per-session lock (so it is race-free against concurrent
60/// authoritative writes). The single `metadata_version` is the session ETag.
61fn ensure_if_match(session: &Session, if_match: Option<u64>) -> Result<(), MetadataError> {
62    if let Some(expected) = if_match {
63        if session.metadata_version != expected {
64            return Err(MetadataError::VersionConflict {
65                expected,
66                current: session.metadata_version,
67            });
68        }
69    }
70    Ok(())
71}
72
73/// Outcome of a metadata mutation.
74///
75/// `None` means the request was a no-op (the field already had the requested
76/// value, or a guard rejected the change). `Some(applied)` means the change
77/// was persisted and an event was published.
78pub type MetadataChange<T> = Option<T>;
79
80pub struct SessionMetadataService;
81
82impl SessionMetadataService {
83    /// Manual rename via PATCH. Always authoritative; always bumps
84    /// `title_version` and `metadata_version`. Returns `Ok(None)` when the
85    /// trimmed input equals the existing title (no event emitted).
86    pub async fn set_title(
87        state: &dyn AgentSessionContext,
88        session_id: &str,
89        new_title: &str,
90        if_match: Option<u64>,
91    ) -> Result<MetadataChange<(String, u64)>, MetadataError> {
92        let trimmed = new_title.trim();
93        if trimmed.is_empty() {
94            return Err(MetadataError::Storage("title cannot be empty".into()));
95        }
96
97        // Lock: serialise all writes for this session.
98        let _guard = state.persistence().acquire_lock(session_id).await;
99
100        let mut session = load_latest(state, session_id).await?;
101        ensure_if_match(&session, if_match)?;
102        if session.title == trimmed {
103            return Ok(None);
104        }
105
106        session.title = trimmed.to_string();
107        session.title_version = session.title_version.saturating_add(1);
108        session.metadata_version = session.metadata_version.saturating_add(1);
109        session.updated_at = Utc::now();
110
111        state
112            .persistence()
113            .storage()
114            .save_session(&session)
115            .await
116            .map_err(|e| MetadataError::Storage(format!("save_session: {e}")))?;
117        refresh_in_memory_cache(state, session_id, session.clone()).await;
118
119        let event = AgentEvent::SessionTitleUpdated {
120            session_id: session.id.clone(),
121            title: session.title.clone(),
122            title_version: session.title_version,
123            source: TitleSource::Manual,
124            updated_at: session.updated_at,
125        };
126        publish_replayable_session_event(state, session_id, event).await;
127
128        Ok(Some((session.title, session.title_version)))
129    }
130
131    /// Auto/fallback rename produced by the title generator. Aborts (returns
132    /// `Ok(None)`) if the on-disk session is no longer untitled and `force`
133    /// is false — this guards against races where the user renames mid-LLM.
134    /// On success bumps `title_version` and `metadata_version`, emits with
135    /// the supplied [`TitleSource`].
136    pub async fn apply_generated_title(
137        state: &dyn AgentSessionContext,
138        session_id: &str,
139        candidate: &str,
140        source: TitleSource,
141        force: bool,
142    ) -> Result<MetadataChange<(String, u64)>, MetadataError> {
143        let trimmed = candidate.trim();
144        if trimmed.is_empty() {
145            return Ok(None);
146        }
147
148        // Lock: serialise with any concurrent manual rename.
149        let _guard = state.persistence().acquire_lock(session_id).await;
150
151        let mut session = load_latest(state, session_id).await?;
152        if !force && !is_untitled(&session.title) {
153            return Ok(None);
154        }
155        if session.title == trimmed {
156            return Ok(None);
157        }
158
159        session.title = trimmed.to_string();
160        session.title_version = session.title_version.saturating_add(1);
161        session.metadata_version = session.metadata_version.saturating_add(1);
162        session.updated_at = Utc::now();
163
164        state
165            .persistence()
166            .storage()
167            .save_session(&session)
168            .await
169            .map_err(|e| MetadataError::Storage(format!("save_session: {e}")))?;
170        refresh_in_memory_cache(state, session_id, session.clone()).await;
171
172        let event = AgentEvent::SessionTitleUpdated {
173            session_id: session.id.clone(),
174            title: session.title.clone(),
175            title_version: session.title_version,
176            source,
177            updated_at: session.updated_at,
178        };
179        publish_replayable_session_event(state, session_id, event).await;
180
181        Ok(Some((session.title, session.title_version)))
182    }
183
184    /// Toggle the `pinned` flag. Returns `Ok(None)` if the requested value
185    /// matches the current state (no event emitted). Bumps `metadata_version`.
186    pub async fn set_pinned(
187        state: &dyn AgentSessionContext,
188        session_id: &str,
189        pinned: bool,
190        if_match: Option<u64>,
191    ) -> Result<MetadataChange<bool>, MetadataError> {
192        // Lock: serialise with runtime saves and other metadata writes.
193        let _guard = state.persistence().acquire_lock(session_id).await;
194
195        let mut session = load_latest(state, session_id).await?;
196        ensure_if_match(&session, if_match)?;
197        if session.pinned == pinned {
198            return Ok(None);
199        }
200
201        session.pinned = pinned;
202        session.metadata_version = session.metadata_version.saturating_add(1);
203        session.updated_at = Utc::now();
204
205        state
206            .persistence()
207            .storage()
208            .save_session(&session)
209            .await
210            .map_err(|e| MetadataError::Storage(format!("save_session: {e}")))?;
211        refresh_in_memory_cache(state, session_id, session.clone()).await;
212
213        let event = AgentEvent::SessionPinnedUpdated {
214            session_id: session.id.clone(),
215            pinned: session.pinned,
216            updated_at: session.updated_at,
217        };
218        publish_replayable_session_event(state, session_id, event).await;
219
220        Ok(Some(pinned))
221    }
222
223    /// Set or clear the session-level Gold configuration JSON.
224    ///
225    /// This is an authoritative session metadata write: it bumps
226    /// `metadata_version` so runtime saves with stale session structs do not
227    /// overwrite the user's current-session Gold settings.
228    pub async fn set_gold_config_json(
229        state: &dyn AgentSessionContext,
230        session_id: &str,
231        gold_config_json: Option<String>,
232        if_match: Option<u64>,
233    ) -> Result<MetadataChange<Option<String>>, MetadataError> {
234        let normalized = gold_config_json.and_then(|value| {
235            let trimmed = value.trim();
236            if trimmed.is_empty() {
237                None
238            } else {
239                Some(trimmed.to_string())
240            }
241        });
242
243        let _guard = state.persistence().acquire_lock(session_id).await;
244        let mut session = load_latest(state, session_id).await?;
245        ensure_if_match(&session, if_match)?;
246        let current = session
247            .metadata
248            .get(GOLD_CONFIG_METADATA_KEY)
249            .map(|value| value.trim().to_string())
250            .filter(|value| !value.is_empty());
251        if current == normalized {
252            return Ok(None);
253        }
254
255        if let Some(value) = normalized.as_ref() {
256            session
257                .metadata
258                .insert(GOLD_CONFIG_METADATA_KEY.to_string(), value.clone());
259        } else {
260            session.metadata.remove(GOLD_CONFIG_METADATA_KEY);
261        }
262        session.metadata_version = session.metadata_version.saturating_add(1);
263        session.updated_at = Utc::now();
264
265        state
266            .persistence()
267            .storage()
268            .save_session(&session)
269            .await
270            .map_err(|e| MetadataError::Storage(format!("save_session: {e}")))?;
271        refresh_in_memory_cache(state, session_id, session).await;
272
273        Ok(Some(normalized))
274    }
275}
276
277/// Load the latest session from persistent storage (bypasses the in-memory
278/// cache). Called while the per-session lock is held.
279async fn load_latest(state: &dyn AgentSessionContext, session_id: &str) -> Result<Session, MetadataError> {
280    state
281        .persistence()
282        .storage()
283        .load_session(session_id)
284        .await
285        .map_err(|e| MetadataError::Storage(format!("load_session: {e}")))?
286        .ok_or_else(|| MetadataError::NotFound(session_id.to_string()))
287}
288
289/// Replace the in-memory cache entry with the freshly persisted session.
290async fn refresh_in_memory_cache(state: &dyn AgentSessionContext, session_id: &str, session: Session) {
291    let mut cache = state.sessions().write().await;
292    cache.insert(session_id.to_string(), session);
293}