1use bamboo_agent_core::{AgentEvent, Session, TitleSource};
38use chrono::Utc;
39
40use crate::app_state::AppState;
41use crate::events::publish_replayable_session_event;
42use crate::model_config_helper::GOLD_CONFIG_METADATA_KEY;
43use crate::title_gen::is_untitled;
44
/// Failure modes of the session-metadata operations in this module.
#[derive(Debug, thiserror::Error)]
pub enum MetadataError {
    /// Storage returned no session for the requested id; the payload is that id.
    #[error("session not found: {0}")]
    NotFound(String),
    /// Loading or saving the session failed; the payload is a human-readable description.
    ///
    /// NOTE(review): `set_title` also reports an empty title through this variant.
    #[error("storage error: {0}")]
    Storage(String),
    /// The caller's `if_match` precondition did not equal the stored `metadata_version`.
    #[error("version conflict: expected {expected}, current {current}")]
    VersionConflict { expected: u64, current: u64 },
}
57
58fn ensure_if_match(session: &Session, if_match: Option<u64>) -> Result<(), MetadataError> {
62 if let Some(expected) = if_match {
63 if session.metadata_version != expected {
64 return Err(MetadataError::VersionConflict {
65 expected,
66 current: session.metadata_version,
67 });
68 }
69 }
70 Ok(())
71}
72
/// Outcome of a metadata mutation: `Some(value)` carries what was applied, while `None`
/// means the operation returned without writing anything (for example because the stored
/// value already matched the request).
pub type MetadataChange<T> = Option<T>;
79
80pub struct SessionMetadataService;
81
82impl SessionMetadataService {
83 pub async fn set_title(
87 state: &AppState,
88 session_id: &str,
89 new_title: &str,
90 if_match: Option<u64>,
91 ) -> Result<MetadataChange<(String, u64)>, MetadataError> {
92 let trimmed = new_title.trim();
93 if trimmed.is_empty() {
94 return Err(MetadataError::Storage("title cannot be empty".into()));
95 }
96
97 let _guard = state.persistence.acquire_lock(session_id).await;
99
100 let mut session = load_latest(state, session_id).await?;
101 ensure_if_match(&session, if_match)?;
102 if session.title == trimmed {
103 return Ok(None);
104 }
105
106 session.title = trimmed.to_string();
107 session.title_version = session.title_version.saturating_add(1);
108 session.metadata_version = session.metadata_version.saturating_add(1);
109 session.updated_at = Utc::now();
110
111 state
112 .persistence
113 .storage()
114 .save_session(&session)
115 .await
116 .map_err(|e| MetadataError::Storage(format!("save_session: {e}")))?;
117 refresh_in_memory_cache(state, session_id, session.clone()).await;
118
119 let event = AgentEvent::SessionTitleUpdated {
120 session_id: session.id.clone(),
121 title: session.title.clone(),
122 title_version: session.title_version,
123 source: TitleSource::Manual,
124 updated_at: session.updated_at,
125 };
126 publish_replayable_session_event(state, session_id, event).await;
127
128 Ok(Some((session.title, session.title_version)))
129 }
130
131 pub async fn apply_generated_title(
137 state: &AppState,
138 session_id: &str,
139 candidate: &str,
140 source: TitleSource,
141 force: bool,
142 ) -> Result<MetadataChange<(String, u64)>, MetadataError> {
143 let trimmed = candidate.trim();
144 if trimmed.is_empty() {
145 return Ok(None);
146 }
147
148 let _guard = state.persistence.acquire_lock(session_id).await;
150
151 let mut session = load_latest(state, session_id).await?;
152 if !force && !is_untitled(&session.title) {
153 return Ok(None);
154 }
155 if session.title == trimmed {
156 return Ok(None);
157 }
158
159 session.title = trimmed.to_string();
160 session.title_version = session.title_version.saturating_add(1);
161 session.metadata_version = session.metadata_version.saturating_add(1);
162 session.updated_at = Utc::now();
163
164 state
165 .persistence
166 .storage()
167 .save_session(&session)
168 .await
169 .map_err(|e| MetadataError::Storage(format!("save_session: {e}")))?;
170 refresh_in_memory_cache(state, session_id, session.clone()).await;
171
172 let event = AgentEvent::SessionTitleUpdated {
173 session_id: session.id.clone(),
174 title: session.title.clone(),
175 title_version: session.title_version,
176 source,
177 updated_at: session.updated_at,
178 };
179 publish_replayable_session_event(state, session_id, event).await;
180
181 Ok(Some((session.title, session.title_version)))
182 }
183
184 pub async fn set_pinned(
187 state: &AppState,
188 session_id: &str,
189 pinned: bool,
190 if_match: Option<u64>,
191 ) -> Result<MetadataChange<bool>, MetadataError> {
192 let _guard = state.persistence.acquire_lock(session_id).await;
194
195 let mut session = load_latest(state, session_id).await?;
196 ensure_if_match(&session, if_match)?;
197 if session.pinned == pinned {
198 return Ok(None);
199 }
200
201 session.pinned = pinned;
202 session.metadata_version = session.metadata_version.saturating_add(1);
203 session.updated_at = Utc::now();
204
205 state
206 .persistence
207 .storage()
208 .save_session(&session)
209 .await
210 .map_err(|e| MetadataError::Storage(format!("save_session: {e}")))?;
211 refresh_in_memory_cache(state, session_id, session.clone()).await;
212
213 let event = AgentEvent::SessionPinnedUpdated {
214 session_id: session.id.clone(),
215 pinned: session.pinned,
216 updated_at: session.updated_at,
217 };
218 publish_replayable_session_event(state, session_id, event).await;
219
220 Ok(Some(pinned))
221 }
222
223 pub async fn set_gold_config_json(
229 state: &AppState,
230 session_id: &str,
231 gold_config_json: Option<String>,
232 if_match: Option<u64>,
233 ) -> Result<MetadataChange<Option<String>>, MetadataError> {
234 let normalized = gold_config_json.and_then(|value| {
235 let trimmed = value.trim();
236 if trimmed.is_empty() {
237 None
238 } else {
239 Some(trimmed.to_string())
240 }
241 });
242
243 let _guard = state.persistence.acquire_lock(session_id).await;
244 let mut session = load_latest(state, session_id).await?;
245 ensure_if_match(&session, if_match)?;
246 let current = session
247 .metadata
248 .get(GOLD_CONFIG_METADATA_KEY)
249 .map(|value| value.trim().to_string())
250 .filter(|value| !value.is_empty());
251 if current == normalized {
252 return Ok(None);
253 }
254
255 if let Some(value) = normalized.as_ref() {
256 session
257 .metadata
258 .insert(GOLD_CONFIG_METADATA_KEY.to_string(), value.clone());
259 } else {
260 session.metadata.remove(GOLD_CONFIG_METADATA_KEY);
261 }
262 session.metadata_version = session.metadata_version.saturating_add(1);
263 session.updated_at = Utc::now();
264
265 state
266 .persistence
267 .storage()
268 .save_session(&session)
269 .await
270 .map_err(|e| MetadataError::Storage(format!("save_session: {e}")))?;
271 refresh_in_memory_cache(state, session_id, session).await;
272
273 Ok(Some(normalized))
274 }
275}
276
277async fn load_latest(state: &AppState, session_id: &str) -> Result<Session, MetadataError> {
280 state
281 .persistence
282 .storage()
283 .load_session(session_id)
284 .await
285 .map_err(|e| MetadataError::Storage(format!("load_session: {e}")))?
286 .ok_or_else(|| MetadataError::NotFound(session_id.to_string()))
287}
288
289async fn refresh_in_memory_cache(state: &AppState, session_id: &str, session: Session) {
291 let mut cache = state.sessions.write().await;
292 cache.insert(session_id.to_string(), session);
293}
294
#[cfg(test)]
mod tests {
    use super::*;
    use bamboo_agent_core::Session;

