1use bamboo_agent_core::{AgentEvent, Session, TitleSource};
38use chrono::Utc;
39
40use crate::app_state::AppState;
41use crate::events::publish_replayable_session_event;
42use crate::title_gen::is_untitled;
43
44#[derive(Debug, thiserror::Error)]
46pub enum MetadataError {
47 #[error("session not found: {0}")]
48 NotFound(String),
49 #[error("storage error: {0}")]
50 Storage(String),
51}
52
/// Outcome of a metadata mutation: `Some(value)` when a change was written,
/// `None` when the request left the session untouched.
pub type MetadataChange<T> = Option<T>;
59
/// Field-less namespace type whose associated functions mutate session
/// metadata (title and pinned flag).
pub struct SessionMetadataService;
61
62impl SessionMetadataService {
63 pub async fn set_title(
67 state: &AppState,
68 session_id: &str,
69 new_title: &str,
70 ) -> Result<MetadataChange<(String, u64)>, MetadataError> {
71 let trimmed = new_title.trim();
72 if trimmed.is_empty() {
73 return Err(MetadataError::Storage("title cannot be empty".into()));
74 }
75
76 let _guard = state.persistence.acquire_lock(session_id).await;
78
79 let mut session = load_latest(state, session_id).await?;
80 if session.title == trimmed {
81 return Ok(None);
82 }
83
84 session.title = trimmed.to_string();
85 session.title_version = session.title_version.saturating_add(1);
86 session.metadata_version = session.metadata_version.saturating_add(1);
87 session.updated_at = Utc::now();
88
89 state
90 .persistence
91 .storage()
92 .save_session(&session)
93 .await
94 .map_err(|e| MetadataError::Storage(format!("save_session: {e}")))?;
95 refresh_in_memory_cache(state, session_id, session.clone()).await;
96
97 let event = AgentEvent::SessionTitleUpdated {
98 session_id: session.id.clone(),
99 title: session.title.clone(),
100 title_version: session.title_version,
101 source: TitleSource::Manual,
102 updated_at: session.updated_at,
103 };
104 publish_replayable_session_event(state, session_id, event).await;
105
106 Ok(Some((session.title, session.title_version)))
107 }
108
109 pub async fn apply_generated_title(
115 state: &AppState,
116 session_id: &str,
117 candidate: &str,
118 source: TitleSource,
119 force: bool,
120 ) -> Result<MetadataChange<(String, u64)>, MetadataError> {
121 let trimmed = candidate.trim();
122 if trimmed.is_empty() {
123 return Ok(None);
124 }
125
126 let _guard = state.persistence.acquire_lock(session_id).await;
128
129 let mut session = load_latest(state, session_id).await?;
130 if !force && !is_untitled(&session.title) {
131 return Ok(None);
132 }
133 if session.title == trimmed {
134 return Ok(None);
135 }
136
137 session.title = trimmed.to_string();
138 session.title_version = session.title_version.saturating_add(1);
139 session.metadata_version = session.metadata_version.saturating_add(1);
140 session.updated_at = Utc::now();
141
142 state
143 .persistence
144 .storage()
145 .save_session(&session)
146 .await
147 .map_err(|e| MetadataError::Storage(format!("save_session: {e}")))?;
148 refresh_in_memory_cache(state, session_id, session.clone()).await;
149
150 let event = AgentEvent::SessionTitleUpdated {
151 session_id: session.id.clone(),
152 title: session.title.clone(),
153 title_version: session.title_version,
154 source,
155 updated_at: session.updated_at,
156 };
157 publish_replayable_session_event(state, session_id, event).await;
158
159 Ok(Some((session.title, session.title_version)))
160 }
161
162 pub async fn set_pinned(
165 state: &AppState,
166 session_id: &str,
167 pinned: bool,
168 ) -> Result<MetadataChange<bool>, MetadataError> {
169 let _guard = state.persistence.acquire_lock(session_id).await;
171
172 let mut session = load_latest(state, session_id).await?;
173 if session.pinned == pinned {
174 return Ok(None);
175 }
176
177 session.pinned = pinned;
178 session.metadata_version = session.metadata_version.saturating_add(1);
179 session.updated_at = Utc::now();
180
181 state
182 .persistence
183 .storage()
184 .save_session(&session)
185 .await
186 .map_err(|e| MetadataError::Storage(format!("save_session: {e}")))?;
187 refresh_in_memory_cache(state, session_id, session.clone()).await;
188
189 let event = AgentEvent::SessionPinnedUpdated {
190 session_id: session.id.clone(),
191 pinned: session.pinned,
192 updated_at: session.updated_at,
193 };
194 publish_replayable_session_event(state, session_id, event).await;
195
196 Ok(Some(pinned))
197 }
198}
199
200async fn load_latest(state: &AppState, session_id: &str) -> Result<Session, MetadataError> {
203 state
204 .persistence
205 .storage()
206 .load_session(session_id)
207 .await
208 .map_err(|e| MetadataError::Storage(format!("load_session: {e}")))?
209 .ok_or_else(|| MetadataError::NotFound(session_id.to_string()))
210}
211
212async fn refresh_in_memory_cache(state: &AppState, session_id: &str, session: Session) {
214 let mut cache = state.sessions.write().await;
215 cache.insert(session_id.to_string(), session);
216}
217
#[cfg(test)]
mod tests {
    use super::*;
    use bamboo_agent_core::Session;

