bamboo_engine/session_app/metadata/
mod.rs1use bamboo_agent_core::{AgentEvent, Session, TitleSource};
38use chrono::Utc;
39
40use crate::app_context::AgentSessionContext;
41use crate::events::publish_replayable_session_event;
42use crate::model_config_helper::GOLD_CONFIG_METADATA_KEY;
43use crate::title_gen::is_untitled;
44
45#[derive(Debug, thiserror::Error)]
47pub enum MetadataError {
48 #[error("session not found: {0}")]
49 NotFound(String),
50 #[error("storage error: {0}")]
51 Storage(String),
52 #[error("version conflict: expected {expected}, current {current}")]
55 VersionConflict { expected: u64, current: u64 },
56}
57
58fn ensure_if_match(session: &Session, if_match: Option<u64>) -> Result<(), MetadataError> {
62 if let Some(expected) = if_match {
63 if session.metadata_version != expected {
64 return Err(MetadataError::VersionConflict {
65 expected,
66 current: session.metadata_version,
67 });
68 }
69 }
70 Ok(())
71}
72
73pub type MetadataChange<T> = Option<T>;
79
80pub struct SessionMetadataService;
81
82impl SessionMetadataService {
83 pub async fn set_title(
87 state: &dyn AgentSessionContext,
88 session_id: &str,
89 new_title: &str,
90 if_match: Option<u64>,
91 ) -> Result<MetadataChange<(String, u64)>, MetadataError> {
92 let trimmed = new_title.trim();
93 if trimmed.is_empty() {
94 return Err(MetadataError::Storage("title cannot be empty".into()));
95 }
96
97 let _guard = state.persistence().acquire_lock(session_id).await;
99
100 let mut session = load_latest(state, session_id).await?;
101 ensure_if_match(&session, if_match)?;
102 if session.title == trimmed {
103 return Ok(None);
104 }
105
106 session.title = trimmed.to_string();
107 session.title_version = session.title_version.saturating_add(1);
108 session.metadata_version = session.metadata_version.saturating_add(1);
109 session.updated_at = Utc::now();
110
111 state
112 .persistence()
113 .storage()
114 .save_session(&session)
115 .await
116 .map_err(|e| MetadataError::Storage(format!("save_session: {e}")))?;
117 refresh_in_memory_cache(state, session_id, session.clone()).await;
118
119 let event = AgentEvent::SessionTitleUpdated {
120 session_id: session.id.clone(),
121 title: session.title.clone(),
122 title_version: session.title_version,
123 source: TitleSource::Manual,
124 updated_at: session.updated_at,
125 };
126 publish_replayable_session_event(state, session_id, event).await;
127
128 Ok(Some((session.title, session.title_version)))
129 }
130
131 pub async fn apply_generated_title(
137 state: &dyn AgentSessionContext,
138 session_id: &str,
139 candidate: &str,
140 source: TitleSource,
141 force: bool,
142 ) -> Result<MetadataChange<(String, u64)>, MetadataError> {
143 let trimmed = candidate.trim();
144 if trimmed.is_empty() {
145 return Ok(None);
146 }
147
148 let _guard = state.persistence().acquire_lock(session_id).await;
150
151 let mut session = load_latest(state, session_id).await?;
152 if !force && !is_untitled(&session.title) {
153 return Ok(None);
154 }
155 if session.title == trimmed {
156 return Ok(None);
157 }
158
159 session.title = trimmed.to_string();
160 session.title_version = session.title_version.saturating_add(1);
161 session.metadata_version = session.metadata_version.saturating_add(1);
162 session.updated_at = Utc::now();
163
164 state
165 .persistence()
166 .storage()
167 .save_session(&session)
168 .await
169 .map_err(|e| MetadataError::Storage(format!("save_session: {e}")))?;
170 refresh_in_memory_cache(state, session_id, session.clone()).await;
171
172 let event = AgentEvent::SessionTitleUpdated {
173 session_id: session.id.clone(),
174 title: session.title.clone(),
175 title_version: session.title_version,
176 source,
177 updated_at: session.updated_at,
178 };
179 publish_replayable_session_event(state, session_id, event).await;
180
181 Ok(Some((session.title, session.title_version)))
182 }
183
184 pub async fn set_pinned(
187 state: &dyn AgentSessionContext,
188 session_id: &str,
189 pinned: bool,
190 if_match: Option<u64>,
191 ) -> Result<MetadataChange<bool>, MetadataError> {
192 let _guard = state.persistence().acquire_lock(session_id).await;
194
195 let mut session = load_latest(state, session_id).await?;
196 ensure_if_match(&session, if_match)?;
197 if session.pinned == pinned {
198 return Ok(None);
199 }
200
201 session.pinned = pinned;
202 session.metadata_version = session.metadata_version.saturating_add(1);
203 session.updated_at = Utc::now();
204
205 state
206 .persistence()
207 .storage()
208 .save_session(&session)
209 .await
210 .map_err(|e| MetadataError::Storage(format!("save_session: {e}")))?;
211 refresh_in_memory_cache(state, session_id, session.clone()).await;
212
213 let event = AgentEvent::SessionPinnedUpdated {
214 session_id: session.id.clone(),
215 pinned: session.pinned,
216 updated_at: session.updated_at,
217 };
218 publish_replayable_session_event(state, session_id, event).await;
219
220 Ok(Some(pinned))
221 }
222
223 pub async fn set_gold_config_json(
229 state: &dyn AgentSessionContext,
230 session_id: &str,
231 gold_config_json: Option<String>,
232 if_match: Option<u64>,
233 ) -> Result<MetadataChange<Option<String>>, MetadataError> {
234 let normalized = gold_config_json.and_then(|value| {
235 let trimmed = value.trim();
236 if trimmed.is_empty() {
237 None
238 } else {
239 Some(trimmed.to_string())
240 }
241 });
242
243 let _guard = state.persistence().acquire_lock(session_id).await;
244 let mut session = load_latest(state, session_id).await?;
245 ensure_if_match(&session, if_match)?;
246 let current = session
247 .metadata
248 .get(GOLD_CONFIG_METADATA_KEY)
249 .map(|value| value.trim().to_string())
250 .filter(|value| !value.is_empty());
251 if current == normalized {
252 return Ok(None);
253 }
254
255 if let Some(value) = normalized.as_ref() {
256 session
257 .metadata
258 .insert(GOLD_CONFIG_METADATA_KEY.to_string(), value.clone());
259 } else {
260 session.metadata.remove(GOLD_CONFIG_METADATA_KEY);
261 }
262 session.metadata_version = session.metadata_version.saturating_add(1);
263 session.updated_at = Utc::now();
264
265 state
266 .persistence()
267 .storage()
268 .save_session(&session)
269 .await
270 .map_err(|e| MetadataError::Storage(format!("save_session: {e}")))?;
271 refresh_in_memory_cache(state, session_id, session).await;
272
273 Ok(Some(normalized))
274 }
275}
276
277async fn load_latest(
280 state: &dyn AgentSessionContext,
281 session_id: &str,
282) -> Result<Session, MetadataError> {
283 state
284 .persistence()
285 .storage()
286 .load_session(session_id)
287 .await
288 .map_err(|e| MetadataError::Storage(format!("load_session: {e}")))?
289 .ok_or_else(|| MetadataError::NotFound(session_id.to_string()))
290}
291
292async fn refresh_in_memory_cache(
294 state: &dyn AgentSessionContext,
295 session_id: &str,
296 session: Session,
297) {
298 let mut cache = state.sessions().write().await;
299 cache.insert(session_id.to_string(), session);
300}