    /// Builds an `AppState` rooted in a fresh temporary directory.
    ///
    /// The `TempDir` guard is returned together with the state because dropping it deletes
    /// the directory. Each test binds it (`_temp_dir`) so the directory outlives every
    /// storage access the test performs and is cleaned up when the test ends.
    async fn make_state() -> (AppState, tempfile::TempDir) {
        let temp_dir = tempfile::tempdir().expect("create temp dir");
        let state = AppState::new(temp_dir.path().to_path_buf())
            .await
            .expect("app state init");
        (state, temp_dir)
    }

    /// Saves a brand-new session with the given id and title and returns it.
    async fn seed_session(state: &AppState, session_id: &str, title: &str) -> Session {
        let mut session = Session::new(session_id.to_string(), "test-model".to_string());
        session.title = title.to_string();
        state
            .storage
            .save_session(&session)
            .await
            .expect("seed save");
        session
    }

    /// Reads the session back from storage, panicking if it is missing or unreadable.
    async fn load_persisted(state: &AppState, session_id: &str) -> Session {
        state
            .storage
            .load_session(session_id)
            .await
            .expect("load ok")
            .expect("session exists")
    }

    /// A manual rename trims the input, bumps both versions and emits a `Manual` event.
    #[tokio::test]
    async fn set_title_bumps_version_and_emits_event() {
        let (state, _temp_dir) = make_state().await;
        seed_session(&state, "s1", "New Session").await;

        let sender = state.get_session_event_sender("s1").await;
        let mut subscriber = sender.subscribe();

        let result = SessionMetadataService::set_title(&state, "s1", " Hello ", None)
            .await
            .expect("set_title ok");
        let (applied_title, version) = result.expect("change applied");
        assert_eq!(applied_title, "Hello");
        assert_eq!(version, 1);

        let persisted = load_persisted(&state, "s1").await;
        assert_eq!(persisted.title, "Hello");
        assert_eq!(persisted.title_version, 1);
        assert_eq!(persisted.metadata_version, 1);

        let event = tokio::time::timeout(std::time::Duration::from_millis(100), subscriber.recv())
            .await
            .expect("event before timeout")
            .expect("event received");
        match event {
            AgentEvent::SessionTitleUpdated {
                session_id,
                title,
                title_version,
                source,
                ..
            } => {
                assert_eq!(session_id, "s1");
                assert_eq!(title, "Hello");
                assert_eq!(title_version, 1);
                assert_eq!(source, TitleSource::Manual);
            }
            other => panic!("unexpected event: {other:?}"),
        }
    }