    /// Builds an `AppState` rooted in a fresh temporary directory.
    ///
    /// The `TempDir` guard is returned with the state: dropping it deletes the
    /// directory, so each test must keep it bound for as long as it uses the
    /// state.
    async fn make_state() -> (AppState, tempfile::TempDir) {
        let temp_dir = tempfile::tempdir().expect("create temp dir");
        let state = AppState::new(temp_dir.path().to_path_buf())
            .await
            .expect("app state init");
        (state, temp_dir)
    }

    /// Saves a new session with the given id and title and returns it.
    async fn seed_session(state: &AppState, session_id: &str, title: &str) -> Session {
        let mut session = Session::new(session_id.to_string(), "test-model".to_string());
        session.title = title.to_string();
        state
            .storage
            .save_session(&session)
            .await
            .expect("seed save");
        session
    }

    /// Extracts `title_version` from a title event; panics on any other event.
    fn title_version_of(event: &AgentEvent) -> u64 {
        match event {
            AgentEvent::SessionTitleUpdated { title_version, .. } => *title_version,
            other => panic!("unexpected event: {other:?}"),
        }
    }

    /// A manual rename trims the input, bumps both versions, persists, and
    /// broadcasts a `Manual` title event.
    #[tokio::test]
    async fn set_title_bumps_version_and_emits_event() {
        let (state, _temp_dir) = make_state().await;
        seed_session(&state, "s1", "New Session").await;

        let sender = state.get_session_event_sender("s1").await;
        let mut subscriber = sender.subscribe();

        let result = SessionMetadataService::set_title(&state, "s1", " Hello ")
            .await
            .expect("set_title ok");
        let (applied_title, version) = result.expect("change applied");
        assert_eq!(applied_title, "Hello");
        assert_eq!(version, 1);

        let persisted = state.storage.load_session("s1").await.unwrap().unwrap();
        assert_eq!(persisted.title, "Hello");
        assert_eq!(persisted.title_version, 1);
        assert_eq!(persisted.metadata_version, 1);

        let event = tokio::time::timeout(std::time::Duration::from_millis(100), subscriber.recv())
            .await
            .expect("event before timeout")
            .expect("event received");
        match event {
            AgentEvent::SessionTitleUpdated {
                session_id,
                title,
                title_version,
                source,
                ..
            } => {
                assert_eq!(session_id, "s1");
                assert_eq!(title, "Hello");
                assert_eq!(title_version, 1);
                assert_eq!(source, TitleSource::Manual);
            }
            other => panic!("unexpected event: {other:?}"),
        }
    }

