use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use anyhow::Result;
use axum::{
body::Body,
extract::{Query, State},
http::{header, HeaderValue, StatusCode, Uri},
response::{
sse::{Event, KeepAlive, Sse},
IntoResponse, Response,
},
routing::get,
Json, Router,
};
use futures_util::stream::Stream;
use rust_embed::RustEmbed;
use serde_json::{json, Value};
use tokio::net::TcpListener;
use tokio::sync::{broadcast, Notify};
use crate::config::{state_path, Config};
use crate::hub::HubRegistry;
use crate::ws::ws_upgrade;
/// State shared by every HTTP handler; `run_cloudflared` wraps it in an `Arc`
/// and `build_router` installs it as the router state.
pub struct AppState {
/// Configuration the server was started with.
pub config: Arc<Config>,
/// Hub registry (type defined in `crate::hub`; not used by the handlers in this file).
pub hubs: HubRegistry,
/// Broadcast sender: `put_state` sends `()` after a successful write and
/// `state_events` subscribes to it to notify SSE clients.
pub state_changes: broadcast::Sender<()>,
/// Notified by `shutdown_signal`; `state_events` streams end when it fires.
pub shutdown: Arc<Notify>,
}
/// Handle to the UI bundle located under `$OUT_DIR/ui/dist/`, exposed through
/// the `RustEmbed` derive. `serve_ui_asset` looks files up via `UiAssets::get`.
#[derive(RustEmbed)]
#[folder = "$OUT_DIR/ui/dist/"]
struct UiAssets;
/// Builds the shared [`AppState`], binds a TCP listener on `bind` and serves
/// the router until [`shutdown_signal`] resolves.
///
/// # Errors
/// Returns an error if the address cannot be bound or the server fails.
///
/// NOTE(review): the name mentions cloudflared, but this body only binds and
/// serves HTTP; confirm the naming is intentional.
pub(crate) async fn run_cloudflared(cfg: Config, bind: String) -> Result<()> {
    let shutdown = Arc::new(Notify::new());
    // The initial receiver is dropped right away; SSE clients subscribe later.
    let (state_changes, _) = broadcast::channel::<()>(64);

    let app = build_router(Arc::new(AppState {
        config: Arc::new(cfg),
        hubs: HubRegistry::new(),
        state_changes,
        shutdown: Arc::clone(&shutdown),
    }));

    let listener = TcpListener::bind(bind.as_str()).await?;
    eprintln!("Mezame is listening on: http://{bind}");

    let server = axum::serve(listener, app).with_graceful_shutdown(shutdown_signal(shutdown));
    server.await?;
    Ok(())
}
/// Assembles the HTTP router: the WebSocket endpoint, the state/history/tool
/// result API routes, and a GET fallback that serves the embedded UI.
pub fn build_router(state: Arc<AppState>) -> Router {
    let api = Router::new()
        .route("/ws", get(ws_upgrade))
        .route("/state", get(get_state).put(put_state))
        .route("/state/events", get(state_events))
        .route("/history", get(get_history))
        .route("/tool-result", get(get_tool_result));

    // Anything that is not an API route is handed to the UI asset handler.
    let with_ui = api.fallback(get(serve_ui_asset));
    with_ui.with_state(state)
}
/// Completes when the process receives Ctrl-C or (on Unix) SIGTERM, then
/// wakes every task currently waiting on `shutdown`.
async fn shutdown_signal(shutdown: Arc<Notify>) {
    // The result of the Ctrl-C listener is discarded: the future completes
    // whether the signal arrived or the listener reported an error.
    let interrupt = async {
        let _ = tokio::signal::ctrl_c().await;
    };

    #[cfg(unix)]
    let terminate = async {
        use tokio::signal::unix::{signal, SignalKind};
        let mut sigterm = match signal(SignalKind::terminate()) {
            Ok(stream) => stream,
            Err(e) => {
                eprintln!("Failed to install SIGTERM handler: {e}");
                // Never resolve, leaving Ctrl-C as the only shutdown trigger.
                return std::future::pending::<()>().await;
            }
        };
        sigterm.recv().await;
    };

    // Non-Unix targets have no SIGTERM; this branch simply never fires.
    #[cfg(not(unix))]
    let terminate = std::future::pending::<()>();

    tokio::select! {
        _ = interrupt => eprintln!("\nReceived SIGINT, shutting down."),
        _ = terminate => eprintln!("Received SIGTERM, shutting down."),
    }

    shutdown.notify_waiters();
}
/// Serves a file from the embedded UI bundle.
///
/// The request path (leading slashes removed) is looked up in [`UiAssets`];
/// unknown paths fall back to `index.html`. Responds with 404 only when even
/// `index.html` is absent from the bundle.
async fn serve_ui_asset(uri: Uri) -> Response {
    let requested = uri.path().trim_start_matches('/');

    let (asset, resolved_path) = if let Some(hit) = UiAssets::get(requested) {
        (hit, requested)
    } else if let Some(index) = UiAssets::get("index.html") {
        (index, "index.html")
    } else {
        return (StatusCode::NOT_FOUND, "UI bundle missing").into_response();
    };

    // The entry page and service worker are never cached, files under
    // `assets/` are cached for a year as immutable, everything else for an hour.
    let cache_control = match resolved_path {
        "index.html" | "sw.js" => "no-cache, no-store, must-revalidate",
        p if p.starts_with("assets/") => "public, max-age=31536000, immutable",
        _ => "public, max-age=3600",
    };

    let built = Response::builder()
        .status(StatusCode::OK)
        .header(
            header::CONTENT_TYPE,
            HeaderValue::from_static(mime_for(resolved_path)),
        )
        .header(
            header::CACHE_CONTROL,
            HeaderValue::from_static(cache_control),
        )
        .body(Body::from(asset.data.into_owned()));

    match built {
        Ok(response) => response,
        Err(_) => (StatusCode::INTERNAL_SERVER_ERROR, "response build failed").into_response(),
    }
}
/// Maps a lower-case file extension (without the dot) to the `Content-Type`
/// value returned by [`mime_for`].
const MIME_TABLE: &[(&str, &str)] = &[
    ("html", "text/html; charset=utf-8"),
    ("js", "application/javascript; charset=utf-8"),
    ("mjs", "application/javascript; charset=utf-8"),
    ("css", "text/css; charset=utf-8"),
    ("json", "application/json; charset=utf-8"),
    ("map", "application/json; charset=utf-8"),
    ("svg", "image/svg+xml"),
    ("png", "image/png"),
    ("jpg", "image/jpeg"),
    ("jpeg", "image/jpeg"),
    ("gif", "image/gif"),
    ("webp", "image/webp"),
    ("ico", "image/x-icon"),
    ("woff", "font/woff"),
    ("woff2", "font/woff2"),
    ("ttf", "font/ttf"),
    ("otf", "font/otf"),
    ("txt", "text/plain; charset=utf-8"),
    ("webmanifest", "application/manifest+json"),
];