    /// Renaming to the current title writes nothing and broadcasts nothing.
    #[tokio::test]
    async fn set_title_short_circuits_when_unchanged() {
        let (state, _temp_dir) = make_state().await;
        seed_session(&state, "s1", "Same").await;

        let sender = state.get_session_event_sender("s1").await;
        let mut subscriber = sender.subscribe();

        let result = SessionMetadataService::set_title(&state, "s1", "Same", None)
            .await
            .expect("ok");
        assert!(result.is_none());

        let persisted = load_persisted(&state, "s1").await;
        assert_eq!(persisted.title_version, 0);
        assert_eq!(persisted.metadata_version, 0);

        let event_or_timeout =
            tokio::time::timeout(std::time::Duration::from_millis(50), subscriber.recv()).await;
        assert!(event_or_timeout.is_err(), "no event should be broadcast");
    }

    /// Without `force`, a generated title must not replace a title the user already chose.
    #[tokio::test]
    async fn apply_generated_title_aborts_on_concurrent_rename() {
        let (state, _temp_dir) = make_state().await;
        seed_session(&state, "s1", "User Picked This").await;

        let result = SessionMetadataService::apply_generated_title(
            &state,
            "s1",
            "Auto Title",
            TitleSource::Auto,
            false,
        )
        .await
        .expect("ok");
        assert!(
            result.is_none(),
            "should abort because title is no longer untitled"
        );

        let persisted = load_persisted(&state, "s1").await;
        assert_eq!(persisted.title, "User Picked This");
    }