    /// Renaming to the current title changes nothing and broadcasts nothing.
    #[tokio::test]
    async fn set_title_short_circuits_when_unchanged() {
        let (state, _temp_dir) = make_state().await;
        seed_session(&state, "s1", "Same").await;

        let sender = state.get_session_event_sender("s1").await;
        let mut subscriber = sender.subscribe();

        let result = SessionMetadataService::set_title(&state, "s1", "Same")
            .await
            .expect("ok");
        assert!(result.is_none());

        let persisted = state.storage.load_session("s1").await.unwrap().unwrap();
        assert_eq!(persisted.title_version, 0);
        assert_eq!(persisted.metadata_version, 0);

        let event_or_timeout =
            tokio::time::timeout(std::time::Duration::from_millis(50), subscriber.recv()).await;
        assert!(event_or_timeout.is_err(), "no event should be broadcast");
    }

    /// A non-forced generated title must not replace a title the session
    /// already carries.
    #[tokio::test]
    async fn apply_generated_title_aborts_on_concurrent_rename() {
        let (state, _temp_dir) = make_state().await;
        seed_session(&state, "s1", "User Picked This").await;

        let result = SessionMetadataService::apply_generated_title(
            &state,
            "s1",
            "Auto Title",
            TitleSource::Auto,
            false,
        )
        .await
        .expect("ok");
        assert!(
            result.is_none(),
            "should abort because title is no longer untitled"
        );

        let persisted = state.storage.load_session("s1").await.unwrap().unwrap();
        assert_eq!(persisted.title, "User Picked This");
    }

    /// With `force = true` a generated title replaces an existing title.
    #[tokio::test]
    async fn apply_generated_title_force_overrides_existing() {
        let (state, _temp_dir) = make_state().await;
        seed_session(&state, "s1", "User Picked This").await;

        let result = SessionMetadataService::apply_generated_title(
            &state,
            "s1",
            "Forced Auto",
            TitleSource::Auto,
            true,
        )
        .await
        .expect("ok");
        let (applied, version) = result.expect("force applied");
        assert_eq!(applied, "Forced Auto");
        assert_eq!(version, 1);

        let persisted = state.storage.load_session("s1").await.unwrap().unwrap();
        assert_eq!(persisted.title, "Forced Auto");
        assert_eq!(persisted.metadata_version, 1);
    }

    /// The "New session with Bodhi" placeholder counts as untitled.
    #[tokio::test]
    async fn apply_generated_title_accepts_prompt_scoped_default_placeholder() {
        let (state, _temp_dir) = make_state().await;
        seed_session(&state, "s1", "New session with Bodhi").await;

        let result = SessionMetadataService::apply_generated_title(
            &state,
            "s1",
            "Real Generated Title",
            TitleSource::Auto,
            false,
        )
        .await
        .expect("ok");

        let (applied, version) = result.expect("applied");
        assert_eq!(applied, "Real Generated Title");
        assert_eq!(version, 1);

        let persisted = state.storage.load_session("s1").await.unwrap().unwrap();
        assert_eq!(persisted.title, "Real Generated Title");
        assert_eq!(persisted.title_version, 1);
    }

    /// The broadcast event carries the `source` the caller passed in.
    #[tokio::test]
    async fn apply_generated_title_uses_correct_source_label() {
        let (state, _temp_dir) = make_state().await;
        seed_session(&state, "s1", "New Session").await;

        let sender = state.get_session_event_sender("s1").await;
        let mut subscriber = sender.subscribe();

        SessionMetadataService::apply_generated_title(
            &state,
            "s1",
            "Heuristic Title",
            TitleSource::Fallback,
            false,
        )
        .await
        .expect("ok")
        .expect("applied");

        let event = tokio::time::timeout(std::time::Duration::from_millis(100), subscriber.recv())
            .await
            .expect("event")
            .expect("not closed");
        match event {
            AgentEvent::SessionTitleUpdated { source, .. } => {
                assert_eq!(source, TitleSource::Fallback);
            }
            other => panic!("unexpected event: {other:?}"),
        }
    }