/// Returns the `Content-Type` for `path` based on its file extension.
///
/// The extension is the text after the last `.` of the final `/`-separated
/// segment and is compared case-insensitively against [`MIME_TABLE`].
/// Paths without an extension, or with an unknown one, yield
/// `application/octet-stream`.
pub fn mime_for(path: &str) -> &'static str {
    // Only the last segment can carry an extension; dots in directory names
    // must not be mistaken for one.
    let file_name = path.rsplit('/').next().unwrap_or(path);

    // `rsplit_once` yields `None` when there is no dot, so an extension-less
    // file that happens to be called e.g. `js` is not treated as JavaScript.
    file_name
        .rsplit_once('.')
        .and_then(|(_, ext)| {
            MIME_TABLE
                .iter()
                .find(|(known, _)| ext.eq_ignore_ascii_case(known))
        })
        .map_or("application/octet-stream", |&(_, mime)| mime)
}
/// `GET /state`: returns the persisted state file as JSON.
///
/// A missing file and a file that is not valid JSON both produce `{}`;
/// any other failure is reported as a 500 carrying the error text.
async fn get_state() -> Result<Json<Value>, (StatusCode, String)> {
    let path = match state_path() {
        Ok(p) => p,
        Err(e) => return Err((StatusCode::INTERNAL_SERVER_ERROR, e.to_string())),
    };

    let raw = match tokio::fs::read_to_string(&path).await {
        Ok(contents) => contents,
        Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(Json(json!({}))),
        Err(e) => return Err((StatusCode::INTERNAL_SERVER_ERROR, e.to_string())),
    };

    // Unparseable content is deliberately served as an empty object.
    let parsed = serde_json::from_str::<Value>(&raw).unwrap_or_else(|_| json!({}));
    Ok(Json(parsed))
}
/// `PUT /state`: persists the JSON body to the state file and notifies
/// subscribers of `state_changes`.
///
/// The body is pretty-printed into a sibling `*.json.tmp` file which is then
/// renamed over the real path. Responds with 204 on success and 500 with the
/// error text on any failure.
async fn put_state(
    State(app): State<Arc<AppState>>,
    Json(body): Json<Value>,
) -> Result<StatusCode, (StatusCode, String)> {
    // Every failure in this handler maps to the same 500 response shape.
    fn internal<E: std::fmt::Display>(err: E) -> (StatusCode, String) {
        (StatusCode::INTERNAL_SERVER_ERROR, err.to_string())
    }

    let path = state_path().map_err(internal)?;
    if let Some(parent) = path.parent() {
        tokio::fs::create_dir_all(parent).await.map_err(internal)?;
    }

    let tmp = path.with_extension("json.tmp");
    let data = serde_json::to_string_pretty(&body).map_err(internal)?;
    tokio::fs::write(&tmp, data).await.map_err(internal)?;
    tokio::fs::rename(&tmp, &path).await.map_err(internal)?;

    // A send error only means nobody is subscribed right now; ignore it.
    let _ = app.state_changes.send(());
    Ok(StatusCode::NO_CONTENT)
}
/// `GET /state/events`: Server-Sent Events stream that emits a
/// `state_changed` event (with empty data) for every message received on
/// `AppState::state_changes`.
///
/// The stream ends when `AppState::shutdown` is notified or the broadcast
/// channel is closed. A keep-alive message with text `keep-alive` is
/// configured at a 15-second interval.
async fn state_events(
State(app): State<Arc<AppState>>,
) -> Sse<impl Stream<Item = Result<Event, std::convert::Infallible>>> {
// Each connection gets its own receiver and its own handle on the shutdown notifier.
let rx = app.state_changes.subscribe();
let shutdown = app.shutdown.clone();
// `unfold` threads (rx, shutdown) through successive calls; returning `None` ends the stream.
let stream = futures_util::stream::unfold((rx, shutdown), |(mut rx, shutdown)| async move {
loop {
// NOTE(review): a fresh `notified()` future is created on every pass through
// this loop; confirm a shutdown signalled while no such future exists cannot be missed.
tokio::select! {
_ = shutdown.notified() => return None,
msg = rx.recv() => match msg {
Ok(()) => {
return Some((
Ok(Event::default().event("state_changed").data("")),
(rx, shutdown),
));
}
// Falling behind is not fatal: skip the lag report and receive again.
Err(broadcast::error::RecvError::Lagged(_)) => continue,
Err(broadcast::error::RecvError::Closed) => return None,
},
}
}
});
Sse::new(stream).keep_alive(
KeepAlive::new()
.interval(Duration::from_secs(15))
.text("keep-alive"),
)
}
/// `GET /history?session=<id>`: returns the parsed history of a session as
/// `{ "entries": [...] }`.
///
/// The session file is read from `$HOME/.kiro/sessions/cli/<id>.jsonl`.
/// A missing file yields an empty entry list; a missing or path-like session
/// id yields 400; an unset `HOME` or other I/O failure yields 500.
async fn get_history(
    Query(params): Query<HashMap<String, String>>,
) -> Result<Json<Value>, (StatusCode, String)> {
    let sid = match params.get("session") {
        Some(value) => value,
        None => return Err((StatusCode::BAD_REQUEST, "missing ?session=<id>".into())),
    };

    // The id is spliced into a file name, so reject anything that could
    // step outside the sessions directory.
    let escapes_dir = sid.contains('/') || sid.contains('\\') || sid.contains("..");
    if sid.is_empty() || escapes_dir {
        return Err((StatusCode::BAD_REQUEST, "invalid session id".into()));
    }

    let home = match std::env::var("HOME") {
        Ok(dir) => dir,
        Err(_) => return Err((StatusCode::INTERNAL_SERVER_ERROR, "HOME not set".into())),
    };
    let mut path = PathBuf::from(home);
    path.push(format!(".kiro/sessions/cli/{sid}.jsonl"));

    let entries = match tokio::fs::read_to_string(&path).await {
        Ok(raw) => parse_kiro_history(&raw),
        Err(e) if e.kind() == std::io::ErrorKind::NotFound => Vec::new(),
        Err(e) => return Err((StatusCode::INTERNAL_SERVER_ERROR, e.to_string())),
    };
    Ok(Json(json!({ "entries": entries })))
}
async fn get_tool_result(
Query(params): Query<HashMap<String, String>>,
) -> Result<Json<Value>, (StatusCode, String)> {
let Some(sid) = params.get("session") else {
return Err((StatusCode::BAD_REQUEST, "missing ?session=<id>".into()));
};
let Some(tool_use_id) = params.get("id") else {
return Err((StatusCode::BAD_REQUEST, "missing ?id=<toolUseId>".into()));
};
if sid.is_empty() || sid.contains('/') || sid.contains('\\') || sid.contains("..") {
return Err((StatusCode::BAD_REQUEST, "invalid session id".into()));
}
let Ok(home) = std::env::var("HOME") else {
return Err((StatusCode::INTERNAL_SERVER_ERROR, "HOME not set".into()));
};
let path = PathBuf::from(home).join(format!(".kiro/sessions/cli/{sid}.jsonl"));
let raw = match tokio::fs::read_to_string(&path).await {
Ok(s) => s,
Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
return Err((StatusCode::NOT_FOUND, "session not found".into()));
}
Err(e) => return Err((StatusCode::INTERNAL_SERVER_ERROR, format!("{e}"))),
};
let Some(found) = find_tool_result(&raw, tool_use_id) else {
return Err((StatusCode::NOT_FOUND, "tool result not found".into()));
};
Ok(Json(found))
}
pub fn find_tool_result(raw: &str, tool_use_id: &str) -> Option<Value> {
for line in raw.lines().rev() {
if line.trim().is_empty() {
continue;
}
let Ok(entry) = serde_json::from_str::<Value>(line) else {
continue;
};
if entry.get("kind").and_then(Value::as_str) != Some("ToolResults") {
continue;
}
let Some(content) = entry
.get("data")
.and_then(|d| d.get("content"))
.and_then(Value::as_array)
else {
continue;
};
for block in content {
if block.get("kind").and_then(Value::as_str) != Some("toolResult") {
continue;
}
let Some(inner) = block.get("data") else {
continue;
};
if inner.get("toolUseId").and_then(Value::as_str) != Some(tool_use_id) {
continue;
}
return Some(json!({
"status": inner.get("status").cloned().unwrap_or(Value::Null),
"content": inner.get("content").cloned().unwrap_or(Value::Null)
}));
}
}
None
}
/// Converts a JSONL session log into a flat list of history entries.
///
/// Each non-blank line that parses as JSON is dispatched on its `kind`:
/// - `Prompt`: updates the running timestamp from `data.meta.timestamp`
///   (seconds, converted to milliseconds) and emits a `user` entry when the
///   prompt has text.
/// - `AssistantMessage`: emits an optional `thought` entry, an optional
///   `agent` entry, and one `tool_call` entry per tool use, all stamped with
///   the running timestamp.
/// - `ToolResults`: merges status/content into previously emitted
///   `tool_call` entries.
///
/// Unparseable lines and unknown kinds are ignored.
pub fn parse_kiro_history(raw: &str) -> Vec<Value> {
    let mut out: Vec<Value> = Vec::new();
    // Timestamp of the most recent prompt; later entries inherit it.
    let mut current_ts_ms: Option<i64> = None;
    // Stand-in for entries without a `data` field, so the payload can be
    // borrowed instead of deep-cloned for every line.
    let null = Value::Null;

    for line in raw.lines() {
        if line.trim().is_empty() {
            continue;
        }
        let Ok(entry) = serde_json::from_str::<Value>(line) else {
            continue;
        };
        let kind = entry.get("kind").and_then(Value::as_str).unwrap_or("");
        let data = entry.get("data").unwrap_or(&null);

        match kind {
            "Prompt" => {
                let ts_sec = data
                    .get("meta")
                    .and_then(|m| m.get("timestamp"))
                    .and_then(Value::as_i64);
                if let Some(secs) = ts_sec {
                    current_ts_ms = Some(secs.saturating_mul(1000));
                }
                if let Some(text) = extract_text_blocks(data) {
                    out.push(json!({
                        "role": "user",
                        "text": text,
                        "timestamp": current_ts_ms
                    }));
                }
            }
            "AssistantMessage" => {
                if let Some(text) = extract_thinking_blocks(data) {
                    out.push(json!({
                        "role": "thought",
                        "text": text,
                        "timestamp": current_ts_ms
                    }));
                }
                if let Some(text) = extract_text_blocks(data) {
                    out.push(json!({
                        "role": "agent",
                        "text": text,
                        "timestamp": current_ts_ms
                    }));
                }
                emit_tool_use_blocks(data, current_ts_ms, &mut out);
            }
            "ToolResults" => merge_tool_results(data, &mut out),
            _ => {}
        }
    }
    out
}
/// Joins the non-empty string payloads of every `content` block with
/// `kind == "text"`, separated by newlines.
///
/// Returns `None` when `content` is missing, is not an array, or contributes
/// no text.
pub fn extract_text_blocks(data: &Value) -> Option<String> {
    let pieces: Vec<&str> = data
        .get("content")?
        .as_array()?
        .iter()
        .filter(|block| block.get("kind").and_then(Value::as_str) == Some("text"))
        .filter_map(|block| block.get("data").and_then(Value::as_str))
        .filter(|text| !text.is_empty())
        .collect();

    if pieces.is_empty() {
        return None;
    }
    Some(pieces.join("\n"))
}
/// Joins the non-empty `data.text` strings of every `content` block with
/// `kind == "thinking"`, separated by newlines.
///
/// Returns `None` when `content` is missing, is not an array, or contributes
/// no text.
pub fn extract_thinking_blocks(data: &Value) -> Option<String> {
    let blocks = data.get("content")?.as_array()?;

    let thoughts: Vec<&str> = blocks
        .iter()
        .filter(|block| block.get("kind").and_then(Value::as_str) == Some("thinking"))
        .filter_map(|block| block.get("data")?.get("text")?.as_str())
        .filter(|text| !text.is_empty())
        .collect();

    (!thoughts.is_empty()).then(|| thoughts.join("\n"))
}
/// Appends one `tool_call` entry to `out` for every `content` block with
/// `kind == "toolUse"` that carries a non-null `toolUseId`.
///
/// Each entry records the tool id, its `name` as `title` (defaulting to
/// `"tool"`), its `input` as `rawInput` (`null` when absent) and `ts_ms` as
/// `timestamp`. `status`, `kind`, `content` and `locations` start as `null`.
/// Does nothing when `content` is missing or not an array.
pub fn emit_tool_use_blocks(data: &Value, ts_ms: Option<i64>, out: &mut Vec<Value>) {
    let Some(content) = data.get("content").and_then(Value::as_array) else {
        return;
    };
    for block in content {
        if block.get("kind").and_then(Value::as_str) != Some("toolUse") {
            continue;
        }
        // Borrow the payload: `json!` serializes from references, so the
        // block never needs to be deep-cloned first.
        let Some(inner) = block.get("data") else {
            continue;
        };
        // Without an id the call could never be matched to its result.
        let Some(tool_use_id) = inner.get("toolUseId").filter(|id| !id.is_null()) else {
            continue;
        };
        let title = inner.get("name").and_then(Value::as_str).unwrap_or("tool");
        // `None` serializes as `null`, matching an absent `input`.
        let raw_input = inner.get("input");
        out.push(json!({
            "role": "tool_call",
            "toolCallId": tool_use_id,
            "title": title,
            "status": Value::Null,
            "kind": Value::Null,
            "rawInput": raw_input,
            "content": Value::Null,
            "locations": Value::Null,
            "timestamp": ts_ms
        }));
    }
}
/// Copies `status` and `content` from each `toolResult` block in
/// `data.content` onto the matching `tool_call` entry in `out`.
///
/// For every result, `out` is searched from the end for the most recent
/// entry with `role == "tool_call"` whose `toolCallId` equals the result's
/// `toolUseId`; only that entry is updated. Results without a `toolUseId` or
/// without a matching entry are ignored.
pub fn merge_tool_results(data: &Value, out: &mut [Value]) {
    let Some(content) = data.get("content").and_then(Value::as_array) else {
        return;
    };
    for block in content {
        if block.get("kind").and_then(Value::as_str) != Some("toolResult") {
            continue;
        }
        // Borrow the payload; only the fields actually inserted are cloned.
        let Some(inner) = block.get("data") else {
            continue;
        };
        let Some(target_id) = inner.get("toolUseId") else {
            continue;
        };

        // Search newest-first so a reused id resolves to the latest call.
        let matching = out.iter_mut().rev().find(|entry| {
            entry.get("role").and_then(Value::as_str) == Some("tool_call")
                && entry.get("toolCallId") == Some(target_id)
        });
        let Some(map) = matching.and_then(|entry| entry.as_object_mut()) else {
            continue;
        };

        if let Some(status) = inner.get("status") {
            map.insert("status".into(), status.clone());
        }
        if let Some(result_content) = inner.get("content") {
            map.insert("content".into(), result_content.clone());
        }
    }
}