    /// With `force`, a generated title replaces an existing user title.
    #[tokio::test]
    async fn apply_generated_title_force_overrides_existing() {
        let (state, _temp_dir) = make_state().await;
        seed_session(&state, "s1", "User Picked This").await;

        let result = SessionMetadataService::apply_generated_title(
            &state,
            "s1",
            "Forced Auto",
            TitleSource::Auto,
            true,
        )
        .await
        .expect("ok");
        let (applied, version) = result.expect("force applied");
        assert_eq!(applied, "Forced Auto");
        assert_eq!(version, 1);

        let persisted = load_persisted(&state, "s1").await;
        assert_eq!(persisted.title, "Forced Auto");
        assert_eq!(persisted.metadata_version, 1);
    }

    /// A prompt-scoped placeholder title still counts as untitled and gets replaced.
    #[tokio::test]
    async fn apply_generated_title_accepts_prompt_scoped_default_placeholder() {
        let (state, _temp_dir) = make_state().await;
        seed_session(&state, "s1", "New session with Bodhi").await;

        let result = SessionMetadataService::apply_generated_title(
            &state,
            "s1",
            "Real Generated Title",
            TitleSource::Auto,
            false,
        )
        .await
        .expect("ok");

        let (applied, version) = result.expect("applied");
        assert_eq!(applied, "Real Generated Title");
        assert_eq!(version, 1);

        let persisted = load_persisted(&state, "s1").await;
        assert_eq!(persisted.title, "Real Generated Title");
        assert_eq!(persisted.title_version, 1);
    }

    /// The `source` passed in is the one carried by the emitted event.
    #[tokio::test]
    async fn apply_generated_title_uses_correct_source_label() {
        let (state, _temp_dir) = make_state().await;
        seed_session(&state, "s1", "New Session").await;

        let sender = state.get_session_event_sender("s1").await;
        let mut subscriber = sender.subscribe();

        SessionMetadataService::apply_generated_title(
            &state,
            "s1",
            "Heuristic Title",
            TitleSource::Fallback,
            false,
        )
        .await
        .expect("ok")
        .expect("applied");

        let event = tokio::time::timeout(std::time::Duration::from_millis(100), subscriber.recv())
            .await
            .expect("event")
            .expect("not closed");
        match event {
            AgentEvent::SessionTitleUpdated { source, .. } => {
                assert_eq!(source, TitleSource::Fallback);
            }
            other => panic!("unexpected event: {other:?}"),
        }
    }

    /// Pinning persists the flag, bumps `metadata_version` and emits a pinned event.
    #[tokio::test]
    async fn set_pinned_emits_event_and_updates_disk() {
        let (state, _temp_dir) = make_state().await;
        seed_session(&state, "s1", "Title").await;

        let sender = state.get_session_event_sender("s1").await;
        let mut subscriber = sender.subscribe();

        let result = SessionMetadataService::set_pinned(&state, "s1", true, None)
            .await
            .expect("ok");
        assert_eq!(result, Some(true));

        let persisted = load_persisted(&state, "s1").await;
        assert!(persisted.pinned);
        assert_eq!(persisted.metadata_version, 1);

        let event = tokio::time::timeout(std::time::Duration::from_millis(100), subscriber.recv())
            .await
            .expect("event")
            .expect("not closed");
        match event {
            AgentEvent::SessionPinnedUpdated {
                session_id, pinned, ..
            } => {
                assert_eq!(session_id, "s1");
                assert!(pinned);
            }
            other => panic!("unexpected event: {other:?}"),
        }
    }

    /// Setting the pinned flag to its current value is a no-op.
    #[tokio::test]
    async fn set_pinned_short_circuits_when_unchanged() {
        let (state, _temp_dir) = make_state().await;
        seed_session(&state, "s1", "Title").await;

        let result = SessionMetadataService::set_pinned(&state, "s1", false, None)
            .await
            .expect("ok");
        assert!(result.is_none());
    }