    /// Pinning persists the flag, bumps `metadata_version`, and broadcasts.
    #[tokio::test]
    async fn set_pinned_emits_event_and_updates_disk() {
        let (state, _temp_dir) = make_state().await;
        seed_session(&state, "s1", "Title").await;

        let sender = state.get_session_event_sender("s1").await;
        let mut subscriber = sender.subscribe();

        let result = SessionMetadataService::set_pinned(&state, "s1", true)
            .await
            .expect("ok");
        assert_eq!(result, Some(true));

        let persisted = state.storage.load_session("s1").await.unwrap().unwrap();
        assert!(persisted.pinned);
        assert_eq!(persisted.metadata_version, 1);

        let event = tokio::time::timeout(std::time::Duration::from_millis(100), subscriber.recv())
            .await
            .expect("event")
            .expect("not closed");
        match event {
            AgentEvent::SessionPinnedUpdated {
                session_id, pinned, ..
            } => {
                assert_eq!(session_id, "s1");
                assert!(pinned);
            }
            other => panic!("unexpected event: {other:?}"),
        }
    }

    /// Setting the pinned flag to its current value is a no-op on disk too.
    #[tokio::test]
    async fn set_pinned_short_circuits_when_unchanged() {
        let (state, _temp_dir) = make_state().await;
        seed_session(&state, "s1", "Title").await;

        let result = SessionMetadataService::set_pinned(&state, "s1", false)
            .await
            .expect("ok");
        assert!(result.is_none());

        let persisted = state.storage.load_session("s1").await.unwrap().unwrap();
        assert!(!persisted.pinned);
        assert_eq!(persisted.metadata_version, 0);
    }

    /// An unknown session id surfaces as `NotFound`.
    #[tokio::test]
    async fn set_title_returns_not_found_for_unknown_session() {
        let (state, _temp_dir) = make_state().await;
        let err = SessionMetadataService::set_title(&state, "missing", "x")
            .await
            .unwrap_err();
        assert!(matches!(err, MetadataError::NotFound(_)));
    }

    /// Two concurrent manual renames must each get their own version, and the
    /// events must arrive in commit order.
    #[tokio::test]
    async fn concurrent_authoritative_title_writes_serialize() {
        let (state, _temp_dir) = make_state().await;
        let state = std::sync::Arc::new(state);
        seed_session(&state, "c1", "New Session").await;

        let sender = state.get_session_event_sender("c1").await;
        let mut subscriber = sender.subscribe();

        let state_a = state.clone();
        let state_b = state.clone();

        let (a, b) = tokio::join!(
            SessionMetadataService::set_title(&state_a, "c1", "Title A"),
            SessionMetadataService::set_title(&state_b, "c1", "Title B"),
        );

        let a = a.expect("A ok").expect("A applied");
        let b = b.expect("B ok").expect("B applied");

        assert!(
            a.1 != b.1,
            "concurrent writes must produce distinct title_versions"
        );
        assert!(
            (a.1 == 1 && b.1 == 2) || (a.1 == 2 && b.1 == 1),
            "versions must be 1 and 2"
        );

        let persisted = state.storage.load_session("c1").await.unwrap().unwrap();
        assert!(
            persisted.title == "Title A" || persisted.title == "Title B",
            "final title must be one of the two writes"
        );
        assert_eq!(persisted.title_version, 2);
        assert_eq!(persisted.metadata_version, 2);

        let event1 = tokio::time::timeout(std::time::Duration::from_millis(200), subscriber.recv())
            .await
            .expect("event1")
            .expect("not closed");
        let event2 = tokio::time::timeout(std::time::Duration::from_millis(200), subscriber.recv())
            .await
            .expect("event2")
            .expect("not closed");

        let versions = vec![title_version_of(&event1), title_version_of(&event2)];
        assert_eq!(versions, vec![1, 2], "event order must match commit order");
    }

