use anyhow::Result;
use axum::{
Router,
extract::{
Path, Query, State,
ws::{Message as WsMessage, WebSocket, WebSocketUpgrade},
},
http::{StatusCode, header},
response::{
IntoResponse, Json, Response,
sse::{Event, Sse},
},
routing::{delete, get, post},
};
use futures::{SinkExt, StreamExt};
use rust_embed::RustEmbed;
use serde::{Deserialize, Serialize};
use serde_json::json;
use std::collections::HashMap;
use std::convert::Infallible;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::Mutex;
use tower_http::cors::{Any, CorsLayer};
use tracing::{debug, info};
use crate::agent::{Agent, AgentConfig, StreamEvent, extract_tool_detail};
use crate::concurrency::{TurnGate, WorkspaceLock};
use crate::config::Config;
use crate::heartbeat::{HeartbeatStatus, get_last_heartbeat_event};
use crate::memory::MemoryManager;
/// Web UI assets embedded through `rust-embed` from the `ui/` folder named
/// in the attribute below; looked up by relative path in `serve_ui_asset`.
#[derive(RustEmbed)]
#[folder = "ui/"]
struct UiAssets;
/// Idle time (30 minutes) after which an in-memory session is expired by the
/// periodic cleanup task.
const SESSION_TIMEOUT: Duration = Duration::from_secs(30 * 60);
/// Maximum number of sessions kept in memory; also caps how many persisted
/// sessions are restored at startup.
const MAX_SESSIONS: usize = 100;
/// Agent identifier used when listing, saving and loading HTTP sessions.
const HTTP_AGENT_ID: &str = "http";
/// HTTP server front-end. Holds the configuration and the turn gate that are
/// copied into the shared handler state when `run` is called.
pub struct Server {
config: Config,
turn_gate: TurnGate,
}
/// One in-memory chat session tracked by the server.
struct SessionEntry {
// Agent that owns this session's conversation.
agent: Agent,
// Last time a handler touched the session; drives expiry and eviction.
last_accessed: Instant,
// True while the session has changes the periodic saver has not yet written.
dirty: bool,
}
/// State shared (behind an `Arc`) by every request handler and background task.
struct AppState {
config: Config,
// Live sessions keyed by session id; guarded by an async mutex.
sessions: Mutex<HashMap<String, SessionEntry>>,
memory: MemoryManager,
// Acquired by chat handlers before running an agent turn.
turn_gate: TurnGate,
// Acquired (on a blocking thread) by chat handlers around an agent turn.
workspace_lock: WorkspaceLock,
}
impl Server {
    /// Creates a server that owns a copy of `config` and a fresh [`TurnGate`].
    pub fn new(config: &Config) -> Result<Self> {
        Self::new_with_gate(config, TurnGate::new())
    }

    /// Creates a server that owns a copy of `config` and uses the supplied
    /// `turn_gate`.
    pub fn new_with_gate(config: &Config, turn_gate: TurnGate) -> Result<Self> {
        let config = config.clone();
        Ok(Self { config, turn_gate })
    }

    /// Builds the shared state, restores persisted sessions (best-effort),
    /// starts the background maintenance tasks and serves the HTTP API.
    ///
    /// # Errors
    /// Fails if the memory manager or workspace lock cannot be created, if
    /// the configured bind address does not parse, or if binding/serving
    /// the listener fails.
    pub async fn run(&self) -> Result<()> {
        let cfg = &self.config;
        let memory = MemoryManager::new_with_full_config(&cfg.memory, Some(cfg), "main")?;
        let workspace_lock = WorkspaceLock::new()?;
        let state = Arc::new(AppState {
            config: cfg.clone(),
            sessions: Mutex::new(HashMap::new()),
            memory,
            turn_gate: self.turn_gate.clone(),
            workspace_lock,
        });

        // A failure to restore sessions is logged but does not stop startup.
        if let Err(e) = load_persisted_sessions(&state).await {
            info!("Could not load persisted sessions: {}", e);
        }

        // Every 60 seconds: drop sessions that have been idle too long.
        let janitor_state = Arc::clone(&state);
        tokio::spawn(async move {
            let mut ticker = tokio::time::interval(Duration::from_secs(60));
            loop {
                ticker.tick().await;
                cleanup_expired_sessions(&janitor_state).await;
            }
        });

        // Every 300 seconds: write sessions flagged dirty to disk.
        let saver_state = Arc::clone(&state);
        tokio::spawn(async move {
            let mut ticker = tokio::time::interval(Duration::from_secs(300));
            loop {
                ticker.tick().await;
                save_dirty_sessions(&saver_state).await;
            }
        });

        let cors = CorsLayer::new()
            .allow_origin(Any)
            .allow_methods(Any)
            .allow_headers(Any);

        let app = Router::new()
            // Embedded UI and liveness probe.
            .route("/", get(serve_ui_index))
            .route("/ui/{*path}", get(serve_ui_file))
            .route("/health", get(health_check))
            // Session management.
            .route("/api/sessions", post(create_session).get(list_sessions))
            .route(
                "/api/sessions/{session_id}",
                delete(delete_session).get(get_session_status),
            )
            .route(
                "/api/sessions/{session_id}/messages",
                get(get_session_messages),
            )
            .route("/api/sessions/{session_id}/compact", post(compact_session))
            .route("/api/sessions/{session_id}/clear", post(clear_session))
            .route("/api/sessions/{session_id}/model", post(set_session_model))
            // Chat endpoints.
            .route("/api/chat", post(chat))
            .route("/api/chat/stream", post(chat_stream))
            .route("/api/ws", get(websocket_handler))
            // Memory index.
            .route("/api/memory/search", get(memory_search))
            .route("/api/memory/stats", get(memory_stats))
            .route("/api/memory/reindex", post(memory_reindex))
            // Introspection.
            .route("/api/status", get(status))
            .route("/api/config", get(get_config))
            .route("/api/heartbeat/status", get(heartbeat_status))
            .route("/api/saved-sessions", get(list_saved_sessions))
            .route("/api/saved-sessions/{session_id}", get(get_saved_session))
            .route("/api/logs/daemon", get(get_daemon_logs))
            .layer(cors)
            .with_state(state);

        let addr: SocketAddr = format!("{}:{}", cfg.server.bind, cfg.server.port).parse()?;
        info!("Starting HTTP server on http://{}", addr);
        let listener = tokio::net::TcpListener::bind(addr).await?;
        axum::serve(listener, app).await?;
        Ok(())
    }
}
/// Handler error: an HTTP status code paired with a plain-text message body.
struct AppError(StatusCode, String);

