use crate::http::AppState;
use axum::{
extract::{Path, Query, State},
http::StatusCode,
response::{
sse::{Event, KeepAlive, Sse},
IntoResponse,
},
routing::{delete, get, post},
Json, Router,
};
use futures_util::stream;
use serde::{Deserialize, Serialize};
use std::convert::Infallible;
use std::sync::Arc;
#[derive(Debug, Deserialize)]
pub struct CreateSessionRequest {
pub title: Option<String>,
}
#[derive(Debug, Deserialize)]
pub struct ListSessionsQuery {
#[serde(default = "default_limit")]
pub limit: u32,
}
fn default_limit() -> u32 {
50
}
#[derive(Debug, Serialize)]
pub struct SessionResponse {
pub id: String,
pub title: Option<String>,
pub created_at: String,
pub updated_at: String,
}
#[derive(Debug, Serialize)]
pub struct SessionDetailResponse {
pub id: String,
pub title: Option<String>,
pub created_at: String,
pub updated_at: String,
pub messages: Vec<MessageResponse>,
}
#[derive(Debug, Serialize)]
pub struct MessageResponse {
pub id: String,
pub role: String,
pub content: Option<String>,
pub created_at: String,
}
#[derive(Debug, Serialize)]
struct ErrorResponse {
error: String,
}
pub fn session_router() -> Router<Arc<AppState>> {
Router::new()
.route("/sessions", post(create_session))
.route("/sessions", get(list_sessions))
.route("/sessions/:id", get(get_session))
.route("/sessions/:id", delete(delete_session))
.route("/sessions/:id/messages", post(post_message))
}
pub async fn create_session(
State(state): State<Arc<AppState>>,
body: Option<Json<CreateSessionRequest>>,
) -> impl IntoResponse {
let title = body.and_then(|b| b.title.clone());
let store = state.store.lock().await;
match store.create_session(title.as_deref()) {
Ok(session) => (
StatusCode::CREATED,
Json(serde_json::json!({ "session_id": session.id })),
)
.into_response(),
Err(e) => (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ErrorResponse {
error: e.to_string(),
}),
)
.into_response(),
}
}
pub async fn list_sessions(
State(state): State<Arc<AppState>>,
Query(params): Query<ListSessionsQuery>,
) -> impl IntoResponse {
let limit = params.limit.min(200);
let store = state.store.lock().await;
match store.list_sessions(limit) {
Ok(sessions) => {
let resp: Vec<SessionResponse> = sessions
.into_iter()
.map(|s| SessionResponse {
id: s.id,
title: s.title,
created_at: s.created_at.to_rfc3339(),
updated_at: s.updated_at.to_rfc3339(),
})
.collect();
(StatusCode::OK, Json(resp)).into_response()
}
Err(e) => (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ErrorResponse {
error: e.to_string(),
}),
)
.into_response(),
}
}
pub async fn get_session(
State(state): State<Arc<AppState>>,
Path(id): Path<String>,
) -> impl IntoResponse {
let store = state.store.lock().await;
let session = match store.get_session(&id) {
Ok(Some(s)) => s,
Ok(None) => {
return (
StatusCode::NOT_FOUND,
Json(ErrorResponse {
error: format!("Session '{id}' not found"),
}),
)
.into_response()
}
Err(e) => {
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ErrorResponse {
error: e.to_string(),
}),
)
.into_response()
}
};
let messages = match store.get_messages(&id) {
Ok(msgs) => msgs,
Err(e) => {
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ErrorResponse {
error: e.to_string(),
}),
)
.into_response()
}
};
let detail = SessionDetailResponse {
id: session.id,
title: session.title,
created_at: session.created_at.to_rfc3339(),
updated_at: session.updated_at.to_rfc3339(),
messages: messages
.into_iter()
.map(|m| MessageResponse {
id: m.id,
role: m.role,
content: m.content,
created_at: m.created_at.to_rfc3339(),
})
.collect(),
};
(StatusCode::OK, Json(detail)).into_response()
}
pub async fn delete_session(
State(state): State<Arc<AppState>>,
Path(id): Path<String>,
) -> impl IntoResponse {
let store = state.store.lock().await;
match store.get_session(&id) {
Ok(None) => {
return (
StatusCode::NOT_FOUND,
Json(ErrorResponse {
error: format!("Session '{id}' not found"),
}),
)
.into_response()
}
Err(e) => {
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ErrorResponse {
error: e.to_string(),
}),
)
.into_response()
}
Ok(Some(_)) => {} }
match store.delete_session(&id) {
Ok(()) => StatusCode::NO_CONTENT.into_response(),
Err(e) => (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ErrorResponse {
error: e.to_string(),
}),
)
.into_response(),
}
}
#[derive(Debug, Deserialize)]
pub struct PostMessageRequest {
pub content: String,
#[serde(default)]
#[allow(dead_code)]
pub images: Vec<String>,
}
pub async fn post_message(
State(state): State<Arc<AppState>>,
Path(session_id): Path<String>,
Json(body): Json<PostMessageRequest>,
) -> impl IntoResponse {
use crate::agent::director::Director;
use crate::agent::Agent;
use crate::context::AgentContext;
use crate::io::http::HttpIO;
use crate::llm;
{
let store = state.store.lock().await;
match store.get_session(&session_id) {
Ok(Some(_)) => {} Ok(None) => {
return (
StatusCode::NOT_FOUND,
Json(ErrorResponse {
error: format!("Session '{}' not found", session_id),
}),
)
.into_response();
}
Err(e) => {
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ErrorResponse {
error: e.to_string(),
}),
)
.into_response();
}
}
}
{
let mut active = state.active_sessions.lock().await;
if active.contains(&session_id) {
return (
StatusCode::CONFLICT,
Json(ErrorResponse {
error: format!(
"Session '{}' already has an active agent execution",
session_id
),
}),
)
.into_response();
}
active.insert(session_id.clone());
}
let content = body.content.clone();
{
let store = state.store.lock().await;
if let Err(e) = store.add_message(&session_id, &llm::Message::user(&content)) {
state.active_sessions.lock().await.remove(&session_id);
return (
StatusCode::INTERNAL_SERVER_ERROR,
Json(ErrorResponse {
error: e.to_string(),
}),
)
.into_response();
}
}
let (http_io, rx) = HttpIO::new();
let io: std::sync::Arc<dyn crate::io::AgentIO> = std::sync::Arc::new(http_io);
let config = state.config.clone();
let sid = session_id.clone();
let state2 = std::sync::Arc::clone(&state);
tokio::spawn(async move {
let ctx_result = AgentContext::new(
config.project_dir.clone(), false, Some(config.model.clone()),
Some(config.provider.api_base.clone()),
Some(config.provider.api_key.clone()),
config.agent.compact_mode,
std::sync::Arc::clone(&io),
)
.await;
let ctx = match ctx_result {
Ok(c) => c,
Err(e) => {
let _ = io.write_error(&format!("Agent init failed: {:#}", e)).await;
state2.active_sessions.lock().await.remove(&sid);
return;
}
};
let agents_md = crate::agent::agents_md::load_agents_md(&ctx.project_dir);
let coder = crate::agent::coder::CoderAgent::new_with_agents_md(
ctx.config.agent.clone(),
agents_md,
);
let mut messages = vec![
llm::Message::system(coder.system_prompt().as_str()),
llm::Message::user(content.as_str()),
];
let director = Director::new(ctx.config.agent.clone());
let result = director
.execute(
&mut messages,
ctx.registry.as_ref(),
ctx.llm.as_ref(),
&ctx.tool_ctx,
)
.await;
match result {
Ok(agent_result) => {
let store = state2.store.lock().await;
let _ = store.add_message(
&sid,
&llm::Message::assistant(Some(agent_result.final_message), None),
);
let _ = store.update_session_timestamp(&sid);
}
Err(e) => {
let _ = io.write_error(&format!("Agent error: {:#}", e)).await;
}
}
let _ = io.show_status("[DONE]").await;
drop(io);
state2.active_sessions.lock().await.remove(&sid);
});
let sse_stream = stream::unfold(rx, |mut receiver| async move {
let event = receiver.recv().await?;
let axum_event = Event::default()
.event(event.event_name())
.data(event.data_json());
Some((Ok::<Event, Infallible>(axum_event), receiver))
});
Sse::new(sse_stream)
.keep_alive(KeepAlive::default())
.into_response()
}