    /// A manual rename racing a non-forced generated title must end up as the
    /// final title, and an Auto event may exist only if the generated write
    /// actually reported a change.
    ///
    /// Both operations load the session after taking the lock. If the
    /// generated write commits first, the manual rename overwrites it; if the
    /// manual rename commits first, the generated write sees a real title and
    /// aborts. Either way "Manual Override" is the last write.
    #[tokio::test]
    async fn manual_title_beats_generated_title_without_lying_event() {
        let (state, _temp_dir) = make_state().await;
        let state = std::sync::Arc::new(state);
        seed_session(&state, "m1", "New Session").await;

        let sender = state.get_session_event_sender("m1").await;
        let mut subscriber = sender.subscribe();

        let state_generated = state.clone();
        let state_manual = state.clone();

        let manual = tokio::spawn(async move {
            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
            SessionMetadataService::set_title(&state_manual, "m1", "Manual Override").await
        });

        // Not named `gen`: that identifier is reserved in the 2024 edition.
        let generated = tokio::spawn(async move {
            SessionMetadataService::apply_generated_title(
                &state_generated,
                "m1",
                "Auto Generated",
                TitleSource::Auto,
                false,
            )
            .await
        });

        let manual_result = manual.await.expect("manual ok").expect("manual ok");
        let generated_result = generated.await.expect("gen ok").expect("gen ok");

        let (_, manual_version) = manual_result.expect("manual applied");

        let persisted = state.storage.load_session("m1").await.unwrap().unwrap();
        assert_eq!(
            persisted.title, "Manual Override",
            "manual rename must be the final title"
        );
        assert_eq!(
            persisted.title_version, manual_version,
            "manual rename must be the last committed title write"
        );

        // Drain every event that arrives within the timeout window.
        let mut manual_events = 0usize;
        let mut auto_events = 0usize;
        while let Ok(Ok(event)) =
            tokio::time::timeout(std::time::Duration::from_millis(100), subscriber.recv()).await
        {
            if let AgentEvent::SessionTitleUpdated { source, .. } = event {
                if source == TitleSource::Manual {
                    manual_events += 1;
                } else if source == TitleSource::Auto {
                    auto_events += 1;
                }
            }
        }
        assert!(manual_events > 0, "must emit manual event");
        assert_eq!(
            auto_events > 0,
            generated_result.is_some(),
            "an Auto event must exist exactly when the generated title was applied"
        );
    }

    /// A runtime save from a stale in-memory copy must not undo a pin that
    /// was written through the metadata service.
    #[tokio::test]
    async fn set_pinned_then_runtime_save_does_not_clobber() {
        let (state, _temp_dir) = make_state().await;
        seed_session(&state, "p1", "Title").await;

        SessionMetadataService::set_pinned(&state, "p1", true)
            .await
            .expect("ok")
            .expect("applied");

        let after_pin = state.storage.load_session("p1").await.unwrap().unwrap();
        assert!(after_pin.pinned);
        assert_eq!(after_pin.metadata_version, 1);

        // Stale copy: still unpinned and at the pre-pin metadata version.
        let mut runtime_copy = Session::new("p1".to_string(), "test-model".to_string());
        runtime_copy.pinned = false;
        runtime_copy.metadata_version = 0;
        runtime_copy.title = "Title".to_string();

        state
            .persistence
            .merge_save_runtime(&mut runtime_copy)
            .await
            .expect("runtime save ok");

        let after_runtime = state.storage.load_session("p1").await.unwrap().unwrap();
        assert!(after_runtime.pinned, "runtime save must not clobber pinned");
        assert_eq!(
            after_runtime.metadata_version, 1,
            "runtime save must preserve metadata_version"
        );
    }
}