    /// A matching `if_match` lets the rename through.
    #[tokio::test]
    async fn set_title_honors_matching_if_match() {
        let (state, _temp_dir) = make_state().await;
        seed_session(&state, "s1", "New Session").await;

        let result = SessionMetadataService::set_title(&state, "s1", "Renamed", Some(0))
            .await
            .expect("matching precondition applies");
        assert_eq!(result.expect("applied").1, 1);

        let persisted = load_persisted(&state, "s1").await;
        assert_eq!(persisted.metadata_version, 1);
    }

    /// A stale `if_match` is rejected with `VersionConflict` and leaves the title alone.
    #[tokio::test]
    async fn set_title_rejects_stale_if_match() {
        let (state, _temp_dir) = make_state().await;
        seed_session(&state, "s1", "New Session").await;

        // Move metadata_version from 0 to 1 so that `Some(0)` below is stale.
        SessionMetadataService::set_pinned(&state, "s1", true, None)
            .await
            .expect("ok")
            .expect("applied");

        let err = SessionMetadataService::set_title(&state, "s1", "Nope", Some(0))
            .await
            .expect_err("stale precondition must conflict");
        match err {
            MetadataError::VersionConflict { expected, current } => {
                assert_eq!(expected, 0);
                assert_eq!(current, 1);
            }
            other => panic!("unexpected error: {other:?}"),
        }

        let persisted = load_persisted(&state, "s1").await;
        assert_eq!(persisted.title, "New Session");
    }

    /// Renaming a session that storage does not know yields `NotFound`.
    #[tokio::test]
    async fn set_title_returns_not_found_for_unknown_session() {
        let (state, _temp_dir) = make_state().await;
        let err = SessionMetadataService::set_title(&state, "missing", "x", None)
            .await
            .unwrap_err();
        assert!(matches!(err, MetadataError::NotFound(_)));
    }

    /// Two concurrent renames are serialized: distinct versions, events in commit order.
    #[tokio::test]
    async fn concurrent_authoritative_title_writes_serialize() {
        let (state, _temp_dir) = make_state().await;
        let state = std::sync::Arc::new(state);
        seed_session(&state, "c1", "New Session").await;

        let sender = state.get_session_event_sender("c1").await;
        let mut subscriber = sender.subscribe();

        let state_a = state.clone();
        let state_b = state.clone();

        let (a, b) = tokio::join!(
            SessionMetadataService::set_title(&state_a, "c1", "Title A", None),
            SessionMetadataService::set_title(&state_b, "c1", "Title B", None),
        );

        let a = a.expect("A ok").expect("A applied");
        let b = b.expect("B ok").expect("B applied");

        // Whichever write won the lock first got version 1, the other version 2.
        let mut returned_versions = [a.1, b.1];
        returned_versions.sort_unstable();
        assert_eq!(
            returned_versions,
            [1, 2],
            "concurrent writes must produce distinct title_versions 1 and 2"
        );

        let persisted = load_persisted(&state, "c1").await;
        assert!(
            persisted.title == "Title A" || persisted.title == "Title B",
            "final title must be one of the two writes"
        );
        assert_eq!(persisted.title_version, 2);
        assert_eq!(persisted.metadata_version, 2);

        let event1 = tokio::time::timeout(std::time::Duration::from_millis(200), subscriber.recv())
            .await
            .expect("event1")
            .expect("not closed");
        let event2 = tokio::time::timeout(std::time::Duration::from_millis(200), subscriber.recv())
            .await
            .expect("event2")
            .expect("not closed");

        let title_version_of = |event: &AgentEvent| -> u64 {
            match event {
                AgentEvent::SessionTitleUpdated { title_version, .. } => *title_version,
                other => panic!("unexpected event: {other:?}"),
            }
        };
        let versions = vec![title_version_of(&event1), title_version_of(&event2)];
        assert_eq!(versions, vec![1, 2], "event order must match commit order");
    }

