async fn wait_for_run_finished_event(
state: &AppState,
rx: &mut tokio::sync::broadcast::Receiver<EngineEvent>,
session_id: &str,
run_id: &str,
max_wait: Duration,
) -> bool {
let deadline = tokio::time::sleep(max_wait);
tokio::pin!(deadline);
loop {
tokio::select! {
_ = &mut deadline => {
return state.run_registry.get(session_id).await.is_none();
}
event = rx.recv() => {
match event {
Ok(event)
if event.event_type == "session.run.finished"
&& event_matches_run(&event, session_id, run_id) =>
{
return true;
}
Ok(_) => {}
Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => {
if state.run_registry.get(session_id).await.is_none() {
return true;
}
}
Err(tokio::sync::broadcast::error::RecvError::Closed) => {
return state.run_registry.get(session_id).await.is_none();
}
}
}
}
}
}
pub(super) async fn fork_session(
State(state): State<AppState>,
Extension(tenant_context): Extension<TenantContext>,
Path(id): Path<String>,
) -> Result<Json<Value>, StatusCode> {
let session = state
.storage
.get_session(&id)
.await
.ok_or(StatusCode::NOT_FOUND)?;
ensure_same_tenant(&tenant_context, &session.tenant_context)?;
let child = state
.storage
.fork_session(&id)
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?
.ok_or(StatusCode::NOT_FOUND)?;
Ok(Json(json!({"ok": true, "session": child})))
}
pub(super) async fn revert_session(
State(state): State<AppState>,
Extension(tenant_context): Extension<TenantContext>,
Path(id): Path<String>,
) -> Result<Json<Value>, StatusCode> {
let session = state
.storage
.get_session(&id)
.await
.ok_or(StatusCode::NOT_FOUND)?;
ensure_same_tenant(&tenant_context, &session.tenant_context)?;
let ok = state
.storage
.revert_session(&id)
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
Ok(Json(json!({"ok": ok})))
}
pub(super) async fn unrevert_session(
State(state): State<AppState>,
Extension(tenant_context): Extension<TenantContext>,
Path(id): Path<String>,
) -> Result<Json<Value>, StatusCode> {
let session = state
.storage
.get_session(&id)
.await
.ok_or(StatusCode::NOT_FOUND)?;
ensure_same_tenant(&tenant_context, &session.tenant_context)?;
let ok = state
.storage
.unrevert_session(&id)
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
Ok(Json(json!({"ok": ok})))
}
pub(super) async fn share_session(
State(state): State<AppState>,
Extension(tenant_context): Extension<TenantContext>,
Path(id): Path<String>,
) -> Result<Json<Value>, StatusCode> {
let session = state
.storage
.get_session(&id)
.await
.ok_or(StatusCode::NOT_FOUND)?;
ensure_same_tenant(&tenant_context, &session.tenant_context)?;
let share_id = state
.storage
.set_shared(&id, true)
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
Ok(Json(json!({"ok": share_id.is_some(), "shareID": share_id})))
}
pub(super) async fn unshare_session(
State(state): State<AppState>,
Extension(tenant_context): Extension<TenantContext>,
Path(id): Path<String>,
) -> Result<Json<Value>, StatusCode> {
let session = state
.storage
.get_session(&id)
.await
.ok_or(StatusCode::NOT_FOUND)?;
ensure_same_tenant(&tenant_context, &session.tenant_context)?;
let _ = state
.storage
.set_shared(&id, false)
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
Ok(Json(json!({"ok": true})))
}
pub(super) async fn summarize_session(
State(state): State<AppState>,
Extension(tenant_context): Extension<TenantContext>,
Path(id): Path<String>,
) -> Result<Json<Value>, StatusCode> {
let session = state
.storage
.get_session(&id)
.await
.ok_or(StatusCode::NOT_FOUND)?;
ensure_same_tenant(&tenant_context, &session.tenant_context)?;
let total_messages = session.messages.len();
let mut text_parts = Vec::new();
for message in session.messages.iter().rev().take(4) {
for part in &message.parts {
if let MessagePart::Text { text } = part {
text_parts.push(text.clone());
}
}
}
text_parts.reverse();
let excerpt = text_parts.join(" ");
let clipped = excerpt.chars().take(280).collect::<String>();
let summary = if clipped.is_empty() {
format!("Session with {total_messages} messages and no text parts.")
} else {
format!("Session with {total_messages} messages. Recent: {clipped}")
};
state
.storage
.set_summary(&id, summary.clone())
.await
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
Ok(Json(json!({"ok": true, "summary": summary})))
}
pub(super) async fn session_diff(
State(state): State<AppState>,
Extension(tenant_context): Extension<TenantContext>,
Path(id): Path<String>,
) -> Result<Json<Value>, StatusCode> {
let session = state
.storage
.get_session(&id)
.await
.ok_or(StatusCode::NOT_FOUND)?;
ensure_same_tenant(&tenant_context, &session.tenant_context)?;
let diff = state.storage.session_diff(&id).await;
Ok(Json(json!(diff.unwrap_or_else(|| json!({})))))
}
pub(super) async fn session_children(
State(state): State<AppState>,
Extension(tenant_context): Extension<TenantContext>,
Path(id): Path<String>,
) -> Result<Json<Value>, StatusCode> {
let session = state
.storage
.get_session(&id)
.await
.ok_or(StatusCode::NOT_FOUND)?;
ensure_same_tenant(&tenant_context, &session.tenant_context)?;
Ok(Json(json!(state.storage.children(&id).await)))
}
pub(super) async fn init_session() -> Json<Value> {
Json(json!({"ok": true}))
}