use bamboo_agent_core::{AgentEvent, Session, TitleSource};
use chrono::Utc;
use crate::app_context::AgentSessionContext;
use crate::events::publish_replayable_session_event;
use crate::model_config_helper::GOLD_CONFIG_METADATA_KEY;
use crate::title_gen::is_untitled;
#[derive(Debug, thiserror::Error)]
pub enum MetadataError {
#[error("session not found: {0}")]
NotFound(String),
#[error("storage error: {0}")]
Storage(String),
#[error("version conflict: expected {expected}, current {current}")]
VersionConflict { expected: u64, current: u64 },
}
fn ensure_if_match(session: &Session, if_match: Option<u64>) -> Result<(), MetadataError> {
if let Some(expected) = if_match {
if session.metadata_version != expected {
return Err(MetadataError::VersionConflict {
expected,
current: session.metadata_version,
});
}
}
Ok(())
}
pub type MetadataChange<T> = Option<T>;
pub struct SessionMetadataService;
impl SessionMetadataService {
pub async fn set_title(
state: &dyn AgentSessionContext,
session_id: &str,
new_title: &str,
if_match: Option<u64>,
) -> Result<MetadataChange<(String, u64)>, MetadataError> {
let trimmed = new_title.trim();
if trimmed.is_empty() {
return Err(MetadataError::Storage("title cannot be empty".into()));
}
let _guard = state.persistence().acquire_lock(session_id).await;
let mut session = load_latest(state, session_id).await?;
ensure_if_match(&session, if_match)?;
if session.title == trimmed {
return Ok(None);
}
session.title = trimmed.to_string();
session.title_version = session.title_version.saturating_add(1);
session.metadata_version = session.metadata_version.saturating_add(1);
session.updated_at = Utc::now();
state
.persistence()
.storage()
.save_session(&session)
.await
.map_err(|e| MetadataError::Storage(format!("save_session: {e}")))?;
refresh_in_memory_cache(state, session_id, session.clone()).await;
let event = AgentEvent::SessionTitleUpdated {
session_id: session.id.clone(),
title: session.title.clone(),
title_version: session.title_version,
source: TitleSource::Manual,
updated_at: session.updated_at,
};
publish_replayable_session_event(state, session_id, event).await;
Ok(Some((session.title, session.title_version)))
}
pub async fn apply_generated_title(
state: &dyn AgentSessionContext,
session_id: &str,
candidate: &str,
source: TitleSource,
force: bool,
) -> Result<MetadataChange<(String, u64)>, MetadataError> {
let trimmed = candidate.trim();
if trimmed.is_empty() {
return Ok(None);
}
let _guard = state.persistence().acquire_lock(session_id).await;
let mut session = load_latest(state, session_id).await?;
if !force && !is_untitled(&session.title) {
return Ok(None);
}
if session.title == trimmed {
return Ok(None);
}
session.title = trimmed.to_string();
session.title_version = session.title_version.saturating_add(1);
session.metadata_version = session.metadata_version.saturating_add(1);
session.updated_at = Utc::now();
state
.persistence()
.storage()
.save_session(&session)
.await
.map_err(|e| MetadataError::Storage(format!("save_session: {e}")))?;
refresh_in_memory_cache(state, session_id, session.clone()).await;
let event = AgentEvent::SessionTitleUpdated {
session_id: session.id.clone(),
title: session.title.clone(),
title_version: session.title_version,
source,
updated_at: session.updated_at,
};
publish_replayable_session_event(state, session_id, event).await;
Ok(Some((session.title, session.title_version)))
}
pub async fn set_pinned(
state: &dyn AgentSessionContext,
session_id: &str,
pinned: bool,
if_match: Option<u64>,
) -> Result<MetadataChange<bool>, MetadataError> {
let _guard = state.persistence().acquire_lock(session_id).await;
let mut session = load_latest(state, session_id).await?;
ensure_if_match(&session, if_match)?;
if session.pinned == pinned {
return Ok(None);
}
session.pinned = pinned;
session.metadata_version = session.metadata_version.saturating_add(1);
session.updated_at = Utc::now();
state
.persistence()
.storage()
.save_session(&session)
.await
.map_err(|e| MetadataError::Storage(format!("save_session: {e}")))?;
refresh_in_memory_cache(state, session_id, session.clone()).await;
let event = AgentEvent::SessionPinnedUpdated {
session_id: session.id.clone(),
pinned: session.pinned,
updated_at: session.updated_at,
};
publish_replayable_session_event(state, session_id, event).await;
Ok(Some(pinned))
}
pub async fn set_gold_config_json(
state: &dyn AgentSessionContext,
session_id: &str,
gold_config_json: Option<String>,
if_match: Option<u64>,
) -> Result<MetadataChange<Option<String>>, MetadataError> {
let normalized = gold_config_json.and_then(|value| {
let trimmed = value.trim();
if trimmed.is_empty() {
None
} else {
Some(trimmed.to_string())
}
});
let _guard = state.persistence().acquire_lock(session_id).await;
let mut session = load_latest(state, session_id).await?;
ensure_if_match(&session, if_match)?;
let current = session
.metadata
.get(GOLD_CONFIG_METADATA_KEY)
.map(|value| value.trim().to_string())
.filter(|value| !value.is_empty());
if current == normalized {
return Ok(None);
}
if let Some(value) = normalized.as_ref() {
session
.metadata
.insert(GOLD_CONFIG_METADATA_KEY.to_string(), value.clone());
} else {
session.metadata.remove(GOLD_CONFIG_METADATA_KEY);
}
session.metadata_version = session.metadata_version.saturating_add(1);
session.updated_at = Utc::now();
state
.persistence()
.storage()
.save_session(&session)
.await
.map_err(|e| MetadataError::Storage(format!("save_session: {e}")))?;
refresh_in_memory_cache(state, session_id, session).await;
Ok(Some(normalized))
}
}
async fn load_latest(state: &dyn AgentSessionContext, session_id: &str) -> Result<Session, MetadataError> {
state
.persistence()
.storage()
.load_session(session_id)
.await
.map_err(|e| MetadataError::Storage(format!("load_session: {e}")))?
.ok_or_else(|| MetadataError::NotFound(session_id.to_string()))
}
async fn refresh_in_memory_cache(state: &dyn AgentSessionContext, session_id: &str, session: Session) {
let mut cache = state.sessions().write().await;
cache.insert(session_id.to_string(), session);
}