    /// A manual rename racing a non-forced generated title always ends up as the final
    /// title, and an `Auto` event exists only if the generated title was really applied.
    #[tokio::test]
    async fn manual_title_beats_generated_title_without_lying_event() {
        let (state, _temp_dir) = make_state().await;
        let state = std::sync::Arc::new(state);
        seed_session(&state, "m1", "New Session").await;

        let sender = state.get_session_event_sender("m1").await;
        let mut subscriber = sender.subscribe();

        let state_gen = state.clone();
        let state_manual = state.clone();

        let manual = tokio::spawn(async move {
            tokio::time::sleep(std::time::Duration::from_millis(10)).await;
            SessionMetadataService::set_title(&state_manual, "m1", "Manual Override", None).await
        });

        // Named `generated` rather than `gen`, which is a reserved keyword in Rust 2024.
        let generated = tokio::spawn(async move {
            SessionMetadataService::apply_generated_title(
                &state_gen,
                "m1",
                "Auto Generated",
                TitleSource::Auto,
                false,
            )
            .await
        });

        let manual_result = manual.await.expect("manual ok").expect("manual ok");
        let gen_result = generated.await.expect("gen ok").expect("gen ok");

        manual_result.expect("manual applied");

        // Both interleavings end with the manual title: if the generated write goes first,
        // the manual one overwrites it; if the manual write goes first, the non-forced
        // generated write sees a titled session and backs off.
        let persisted = load_persisted(&state, "m1").await;
        assert_eq!(
            persisted.title, "Manual Override",
            "manual title must be the final persisted title"
        );
        let expected_version = if gen_result.is_some() { 2 } else { 1 };
        assert_eq!(persisted.title_version, expected_version);

        let mut events: Vec<AgentEvent> = Vec::new();
        while let Ok(Ok(e)) =
            tokio::time::timeout(std::time::Duration::from_millis(100), subscriber.recv()).await
        {
            events.push(e);
        }

        let title_events: Vec<(&TitleSource, &str)> = events
            .iter()
            .filter_map(|e| match e {
                AgentEvent::SessionTitleUpdated { source, title, .. } => {
                    Some((source, title.as_str()))
                }
                _ => None,
            })
            .collect();

        let manual_events = title_events
            .iter()
            .filter(|(source, _)| **source == TitleSource::Manual)
            .count();
        assert_eq!(manual_events, 1, "must emit manual event");

        let auto_events = title_events
            .iter()
            .filter(|(source, _)| **source == TitleSource::Auto)
            .count();
        assert_eq!(
            auto_events,
            usize::from(gen_result.is_some()),
            "an auto event may only be emitted when the generated title was applied"
        );

        let (last_source, last_title) = title_events.last().expect("must emit at least one event");
        assert!(
            **last_source == TitleSource::Manual,
            "the last title event must be the manual one"
        );
        assert_eq!(*last_title, "Manual Override");
    }

    /// A runtime save from a stale in-memory copy must keep the pinned flag and version
    /// that the metadata service wrote.
    #[tokio::test]
    async fn set_pinned_then_runtime_save_does_not_clobber() {
        let (state, _temp_dir) = make_state().await;
        seed_session(&state, "p1", "Title").await;

        SessionMetadataService::set_pinned(&state, "p1", true, None)
            .await
            .expect("ok")
            .expect("applied");

        let after_pin = load_persisted(&state, "p1").await;
        assert!(after_pin.pinned);
        assert_eq!(after_pin.metadata_version, 1);

        // Simulate a runtime copy that predates the pin.
        let mut runtime_copy = Session::new("p1".to_string(), "test-model");
        runtime_copy.pinned = false;
        runtime_copy.metadata_version = 0;
        runtime_copy.title = "Title".to_string();

        state
            .persistence
            .merge_save_runtime(&mut runtime_copy)
            .await
            .expect("runtime save ok");

        let after_runtime = load_persisted(&state, "p1").await;
        assert!(after_runtime.pinned, "runtime save must not clobber pinned");
        assert_eq!(
            after_runtime.metadata_version, 1,
            "runtime save must preserve metadata_version"
        );
    }
}