impl IntoResponse for AppError {
    /// Renders the error as a response with the stored status and message.
    fn into_response(self) -> Response {
        let AppError(status, message) = self;
        (status, message).into_response()
    }
}
/// Drops every session whose idle time exceeds `SESSION_TIMEOUT` and logs how
/// many were removed.
async fn cleanup_expired_sessions(state: &Arc<AppState>) {
    let mut guard = state.sessions.lock().await;
    let mut removed = 0usize;
    guard.retain(|id, entry| {
        if entry.last_accessed.elapsed() > SESSION_TIMEOUT {
            debug!("Expiring session: {}", id);
            removed += 1;
            false
        } else {
            true
        }
    });
    if removed > 0 {
        info!("Cleaned up {} expired sessions", removed);
    }
}
/// Restores up to `MAX_SESSIONS` saved HTTP sessions into the in-memory map.
///
/// A session whose resume fails is skipped; a failure to list sessions or to
/// construct an agent aborts the whole load with that error.
async fn load_persisted_sessions(state: &Arc<AppState>) -> Result<(), anyhow::Error> {
    use crate::agent::list_sessions_for_agent;

    let persisted = list_sessions_for_agent(HTTP_AGENT_ID)?;
    let mut loaded = 0;
    for info_item in persisted.into_iter().take(MAX_SESSIONS) {
        let agent_config = AgentConfig {
            model: state.config.agent.default_model.clone(),
            context_window: state.config.agent.context_window,
            reserve_tokens: state.config.agent.reserve_tokens,
        };
        let mut agent = Agent::new(agent_config, &state.config, state.memory.clone()).await?;
        if agent.resume_session(&info_item.id).await.is_err() {
            continue;
        }
        let restored = SessionEntry {
            agent,
            last_accessed: Instant::now(),
            dirty: false,
        };
        // The lock is taken per insert, only after the agent has been resumed.
        state.sessions.lock().await.insert(info_item.id, restored);
        loaded += 1;
    }
    if loaded > 0 {
        info!("Loaded {} persisted HTTP sessions", loaded);
    }
    Ok(())
}
/// Writes every session flagged `dirty` to disk and clears the flag on
/// success; failures are logged at debug level and retried on the next run.
async fn save_dirty_sessions(state: &Arc<AppState>) {
    let mut guard = state.sessions.lock().await;
    let mut saved = 0;
    for (id, entry) in guard.iter_mut() {
        if !entry.dirty {
            continue;
        }
        match entry.agent.save_session_for_agent(HTTP_AGENT_ID).await {
            Ok(_) => {
                entry.dirty = false;
                saved += 1;
            }
            Err(e) => debug!("Failed to save session {}: {}", id, e),
        }
    }
    if saved > 0 {
        info!("Saved {} HTTP sessions to disk", saved);
    }
}
async fn get_or_create_session(
state: &Arc<AppState>,
session_id: Option<String>,
) -> Result<String, AppError> {
let mut sessions = state.sessions.lock().await;
if let Some(ref id) = session_id
&& sessions.contains_key(id)
{
if let Some(entry) = sessions.get_mut(id) {
entry.last_accessed = Instant::now();
}
return Ok(id.clone());
}
if sessions.len() >= MAX_SESSIONS {
if let Some(oldest_id) = sessions
.iter()
.min_by_key(|(_, e)| e.last_accessed)
.map(|(id, _)| id.clone())
{
sessions.remove(&oldest_id);
info!("Removed oldest session {} to make room", oldest_id);
}
}
let new_id = session_id.unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
let agent_config = AgentConfig {
model: state.config.agent.default_model.clone(),
context_window: state.config.agent.context_window,
reserve_tokens: state.config.agent.reserve_tokens,
};
let mut agent = Agent::new(agent_config, &state.config, state.memory.clone())
.await
.map_err(|e| AppError(StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
agent
.new_session()
.await
.map_err(|e| AppError(StatusCode::INTERNAL_SERVER_ERROR, e.to_string()))?;
sessions.insert(
new_id.clone(),
SessionEntry {
agent,
last_accessed: Instant::now(),
dirty: true, },
);
info!("Created new session: {}", new_id);
Ok(new_id)
}
/// Liveness probe: always answers with the literal body `OK`.
async fn health_check() -> &'static str {
"OK"
}
/// Serves the embedded `index.html` for the site root.
async fn serve_ui_index() -> Response {
serve_ui_asset("index.html")
}
/// Serves the embedded UI asset named by the wildcard part of the request path.
async fn serve_ui_file(Path(path): Path<String>) -> Response {
serve_ui_asset(&path)
}
/// Looks up `path` among the embedded UI assets.
///
/// Returns the asset bytes with a `Content-Type` guessed from the file
/// extension (falling back to octet-stream), or a 404 `Not found` response
/// when no such asset is embedded.
fn serve_ui_asset(path: &str) -> Response {
    let Some(asset) = UiAssets::get(path) else {
        return (StatusCode::NOT_FOUND, "Not found").into_response();
    };
    let mime = mime_guess::from_path(path).first_or_octet_stream();
    (
        [(header::CONTENT_TYPE, mime.as_ref())],
        asset.data.to_vec(),
    )
        .into_response()
}
/// JSON body returned by the `/api/status` handler.
#[derive(Serialize)]
struct StatusResponse {
// Crate version taken from `CARGO_PKG_VERSION` at compile time.
version: String,
// Configured default model.
model: String,
// Chunk count reported by the memory manager (0 if the lookup fails).
memory_chunks: usize,
// Number of sessions currently held in memory.
active_sessions: usize,
}
/// Reports version, default model, memory chunk count and live session count.
async fn status(State(state): State<Arc<AppState>>) -> Json<StatusResponse> {
    let guard = state.sessions.lock().await;
    let version = env!("CARGO_PKG_VERSION").to_string();
    let model = state.config.agent.default_model.clone();
    // A failed chunk count is reported as zero rather than as an error.
    let memory_chunks = state.memory.chunk_count().unwrap_or(0);
    let active_sessions = guard.len();
    Json(StatusResponse {
        version,
        model,
        memory_chunks,
        active_sessions,
    })
}
/// Request body for creating a session; `session_id` is optional and, when
/// given, is reused if a session with that id already exists.
#[derive(Deserialize)]
struct CreateSessionRequest {
session_id: Option<String>,
}
/// Response body for session creation: the session id and the configured
/// default model.
#[derive(Serialize)]
struct SessionResponse {
session_id: String,
model: String,
}
/// Creates a session (or reuses the one named in the request) and returns
/// its id together with the configured default model.
async fn create_session(
    State(state): State<Arc<AppState>>,
    Json(request): Json<CreateSessionRequest>,
) -> Response {
    let session_id = match get_or_create_session(&state, request.session_id).await {
        Ok(id) => id,
        Err(err) => return err.into_response(),
    };
    let model = state.config.agent.default_model.clone();
    Json(SessionResponse { session_id, model }).into_response()
}
/// Summary of one live session: its id and whole seconds since last access.
#[derive(Serialize)]
struct SessionInfo {
session_id: String,
idle_seconds: u64,
}
/// Response body listing all live sessions.
#[derive(Serialize)]
struct ListSessionsResponse {
sessions: Vec<SessionInfo>,
}
/// Lists every in-memory session with the number of seconds it has been idle.
async fn list_sessions(State(state): State<Arc<AppState>>) -> Json<ListSessionsResponse> {
    let guard = state.sessions.lock().await;
    let mut sessions = Vec::with_capacity(guard.len());
    for (id, entry) in guard.iter() {
        sessions.push(SessionInfo {
            session_id: id.clone(),
            idle_seconds: entry.last_accessed.elapsed().as_secs(),
        });
    }
    Json(ListSessionsResponse { sessions })
}
/// Removes a session from memory; answers 404 when the id is unknown.
async fn delete_session(
    State(state): State<Arc<AppState>>,
    Path(session_id): Path<String>,
) -> Response {
    let mut guard = state.sessions.lock().await;
    match guard.remove(&session_id) {
        Some(_) => {
            info!("Deleted session: {}", session_id);
            Json(json!({"deleted": true, "session_id": session_id})).into_response()
        }
        None => AppError(StatusCode::NOT_FOUND, "Session not found".to_string()).into_response(),
    }
}
/// Response body describing one live session: model, message and token
/// counts from the agent's session status, idle time, and API token usage.
#[derive(Serialize)]
struct SessionStatusResponse {
session_id: String,
model: String,
message_count: usize,
token_count: usize,
idle_seconds: u64,
api_input_tokens: u64,
api_output_tokens: u64,
}
/// Returns status counters for one session without refreshing its
/// `last_accessed` time; answers 404 when the id is unknown.
async fn get_session_status(
    State(state): State<Arc<AppState>>,
    Path(session_id): Path<String>,
) -> Response {
    let guard = state.sessions.lock().await;
    let Some(entry) = guard.get(&session_id) else {
        return AppError(StatusCode::NOT_FOUND, "Session not found".to_string()).into_response();
    };
    let snapshot = entry.agent.session_status();
    let body = SessionStatusResponse {
        session_id,
        model: entry.agent.model().to_string(),
        message_count: snapshot.message_count,
        token_count: snapshot.token_count,
        idle_seconds: entry.last_accessed.elapsed().as_secs(),
        api_input_tokens: snapshot.api_input_tokens,
        api_output_tokens: snapshot.api_output_tokens,
    };
    Json(body).into_response()
}
/// One message of a live session as exposed over the API.
#[derive(Serialize)]
struct ActiveSessionMessage {
// "user", "assistant", "system" or "toolResult".
role: String,
// `None` when the stored message content is empty.
content: Option<String>,
// Each call rendered as an object with `id`, `name` and `arguments`.
tool_calls: Option<Vec<serde_json::Value>>,
tool_call_id: Option<String>,
timestamp: u64,
}
/// Response body carrying all messages of one live session.
#[derive(Serialize)]
struct SessionMessagesResponse {
session_id: String,
messages: Vec<ActiveSessionMessage>,
}
/// Returns every raw message of a live session, refreshing its
/// `last_accessed` time; answers 404 when the id is unknown.
async fn get_session_messages(
    State(state): State<Arc<AppState>>,
    Path(session_id): Path<String>,
) -> Response {
    let mut guard = state.sessions.lock().await;
    let Some(entry) = guard.get_mut(&session_id) else {
        return AppError(StatusCode::NOT_FOUND, "Session not found".to_string()).into_response();
    };
    entry.last_accessed = Instant::now();

    let mut messages: Vec<ActiveSessionMessage> = Vec::new();
    for stored in entry.agent.raw_session_messages().iter() {
        let msg = &stored.message;
        let role = match msg.role {
            crate::agent::Role::User => "user",
            crate::agent::Role::Assistant => "assistant",
            crate::agent::Role::System => "system",
            crate::agent::Role::Tool => "toolResult",
        };
        let tool_calls = msg.tool_calls.as_ref().map(|calls| {
            calls
                .iter()
                .map(|call| {
                    json!({
                        "id": call.id,
                        "name": call.name,
                        "arguments": call.arguments
                    })
                })
                .collect::<Vec<_>>()
        });
        // Empty content is reported as absent rather than as "".
        let content = (!msg.content.is_empty()).then(|| msg.content.clone());
        messages.push(ActiveSessionMessage {
            role: role.to_string(),
            content,
            tool_calls,
            tool_call_id: msg.tool_call_id.clone(),
            timestamp: stored.timestamp,
        });
    }

    Json(SessionMessagesResponse {
        session_id,
        messages,
    })
    .into_response()
}
/// Compacts a session's history and reports the token count before and after.
///
/// On success the session is flagged `dirty` so the periodic saver writes
/// the compacted state; previously the change was never marked for saving.
/// Answers 404 for an unknown id and 500 when compaction fails.
async fn compact_session(
    State(state): State<Arc<AppState>>,
    Path(session_id): Path<String>,
) -> Response {
    let mut sessions = state.sessions.lock().await;
    let Some(entry) = sessions.get_mut(&session_id) else {
        return AppError(StatusCode::NOT_FOUND, "Session not found".to_string()).into_response();
    };
    entry.last_accessed = Instant::now();
    match entry.agent.compact_session().await {
        Ok((before, after)) => {
            // The stored conversation changed, so it must be written again.
            entry.dirty = true;
            Json(json!({
                "session_id": session_id,
                "token_count_before": before,
                "token_count_after": after,
            }))
            .into_response()
        }
        Err(e) => AppError(StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(),
    }
}
/// Clears a session's conversation; answers 404 when the id is unknown.
///
/// The session is flagged `dirty` afterwards so the periodic saver writes
/// the cleared state; previously the change was never marked for saving.
async fn clear_session(
    State(state): State<Arc<AppState>>,
    Path(session_id): Path<String>,
) -> Response {
    let mut sessions = state.sessions.lock().await;
    let Some(entry) = sessions.get_mut(&session_id) else {
        return AppError(StatusCode::NOT_FOUND, "Session not found".to_string()).into_response();
    };
    entry.last_accessed = Instant::now();
    entry.agent.clear_session();
    // The stored conversation changed, so it must be written again.
    entry.dirty = true;
    Json(json!({"session_id": session_id, "cleared": true})).into_response()
}
/// Request body for switching a session's model.
#[derive(Deserialize)]
struct SetModelRequest {
model: String,
}
/// Switches the model used by a session.
///
/// Answers 404 for an unknown id and 400 when the agent rejects the model.
async fn set_session_model(
    State(state): State<Arc<AppState>>,
    Path(session_id): Path<String>,
    Json(request): Json<SetModelRequest>,
) -> Response {
    let mut guard = state.sessions.lock().await;
    let Some(entry) = guard.get_mut(&session_id) else {
        return AppError(StatusCode::NOT_FOUND, "Session not found".to_string()).into_response();
    };
    entry.last_accessed = Instant::now();
    if let Err(e) = entry.agent.set_model(&request.model) {
        return AppError(StatusCode::BAD_REQUEST, e.to_string()).into_response();
    }
    Json(json!({
        "session_id": session_id,
        "model": request.model,
    }))
    .into_response()
}
/// Request body shared by the blocking and streaming chat endpoints.
#[derive(Deserialize)]
struct ChatRequest {
message: String,
// Existing session to continue; a new session is created when absent or unknown.
session_id: Option<String>,
// Optional model override; applied by `chat` (the streaming handler does not read it).
model: Option<String>,
}
/// Response body of the blocking chat endpoint.
#[derive(Serialize)]
struct ChatResponse {
response: String,
session_id: String,
model: String,
}
/// Blocking chat endpoint: runs one agent turn and returns the full reply.
///
/// Resources are acquired in this order: turn gate, workspace lock (on a
/// blocking thread), then the sessions mutex. Answers 400 when the requested
/// model is rejected, 404 if the session vanished in between, and 500 on
/// lock or agent failures.
async fn chat(State(state): State<Arc<AppState>>, Json(request): Json<ChatRequest>) -> Response {
let session_id = match get_or_create_session(&state, request.session_id).await {
Ok(id) => id,
Err(e) => return e.into_response(),
};
// Held until this handler returns.
let _gate_permit = state.turn_gate.acquire().await;
// `acquire()` is run via `spawn_blocking`, so it is moved off the async executor.
let ws_lock_path = state.workspace_lock.clone();
let ws_guard = match tokio::task::spawn_blocking(move || ws_lock_path.acquire()).await {
Ok(Ok(guard)) => guard,
Ok(Err(e)) => {
return AppError(
StatusCode::INTERNAL_SERVER_ERROR,
format!("Failed to acquire workspace lock: {}", e),
)
.into_response();
}
Err(e) => {
return AppError(
StatusCode::INTERNAL_SERVER_ERROR,
format!("Lock task error: {}", e),
)
.into_response();
}
};
// The sessions mutex stays locked for the whole agent call below.
let mut sessions = state.sessions.lock().await;
let entry = match sessions.get_mut(&session_id) {
Some(e) => e,
None => {
// The session can disappear between creation above and this lookup.
return AppError(StatusCode::NOT_FOUND, "Session not found".to_string())
.into_response();
}
};
entry.last_accessed = Instant::now();
if let Some(ref model) = request.model
&& let Err(e) = entry.agent.set_model(model)
{
return AppError(StatusCode::BAD_REQUEST, format!("Invalid model: {}", e)).into_response();
}
let result = entry.agent.chat(&request.message).await;
// Release the workspace lock as soon as the agent turn is over.
drop(ws_guard);
match result {
Ok(response) => {
// Only a successful turn marks the session for saving.
entry.dirty = true;
Json(ChatResponse {
response,
session_id,
model: entry.agent.model().to_string(),
})
.into_response()
}
Err(e) => AppError(StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(),
}
}
/// Streaming chat endpoint: runs one agent turn and relays its events as
/// Server-Sent Events.
///
/// The stream first emits a `session` event with the session id, then
/// `content`, `tool_start`, `tool_end` and `done` events as the agent
/// produces them, and finally a literal `[DONE]` data frame. Failures are
/// emitted as `{"error": ...}` events instead of HTTP error statuses,
/// except for session creation, which fails the request itself.
async fn chat_stream(
State(state): State<Arc<AppState>>,
Json(request): Json<ChatRequest>,
) -> Response {
let session_id = match get_or_create_session(&state, request.session_id).await {
Ok(id) => id,
Err(e) => return e.into_response(),
};
let state_clone = state.clone();
let message = request.message.clone();
let stream = async_stream::stream! {
yield Ok::<Event, Infallible>(Event::default().data(json!({"type": "session", "session_id": session_id}).to_string()));
// Same acquisition order as `chat`: turn gate, workspace lock, sessions mutex.
let _gate_permit = state_clone.turn_gate.acquire().await;
let ws_lock = state_clone.workspace_lock.clone();
let _ws_guard = match tokio::task::spawn_blocking(move || ws_lock.acquire()).await {
Ok(Ok(guard)) => Some(guard),
Ok(Err(e)) => {
yield Ok(Event::default().data(json!({"error": format!("Workspace lock error: {}", e)}).to_string()));
return;
}
Err(e) => {
yield Ok(Event::default().data(json!({"error": format!("Lock task error: {}", e)}).to_string()));
return;
}
};
// The sessions mutex stays locked while the stream is being produced.
let mut sessions = state_clone.sessions.lock().await;
let entry = match sessions.get_mut(&session_id) {
Some(e) => e,
None => {
yield Ok(Event::default().data(json!({"error": "Session not found"}).to_string()));
return;
}
};
entry.last_accessed = Instant::now();
// Marked dirty up front, before the outcome of the turn is known.
entry.dirty = true;
match entry.agent.chat_stream_with_tools(&message).await {
Ok(event_stream) => {
use futures::StreamExt;
let mut pinned_stream = std::pin::pin!(event_stream);
while let Some(event) = pinned_stream.next().await {
match event {
Ok(StreamEvent::Content(content)) => {
let data = json!({"type": "content", "delta": content});
yield Ok(Event::default().data(data.to_string()));
}
Ok(StreamEvent::ToolCallStart { name, id, arguments }) => {
let detail = extract_tool_detail(&name, &arguments);
let data = json!({"type": "tool_start", "name": name, "id": id, "detail": detail});
yield Ok(Event::default().data(data.to_string()));
}
Ok(StreamEvent::ToolCallEnd { name, id, output, warnings }) => {
// Tool output is truncated to its first 500 characters.
let data = json!({
"type": "tool_end",
"name": name,
"id": id,
"output": output.chars().take(500).collect::<String>(),
"warnings": warnings
});
yield Ok(Event::default().data(data.to_string()));
}
Ok(StreamEvent::Done) => {
let data = json!({"type": "done"});
yield Ok(Event::default().data(data.to_string()));
}
Err(e) => {
// A stream error ends the turn; the final [DONE] frame is still sent.
yield Ok(Event::default().data(json!({"error": e.to_string()}).to_string()));
break;
}
}
}
}
Err(e) => {
yield Ok(Event::default().data(json!({"error": e.to_string()}).to_string()));
}
}
yield Ok(Event::default().data("[DONE]"));
};
Sse::new(stream).into_response()
}
/// Query parameters for memory search: the query text `q` and an optional
/// result `limit` (10 when omitted).
#[derive(Deserialize)]
struct SearchQuery {
q: String,
limit: Option<usize>,
}
/// One search hit, copied field-for-field from the memory manager's result.
#[derive(Serialize)]
struct SearchResult {
file: String,
line_start: i32,
line_end: i32,
content: String,
score: f64,
}
/// Response body of the memory search endpoint; echoes the query text.
#[derive(Serialize)]
struct SearchResponse {
results: Vec<SearchResult>,
query: String,
}
async fn memory_search(
State(state): State<Arc<AppState>>,
Query(query): Query<SearchQuery>,
) -> Response {
match memory_search_inner(&state.memory, &query.q, query.limit) {
Ok(response) => Json(response).into_response(),
Err(e) => AppError(StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(),
}
}
/// Runs a memory search and converts the hits into the API response shape.
///
/// `limit` defaults to 10 when `None`. The query text is echoed back in the
/// response.
fn memory_search_inner(
    memory: &MemoryManager,
    query: &str,
    limit: Option<usize>,
) -> Result<SearchResponse, anyhow::Error> {
    let hits = memory.search(query, limit.unwrap_or(10))?;
    let mut results: Vec<SearchResult> = Vec::new();
    for hit in hits {
        results.push(SearchResult {
            file: hit.file,
            line_start: hit.line_start,
            line_end: hit.line_end,
            content: hit.content,
            score: hit.score,
        });
    }
    let query = query.to_string();
    Ok(SearchResponse { results, query })
}
/// Response body of the memory stats endpoint, copied field-for-field from
/// the memory manager's stats.
#[derive(Serialize)]
struct StatsResponse {
workspace: String,
total_files: usize,
total_chunks: usize,
index_size_kb: u64,
}
/// Returns memory index statistics, or 500 when they cannot be read.
async fn memory_stats(State(state): State<Arc<AppState>>) -> Response {
    memory_stats_inner(&state.memory)
        .map(|stats| Json(stats).into_response())
        .unwrap_or_else(|e| {
            AppError(StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response()
        })
}
/// Reads the memory manager's stats and maps them onto [`StatsResponse`].
fn memory_stats_inner(memory: &MemoryManager) -> Result<StatsResponse, anyhow::Error> {
    let snapshot = memory.stats()?;
    let response = StatsResponse {
        workspace: snapshot.workspace,
        total_files: snapshot.total_files,
        total_chunks: snapshot.total_chunks,
        index_size_kb: snapshot.index_size_kb,
    };
    Ok(response)
}
/// Request body for a reindex; `force` defaults to `false` when omitted and
/// is passed through to the memory manager's `reindex`.
#[derive(Deserialize)]
struct ReindexRequest {
#[serde(default)]
force: bool,
}
/// Response body summarizing a reindex run.
#[derive(Serialize)]
struct ReindexResponse {
files_processed: usize,
files_updated: usize,
chunks_indexed: usize,
duration_ms: u128,
}
async fn memory_reindex(
State(state): State<Arc<AppState>>,
Json(request): Json<ReindexRequest>,
) -> Response {
let memory = state.memory.clone();
let force = request.force;
match tokio::task::spawn_blocking(move || memory_reindex_inner(&memory, force)).await {
Ok(Ok(response)) => Json(response).into_response(),
Ok(Err(e)) => AppError(StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response(),
Err(e) => AppError(
StatusCode::INTERNAL_SERVER_ERROR,
format!("Task error: {}", e),
)
.into_response(),
}
}
/// Runs the reindex and maps its statistics onto [`ReindexResponse`], with
/// the duration expressed in milliseconds.
fn memory_reindex_inner(
    memory: &MemoryManager,
    force: bool,
) -> Result<ReindexResponse, anyhow::Error> {
    let report = memory.reindex(force)?;
    let duration_ms = report.duration.as_millis();
    Ok(ReindexResponse {
        files_processed: report.files_processed,
        files_updated: report.files_updated,
        chunks_indexed: report.chunks_indexed,
        duration_ms,
    })
}
/// Response body of the config endpoint: a selected subset of the running
/// configuration, grouped by section.
#[derive(Serialize)]
struct ConfigResponse {
agent: AgentConfigInfo,
server: ServerConfigInfo,
memory: MemoryConfigInfo,
heartbeat: HeartbeatConfigInfo,
}
/// Agent section exposed by the config endpoint.
#[derive(Serialize)]
struct AgentConfigInfo {
default_model: String,
context_window: usize,
reserve_tokens: usize,
}
/// Server section exposed by the config endpoint.
#[derive(Serialize)]
struct ServerConfigInfo {
port: u16,
bind: String,
}
/// Memory section exposed by the config endpoint.
#[derive(Serialize)]
struct MemoryConfigInfo {
workspace: String,
embedding_model: String,
chunk_size: usize,
chunk_overlap: usize,
}
/// Heartbeat section exposed by the config endpoint.
#[derive(Serialize)]
struct HeartbeatConfigInfo {
enabled: bool,
interval: String,
}
/// Returns the exposed subset of the running configuration.
async fn get_config(State(state): State<Arc<AppState>>) -> Json<ConfigResponse> {
    let cfg = &state.config;

    let agent = AgentConfigInfo {
        default_model: cfg.agent.default_model.clone(),
        context_window: cfg.agent.context_window,
        reserve_tokens: cfg.agent.reserve_tokens,
    };
    let server = ServerConfigInfo {
        port: cfg.server.port,
        bind: cfg.server.bind.clone(),
    };
    let memory = MemoryConfigInfo {
        workspace: cfg.memory.workspace.clone(),
        embedding_model: cfg.memory.embedding_model.clone(),
        chunk_size: cfg.memory.chunk_size,
        chunk_overlap: cfg.memory.chunk_overlap,
    };
    let heartbeat = HeartbeatConfigInfo {
        enabled: cfg.heartbeat.enabled,
        interval: cfg.heartbeat.interval.clone(),
    };

    Json(ConfigResponse {
        agent,
        server,
        memory,
        heartbeat,
    })
}
/// Response body of the heartbeat status endpoint: the configured heartbeat
/// settings plus the most recent event, if any.
#[derive(Serialize)]
struct HeartbeatStatusResponse {
enabled: bool,
interval: String,
last_event: Option<HeartbeatEventInfo>,
}
/// The most recent heartbeat event as exposed over the API.
#[derive(Serialize)]
struct HeartbeatEventInfo {
// Event timestamp; the handler compares it against the current time in
// milliseconds since the Unix epoch.
ts: u64,
// "sent", "ok", "skipped" or "failed".
status: String,
duration_ms: u64,
preview: Option<String>,
reason: Option<String>,
// Whole seconds between `ts` and the time of the request.
age_seconds: u64,
}
/// Reports the heartbeat configuration and the latest heartbeat event with
/// its age in seconds.
async fn heartbeat_status(State(state): State<Arc<AppState>>) -> Json<HeartbeatStatusResponse> {
    let last_event = get_last_heartbeat_event().map(|event| {
        let status = match event.status {
            HeartbeatStatus::Sent => "sent",
            HeartbeatStatus::Ok => "ok",
            HeartbeatStatus::Skipped => "skipped",
            HeartbeatStatus::Failed => "failed",
        }
        .to_string();

        // A clock before the Unix epoch is treated as time zero.
        let now_ms = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .map(|elapsed| elapsed.as_millis() as u64)
            .unwrap_or(0);
        // Saturating so an event stamped in the future yields age 0.
        let age_seconds = now_ms.saturating_sub(event.ts) / 1000;

        HeartbeatEventInfo {
            ts: event.ts,
            status,
            duration_ms: event.duration_ms,
            preview: event.preview,
            reason: event.reason,
            age_seconds,
        }
    });

    let heartbeat_cfg = &state.config.heartbeat;
    Json(HeartbeatStatusResponse {
        enabled: heartbeat_cfg.enabled,
        interval: heartbeat_cfg.interval.clone(),
        last_event,
    })
}
/// Summary of one session saved on disk.
#[derive(Serialize)]
struct SavedSessionInfo {
id: String,
message_count: usize,
// Formatted as `%Y-%m-%dT%H:%M:%S` by the listing handler.
created_at: String,
}
/// Response body listing the saved sessions.
#[derive(Serialize)]
struct SavedSessionsResponse {
sessions: Vec<SavedSessionInfo>,
}
/// Lists the sessions saved on disk for the HTTP agent; answers 500 when the
/// listing fails.
async fn list_saved_sessions(State(_state): State<Arc<AppState>>) -> Response {
    use crate::agent::list_sessions_for_agent;

    let saved = match list_sessions_for_agent(HTTP_AGENT_ID) {
        Ok(saved) => saved,
        Err(e) => {
            return AppError(StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response();
        }
    };

    let mut sessions: Vec<SavedSessionInfo> = Vec::new();
    for item in saved {
        sessions.push(SavedSessionInfo {
            id: item.id,
            message_count: item.message_count,
            created_at: item.created_at.format("%Y-%m-%dT%H:%M:%S").to_string(),
        });
    }
    Json(SavedSessionsResponse { sessions }).into_response()
}
/// One message read back from a saved session file.
#[derive(Serialize)]
struct SavedSessionMessage {
// Taken from the record's `role`; "unknown" when missing.
role: String,
// Text content; `None` when the record holds no text.
content: Option<String>,
// Raw `toolCalls` array from the record, if present.
tool_calls: Option<Vec<serde_json::Value>>,
tool_call_id: Option<String>,
timestamp: Option<u64>,
}
/// Response body with the contents of one saved session.
#[derive(Serialize)]
struct SavedSessionDetail {
session_id: String,
// Timestamp string from the file's leading `session` record; empty if absent.
created_at: String,
messages: Vec<SavedSessionMessage>,
}
/// Returns `true` when `name` is exactly one ordinary path component: no
/// separators, no NUL, and not a root, prefix, `.` or `..`.
fn is_single_path_component(name: &str) -> bool {
    if name.contains(['/', '\\', '\0']) {
        return false;
    }
    let mut parts = std::path::Path::new(name).components();
    matches!(
        (parts.next(), parts.next()),
        (Some(std::path::Component::Normal(_)), None)
    )
}

/// Reads a saved session's `.jsonl` file and returns its messages.
///
/// `session_id` comes straight from the URL, so the derived file name is
/// validated before touching the filesystem: anything that is not a single
/// plain file name is rejected with 400, which prevents reading files
/// outside the sessions directory. Answers 404 when the file does not exist
/// and 500 on other I/O errors.
///
/// Lines that are not valid JSON are skipped. Reading stops at the first
/// I/O error so a persistently failing reader cannot loop forever.
async fn get_saved_session(Path(session_id): Path<String>) -> Response {
    use crate::agent::get_sessions_dir_for_agent;
    use std::fs::File;
    use std::io::{BufRead, BufReader, ErrorKind};

    let file_name = format!("{}.jsonl", session_id);
    if !is_single_path_component(&file_name) {
        return AppError(StatusCode::BAD_REQUEST, "Invalid session id".to_string())
            .into_response();
    }

    let sessions_dir = match get_sessions_dir_for_agent(HTTP_AGENT_ID) {
        Ok(dir) => dir,
        Err(e) => {
            return AppError(StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response();
        }
    };
    let session_path = sessions_dir.join(file_name);

    // Open directly instead of checking `exists()` first, so there is no
    // window between the check and the open.
    let file = match File::open(&session_path) {
        Ok(f) => f,
        Err(e) if e.kind() == ErrorKind::NotFound => {
            return AppError(StatusCode::NOT_FOUND, "Session not found".to_string())
                .into_response();
        }
        Err(e) => {
            return AppError(StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response();
        }
    };

    let reader = BufReader::new(file);
    let mut messages = Vec::new();
    let mut created_at = String::new();
    for (i, line) in reader.lines().enumerate() {
        let Ok(line) = line else {
            break;
        };
        let parsed: serde_json::Value = match serde_json::from_str(&line) {
            Ok(v) => v,
            Err(_) => continue,
        };
        // The first record may be a session header carrying the creation time.
        if i == 0 && parsed["type"].as_str() == Some("session") {
            created_at = parsed["timestamp"].as_str().unwrap_or("").to_string();
            continue;
        }
        if parsed["type"].as_str() == Some("message")
            && let Some(msg) = parsed.get("message")
        {
            let role = msg["role"].as_str().unwrap_or("unknown").to_string();
            // Content is either an array of typed parts (only "text" parts
            // are kept, joined by newlines) or a plain string.
            let content = if let Some(content_arr) = msg["content"].as_array() {
                content_arr
                    .iter()
                    .filter_map(|c| {
                        if c["type"].as_str() == Some("text") {
                            c["text"].as_str().map(String::from)
                        } else {
                            None
                        }
                    })
                    .collect::<Vec<_>>()
                    .join("\n")
            } else if let Some(text) = msg["content"].as_str() {
                text.to_string()
            } else {
                String::new()
            };
            let tool_calls = msg["toolCalls"].as_array().cloned();
            let tool_call_id = msg["toolCallId"].as_str().map(String::from);
            let timestamp = msg["timestamp"].as_u64();
            messages.push(SavedSessionMessage {
                role,
                content: if content.is_empty() {
                    None
                } else {
                    Some(content)
                },
                tool_calls,
                tool_call_id,
                timestamp,
            });
        }
    }

    Json(SavedSessionDetail {
        session_id,
        created_at,
        messages,
    })
    .into_response()
}
/// Query parameters for the daemon log endpoint; `lines` defaults to 200 and
/// is capped at 1000 by the handler.
#[derive(Deserialize)]
struct LogsQuery {
lines: Option<usize>,
}
/// Response body of the daemon log endpoint.
#[derive(Serialize)]
struct DaemonLogsResponse {
// The last requested lines of today's log file.
lines: Vec<String>,
// Number of lines read from the file.
total_lines: usize,
file_size_bytes: u64,
}
/// Returns the tail of today's daemon log file.
///
/// `lines` defaults to 200 and is capped at 1000. A missing log file yields
/// an empty response rather than an error; other I/O failures answer 500.
///
/// Only the requested number of lines is kept in memory while reading,
/// instead of loading the whole file and copying its tail. The file is
/// opened once and its size is read from the open handle, so there is no
/// gap between an existence check and the open.
async fn get_daemon_logs(Query(query): Query<LogsQuery>) -> Response {
    use crate::agent::get_state_dir;
    use std::collections::VecDeque;
    use std::fs::File;
    use std::io::{BufRead, BufReader, ErrorKind};

    let lines_requested = query.lines.unwrap_or(200).min(1000);
    let state_dir = match get_state_dir() {
        Ok(dir) => dir,
        Err(e) => {
            return AppError(StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response();
        }
    };
    let date = chrono::Local::now().format("%Y-%m-%d");
    let log_path = state_dir
        .join("logs")
        .join(format!("localgpt-{}.log", date));

    let file = match File::open(&log_path) {
        Ok(f) => f,
        Err(e) if e.kind() == ErrorKind::NotFound => {
            return Json(DaemonLogsResponse {
                lines: vec![],
                total_lines: 0,
                file_size_bytes: 0,
            })
            .into_response();
        }
        Err(e) => {
            return AppError(StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response();
        }
    };
    let file_size_bytes = match file.metadata() {
        Ok(m) => m.len(),
        Err(e) => {
            return AppError(StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response();
        }
    };

    // Sliding window over the file holding at most `lines_requested` lines.
    let mut tail: VecDeque<String> = VecDeque::with_capacity(lines_requested);
    let mut total_lines = 0usize;
    // `map_while` stops at the first unreadable line, as the original did.
    for line in BufReader::new(file).lines().map_while(Result::ok) {
        total_lines += 1;
        if lines_requested == 0 {
            continue;
        }
        if tail.len() == lines_requested {
            tail.pop_front();
        }
        tail.push_back(line);
    }

    Json(DaemonLogsResponse {
        lines: Vec::from(tail),
        total_lines,
        file_size_bytes,
    })
    .into_response()
}
/// Accepts a WebSocket upgrade and hands the socket, together with the
/// shared state, to `handle_websocket`.
async fn websocket_handler(
ws: WebSocketUpgrade,
State(state): State<Arc<AppState>>,
) -> impl IntoResponse {
ws.on_upgrade(|socket| handle_websocket(socket, state))
}
/// Messages a WebSocket client may send, selected by the JSON `type` field
/// (`session`, `chat` or `ping`).
#[derive(Deserialize)]
#[serde(tag = "type")]
enum WsIncoming {
// Attach to (or create) a session; the id is optional.
#[serde(rename = "session")]
Session { session_id: Option<String> },
// Send a chat message to the connection's current session.
#[serde(rename = "chat")]
Chat { message: String },
// Application-level ping, answered with a `pong` message.
#[serde(rename = "ping")]
Ping,
}
/// Messages the server sends to a WebSocket client, tagged by the JSON
/// `type` field.
// NOTE(review): `ToolStart` and `ToolEnd` are not constructed by the
// WebSocket handler in this file, which is presumably why `dead_code` is
// allowed here — confirm before removing either the variants or the allow.
#[derive(Serialize)]
#[serde(tag = "type")]
#[allow(dead_code)] enum WsOutgoing {
// Sent after a session has been attached or created.
#[serde(rename = "connected")]
Connected { session_id: String },
#[serde(rename = "content")]
Content { delta: String },
#[serde(rename = "tool_start")]
ToolStart { name: String, id: String },
#[serde(rename = "tool_end")]
ToolEnd {
name: String,
id: String,
output: String,
},
// Marks the end of a chat reply.
#[serde(rename = "done")]
Done,
#[serde(rename = "pong")]
Pong,
#[serde(rename = "error")]
Error { message: String },
}
async fn handle_websocket(socket: WebSocket, state: Arc<AppState>) {
let (mut sender, mut receiver) = socket.split();
debug!("WebSocket client connected");
let mut current_session_id: Option<String> = None;
while let Some(msg) = receiver.next().await {
match msg {
Ok(WsMessage::Text(text)) => {
match serde_json::from_str::<WsIncoming>(&text) {
Ok(WsIncoming::Session { session_id }) => {
match get_or_create_session(&state, session_id).await {
Ok(id) => {
current_session_id = Some(id.clone());
let connected = WsOutgoing::Connected { session_id: id };
if let Ok(json) = serde_json::to_string(&connected) {
let _ = sender.send(WsMessage::Text(json.into())).await;
}
}
Err(e) => {
let error = WsOutgoing::Error {
message: format!("Failed to create session: {}", e.1),
};
if let Ok(json) = serde_json::to_string(&error) {
let _ = sender.send(WsMessage::Text(json.into())).await;
}
}
}
}
Ok(WsIncoming::Chat { message }) => {
let session_id = match ¤t_session_id {
Some(id) => id.clone(),
None => {
match get_or_create_session(&state, None).await {
Ok(id) => {
current_session_id = Some(id.clone());
let connected = WsOutgoing::Connected {
session_id: id.clone(),
};
if let Ok(json) = serde_json::to_string(&connected) {
let _ = sender.send(WsMessage::Text(json.into())).await;
}
id
}
Err(e) => {
let error = WsOutgoing::Error {
message: format!("Failed to create session: {}", e.1),
};
if let Ok(json) = serde_json::to_string(&error) {
let _ = sender.send(WsMessage::Text(json.into())).await;
}
continue;
}
}
}
};
debug!("WebSocket chat [{}]: {}", session_id, message);
let _gate_permit = state.turn_gate.acquire().await;
let ws_lock = state.workspace_lock.clone();
let _ws_guard =
match tokio::task::spawn_blocking(move || ws_lock.acquire()).await {
Ok(Ok(guard)) => guard,
Ok(Err(e)) => {
let error = WsOutgoing::Error {
message: format!("Workspace lock error: {}", e),
};
if let Ok(json) = serde_json::to_string(&error) {
let _ = sender.send(WsMessage::Text(json.into())).await;
}
continue;
}
Err(e) => {
let error = WsOutgoing::Error {
message: format!("Lock task error: {}", e),
};
if let Ok(json) = serde_json::to_string(&error) {
let _ = sender.send(WsMessage::Text(json.into())).await;
}
continue;
}
};
let mut sessions = state.sessions.lock().await;
let entry = match sessions.get_mut(&session_id) {
Some(e) => e,
None => {
let error = WsOutgoing::Error {
message: "Session not found".to_string(),
};
if let Ok(json) = serde_json::to_string(&error) {
let _ = sender.send(WsMessage::Text(json.into())).await;
}
current_session_id = None;
continue;
}
};
entry.last_accessed = Instant::now();
match entry.agent.chat(&message).await {
Ok(response) => {
let content = WsOutgoing::Content { delta: response };
if let Ok(json) = serde_json::to_string(&content) {
let _ = sender.send(WsMessage::Text(json.into())).await;
}
let done = WsOutgoing::Done;
if let Ok(json) = serde_json::to_string(&done) {
let _ = sender.send(WsMessage::Text(json.into())).await;
}
}
Err(e) => {
let error = WsOutgoing::Error {
message: e.to_string(),
};
if let Ok(json) = serde_json::to_string(&error) {
let _ = sender.send(WsMessage::Text(json.into())).await;
}
}
}
}
Ok(WsIncoming::Ping) => {
let pong = WsOutgoing::Pong;
if let Ok(json) = serde_json::to_string(&pong) {
let _ = sender.send(WsMessage::Text(json.into())).await;
}
}
Err(e) => {
let error = WsOutgoing::Error {
message: format!("Invalid message format: {}", e),
};
if let Ok(json) = serde_json::to_string(&error) {
let _ = sender.send(WsMessage::Text(json.into())).await;
}
}
}
}
Ok(WsMessage::Ping(data)) => {
let _ = sender.send(WsMessage::Pong(data)).await;
}
Ok(WsMessage::Close(_)) => {
debug!("WebSocket client disconnected");
break;
}
Err(e) => {
debug!("WebSocket error: {}", e);
break;
}
_ => {}
}
}
debug!("WebSocket connection closed");
}