use anyhow::Context;
use axum::{
extract::ws::{Message, WebSocket, WebSocketUpgrade},
extract::State,
http::{HeaderMap, StatusCode},
response::IntoResponse,
routing::get,
Router,
};
use chrono::Local;
use futures::{SinkExt, StreamExt};
use std::sync::Arc;
use synaps_cli::engine::commands::{self as engine_commands, CommandResult};
use synaps_cli::engine::session::ConversationState;
use synaps_cli::engine::setup::{self, BackgroundTasks, EngineOpts};
use synaps_cli::engine::stream::{self, EngineStreamEvent, StreamCompletion, SubagentTracker};
use synaps_cli::protocol::{ClientMessage, HistoryEntry, ServerMessage};
use synaps_cli::{truncate_str, CancellationToken, Runtime};
use axum::extract::Query;
use rand::Rng;
use std::collections::HashMap;
use synaps_cli::core::config::load_config;
use synaps_cli::core::config::resolve_write_path;
use tokio::sync::{broadcast, Mutex, RwLock};
/// State shared by every HTTP/WebSocket handler; `run` builds one instance and
/// hands it to the router wrapped in an `Arc`.
struct ServerState {
/// Engine runtime; locked to query or change the model/thinking settings and
/// to start a response stream.
runtime: Mutex<Runtime>,
/// Origins accepted by `ws_handler` (exact string match against the `Origin`
/// header). An empty list skips the origin check.
allowed_origins: Vec<String>,
/// Conversation state: the API message list, usage totals, and the session
/// that `save_session` persists.
conv: RwLock<ConversationState>,
/// Display history returned for `ClientMessage::History` and appended to
/// while a turn streams.
display_history: RwLock<Vec<HistoryEntry>>,
/// `true` while a user turn is in flight; set in `handle_user_message` and
/// reset by `StreamingGuard` on drop.
streaming: std::sync::atomic::AtomicBool,
/// Cancellation token of the stream currently running, if any; cancelled by
/// `ClientMessage::Cancel`.
cancel_token: RwLock<Option<CancellationToken>>,
/// Sending side of the broadcast channel every connected client subscribes to.
broadcast_tx: broadcast::Sender<ServerMessage>,
/// Number of currently connected WebSocket clients.
client_count: RwLock<usize>,
/// Background tasks handed over by engine boot; `run` calls `shutdown()` on
/// this after the server stops.
#[allow(dead_code)] background: BackgroundTasks,
/// Token required by `ws_handler`; `None` disables authentication.
auth_token: Option<String>,
/// Upper bound, in bytes, for an inbound text frame; `None` means no limit.
max_message_size: Option<usize>,
/// Forwarded to `Runtime::run_stream_with_messages` on every turn.
/// NOTE(review): presumably auto-accepts confirmation prompts — confirm against the engine.
auto_approve_confirms: bool,
}
/// Guard created at the start of a user turn; its `Drop` implementation resets
/// `ServerState::streaming` to `false`, so the flag is cleared on every exit path.
struct StreamingGuard {
/// Shared server state whose `streaming` flag is reset on drop.
state: Arc<ServerState>,
}
impl Drop for StreamingGuard {
    /// Marks the server as no longer streaming once the owning turn ends.
    fn drop(&mut self) {
        use std::sync::atomic::Ordering;

        let streaming_flag = &self.state.streaming;
        streaming_flag.store(false, Ordering::Release);
    }
}
impl ServerState {
fn timestamp() -> String {
Local::now().format("%H:%M").to_string()
}
#[allow(clippy::too_many_arguments)]
async fn add_usage(
&self,
input_tokens: u64,
output_tokens: u64,
cache_read: u64,
cache_creation: u64,
cache_creation_5m: Option<u64>,
cache_creation_1h: Option<u64>,
model: &str,
) {
let mut conv = self.conv.write().await;
conv.add_usage(
input_tokens, output_tokens, cache_read, cache_creation,
cache_creation_5m, cache_creation_1h, model,
);
}
async fn save_session(&self) {
let session_to_save = {
let mut conv = self.conv.write().await;
if conv.api_messages.is_empty() {
return;
}
conv.session.api_messages = conv.api_messages.clone();
conv.session.total_input_tokens = conv.total_input_tokens;
conv.session.total_output_tokens = conv.total_output_tokens;
conv.session.session_cost = conv.session_cost;
conv.session.abort_context = conv.abort_context.clone();
conv.session.updated_at = chrono::Utc::now();
conv.session.auto_title();
conv.session.clone()
}; if let Err(e) = session_to_save.save().await {
tracing::error!("Failed to save session: {}", e);
}
}
async fn push_history(&self, entry: HistoryEntry) {
self.display_history.write().await.push(entry);
}
}
/// Boots the engine, serves the WebSocket API, and performs an orderly
/// shutdown once a termination signal arrives.
///
/// # Parameters
/// - `port` / `host`: address the TCP listener binds to.
/// - `system`, `continue_session`, `profile`: forwarded unchanged to the
///   engine boot options.
/// - `token_override`: `Some("")` disables authentication, `Some(t)` uses `t`
///   as the token, `None` falls back to the configured token or, failing
///   that, a freshly generated 32-byte random hex token. In the `None` case
///   the token is also written to the `server-token` path (best effort).
/// - `auto_approve_flag`: OR-ed with the configured `auto_approve_confirms`.
/// - `allowed_origins_override`: comma-separated origin list that replaces
///   the configured allowlist when present.
///
/// # Errors
/// Returns an error if engine boot fails, the listener cannot bind, or the
/// HTTP server itself fails. Problems while persisting the token file or
/// during shutdown are reported on stderr and do not fail the call.
// Each parameter mirrors one CLI/config knob, so the long list is deliberate.
#[allow(clippy::too_many_arguments)]
pub async fn run(
    port: u16,
    host: String,
    system: Option<String>,
    continue_session: Option<Option<String>>,
    profile: Option<String>,
    token_override: Option<String>,
    auto_approve_flag: bool,
    allowed_origins_override: Option<String>,
) -> anyhow::Result<()> {
    let boot = setup::boot(EngineOpts {
        continue_session,
        system,
        profile,
        no_extensions: false,
    })
    .await
    .context("engine boot failed")?;

    // Extensions load in the background; the server only logs the final tally.
    let (loader_tx, mut loader_rx) = tokio::sync::mpsc::unbounded_channel();
    synaps_cli::extensions::loader::spawn_discover_and_load(
        Arc::clone(&boot.ext_manager),
        loader_tx,
    );
    tokio::spawn(async move {
        use synaps_cli::extensions::loader::ExtensionLoaderEvent;
        while let Some(ev) = loader_rx.recv().await {
            if let ExtensionLoaderEvent::Finished { loaded, failed } = ev {
                tracing::info!(
                    server_extensions_loaded = loaded.len(),
                    server_extensions_failed = failed.len(),
                    "server: extensions ready"
                );
            }
        }
    });

    let runtime = boot.runtime;
    let initial_history = rebuild_history(&boot.api_messages);
    let conv = if boot.continued {
        ConversationState::from_resumed(boot.session)
    } else {
        ConversationState::new(boot.session)
    };
    let session_id = conv.session.id.clone();
    let (broadcast_tx, _) = broadcast::channel::<ServerMessage>(256);

    let config = load_config();
    let allowed_origins = if let Some(ref origins) = allowed_origins_override {
        origins
            .split(',')
            .map(|s| s.trim().to_string())
            .filter(|s| !s.is_empty())
            .collect()
    } else {
        config.server.allowed_origins.clone()
    };
    let max_message_size = config.server.max_message_size;
    let auto_approve_confirms = auto_approve_flag || config.server.auto_approve_confirms;

    let auth_token: Option<String> = match &token_override {
        Some(t) if t.is_empty() => None,
        Some(t) => Some(t.clone()),
        None => {
            let token = if let Some(t) = config.server.token {
                t
            } else {
                let bytes: [u8; 32] = rand::rng().random();
                bytes.iter().map(|b| format!("{:02x}", b)).collect()
            };
            // Write to a temp file, tighten permissions, then rename, so the
            // final path never holds a partially written token.
            let token_path = resolve_write_path("server-token");
            let tmp_path = token_path.with_extension("tmp");
            if let Err(e) = std::fs::write(&tmp_path, &token) {
                eprintln!("Warning: could not write token file: {e}");
            } else {
                #[cfg(unix)]
                {
                    use std::os::unix::fs::PermissionsExt;
                    if let Err(e) = std::fs::set_permissions(
                        &tmp_path,
                        std::fs::Permissions::from_mode(0o600),
                    ) {
                        eprintln!("Warning: could not restrict token file permissions: {e}");
                    }
                }
                if let Err(e) = std::fs::rename(&tmp_path, &token_path) {
                    eprintln!("Warning: could not move token file into place: {e}");
                }
            }
            Some(token)
        }
    };

    let state = Arc::new(ServerState {
        runtime: Mutex::new(runtime),
        conv: RwLock::new(conv),
        display_history: RwLock::new(initial_history),
        streaming: std::sync::atomic::AtomicBool::new(false),
        cancel_token: RwLock::new(None),
        broadcast_tx,
        client_count: RwLock::new(0),
        background: boot.background,
        auth_token: auth_token.clone(),
        allowed_origins,
        max_message_size,
        auto_approve_confirms,
    });

    let app = Router::new()
        .route("/ws", get(ws_handler))
        .route("/health", get(health_handler))
        .with_state(state.clone());
    let addr = format!("{}:{}", host, port);
    let listener = tokio::net::TcpListener::bind(&addr).await?;

    eprintln!("╔══════════════════════════════════════╗");
    eprintln!("║ SynapsCLI Server v0.2 ║");
    eprintln!("╠══════════════════════════════════════╣");
    eprintln!("║ Listening: ws://{}:{:<5} ║", host, port);
    eprintln!("║ Session: {:<24}║", &session_id);
    if let Some(ref tok) = auth_token {
        // Truncate by characters, not bytes: a byte-index slice panics when
        // the cut lands inside a multi-byte UTF-8 sequence.
        let preview: String = tok.chars().take(24).collect();
        eprintln!("║ Token: {:<24}║", preview);
    }
    eprintln!("╚══════════════════════════════════════╝");

    axum::serve(listener, app)
        .with_graceful_shutdown(shutdown_signal())
        .await?;

    // Shutdown steps are time-boxed so a stuck save or hook cannot hang exit.
    const SAVE_TIMEOUT_SECS: u64 = 2;
    const HOOKS_TIMEOUT_SECS: u64 = 5;
    eprintln!("\n↓ graceful shutdown — saving session, firing hooks, unregistering.");
    match tokio::time::timeout(
        std::time::Duration::from_secs(SAVE_TIMEOUT_SECS),
        state.save_session(),
    )
    .await
    {
        Ok(()) => eprintln!(" ✓ session saved"),
        Err(_) => eprintln!(" ⚠ session save timed out after {}s", SAVE_TIMEOUT_SECS),
    }
    {
        let runtime = state.runtime.lock().await;
        let hook = synaps_cli::extensions::hooks::events::HookEvent::on_session_end(
            &session_id,
            None,
        );
        match tokio::time::timeout(
            std::time::Duration::from_secs(HOOKS_TIMEOUT_SECS),
            runtime.hook_bus().emit_concurrent(&hook),
        )
        .await
        {
            Ok(_) => eprintln!(" ✓ on_session_end hooks fired"),
            Err(_) => eprintln!(
                " ⚠ on_session_end hooks timed out after {}s — extensions may not have flushed",
                HOOKS_TIMEOUT_SECS
            ),
        }
    }
    state.background.shutdown();
    Ok(())
}
/// Resolves when the process receives Ctrl-C (SIGINT) or, on Unix, SIGTERM.
///
/// If a signal handler cannot be installed, that branch logs a warning and
/// never resolves, so a registration failure is not mistaken for a shutdown
/// request.
async fn shutdown_signal() {
    let ctrl_c = async {
        // `ctrl_c()` returns `Err` when the handler could not be registered.
        // Completing here would shut the server down immediately, so park
        // this branch instead — the same policy the SIGTERM branch uses.
        if let Err(e) = tokio::signal::ctrl_c().await {
            tracing::warn!("failed to install Ctrl-C handler: {e}");
            std::future::pending::<()>().await;
        }
    };

    #[cfg(unix)]
    let terminate = async {
        match tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()) {
            Ok(mut sig) => {
                sig.recv().await;
            }
            Err(e) => {
                tracing::warn!("failed to install SIGTERM handler: {e}");
                std::future::pending::<()>().await;
            }
        }
    };
    // No SIGTERM on non-Unix targets; only Ctrl-C can end the wait there.
    #[cfg(not(unix))]
    let terminate = std::future::pending::<()>();

    tokio::select! {
        _ = ctrl_c => tracing::info!("received SIGINT, shutting down"),
        _ = terminate => tracing::info!("received SIGTERM, shutting down"),
    }
}
/// Liveness probe for `GET /health`: always answers with the plain body `ok`.
async fn health_handler() -> impl IntoResponse {
    const BODY: &str = "ok";
    BODY
}
/// Gatekeeper for `GET /ws`: enforces the origin allowlist and the auth token
/// before upgrading the connection to a WebSocket.
///
/// - With a non-empty allowlist, the `Origin` header must match one entry
///   exactly; otherwise the request is answered with `403`.
/// - With a configured token, the client must present it in the `token` query
///   parameter or, when that parameter is absent, as an
///   `Authorization: Bearer …` header; otherwise the request is answered
///   with `401`.
async fn ws_handler(
    ws: WebSocketUpgrade,
    State(state): State<Arc<ServerState>>,
    Query(params): Query<HashMap<String, String>>,
    headers: HeaderMap,
) -> impl IntoResponse {
    let allowlist = &state.allowed_origins;
    if !allowlist.is_empty() {
        let raw_origin = headers.get(axum::http::header::ORIGIN);
        let origin_ok = raw_origin
            .and_then(|value| value.to_str().ok())
            .is_some_and(|origin| allowlist.iter().any(|allowed| allowed == origin));
        if !origin_ok {
            tracing::warn!(
                origin = ?raw_origin.map(|value| value.to_str().unwrap_or("<invalid>")),
                "WebSocket upgrade rejected: origin not in allowlist"
            );
            return (StatusCode::FORBIDDEN, "Forbidden: origin not allowed").into_response();
        }
    }

    if let Some(expected) = state.auth_token.as_deref() {
        // The query parameter wins; the header is consulted only when the
        // parameter is missing.
        let provided = params.get("token").map(String::as_str).or_else(|| {
            headers
                .get("authorization")
                .and_then(|value| value.to_str().ok())
                .and_then(|value| value.strip_prefix("Bearer "))
        });
        // Fold over every byte pair instead of stopping at the first mismatch.
        let authorized = provided.is_some_and(|candidate| {
            let (lhs, rhs) = (candidate.as_bytes(), expected.as_bytes());
            lhs.len() == rhs.len()
                && lhs.iter().zip(rhs.iter()).fold(0u8, |acc, (x, y)| acc | (x ^ y)) == 0
        });
        if !authorized {
            tracing::warn!("WebSocket upgrade rejected: invalid or missing auth token");
            return (StatusCode::UNAUTHORIZED, "Unauthorized").into_response();
        }
    }

    ws.on_upgrade(|socket| handle_client(socket, state))
        .into_response()
}
async fn handle_client(socket: WebSocket, state: Arc<ServerState>) {
let (mut ws_tx, mut ws_rx) = socket.split();
{
let mut count = state.client_count.write().await;
*count += 1;
let n = *count;
tracing::info!("Client connected ({} total)", n);
let _ = state.broadcast_tx.send(ServerMessage::System {
message: format!("client connected ({} total)", n),
});
}
let mut broadcast_rx = state.broadcast_tx.subscribe();
let tx_handle = tokio::spawn(async move {
loop {
match broadcast_rx.recv().await {
Ok(msg) => {
if let Ok(json) = serde_json::to_string(&msg) {
if ws_tx.send(Message::Text(json)).await.is_err() {
break;
}
}
}
Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
tracing::warn!(
dropped = n,
"client lagged on broadcast channel — keeping forward pipe alive"
);
let warn = ServerMessage::System {
message: format!("[client lagged — {} message(s) dropped]", n),
};
if let Ok(json) = serde_json::to_string(&warn) {
let _ = ws_tx.send(Message::Text(json)).await;
}
}
Err(tokio::sync::broadcast::error::RecvError::Closed) => {
break;
}
}
}
});
while let Some(Ok(msg)) = ws_rx.next().await {
match msg {
Message::Text(text) => {
if let Some(max) = state.max_message_size {
let len = text.len();
if len > max {
tracing::warn!(len, max, "inbound message too large — dropping");
let _ = state.broadcast_tx.send(ServerMessage::Error {
message: format!(
"Message too large: {} bytes exceeds limit of {} bytes",
len, max
),
});
continue;
}
}
if let Ok(client_msg) = serde_json::from_str::<ClientMessage>(&text) {
handle_message(client_msg, &state).await;
}
}
Message::Close(_) => break,
_ => {}
}
}
tx_handle.abort();
{
let mut count = state.client_count.write().await;
*count = count.saturating_sub(1);
let n = *count;
tracing::info!("Client disconnected ({} remaining)", n);
let _ = state.broadcast_tx.send(ServerMessage::System {
message: format!("client disconnected ({} remaining)", n),
});
}
}
async fn handle_message(msg: ClientMessage, state: &Arc<ServerState>) {
match msg {
ClientMessage::Message { content } => {
handle_user_message(content, state).await;
}
ClientMessage::Command { name, args } => {
handle_command(&name, &args, state).await;
}
ClientMessage::Cancel => {
let token = state.cancel_token.read().await;
if let Some(ref ct) = *token {
ct.cancel();
}
let _ = state.broadcast_tx.send(ServerMessage::System {
message: "canceled".to_string(),
});
}
ClientMessage::Status => {
let runtime = state.runtime.lock().await;
let model = runtime.model().to_string();
let thinking = runtime.thinking_level().to_string();
drop(runtime);
let conv = state.conv.read().await;
let _ = state.broadcast_tx.send(ServerMessage::StatusResponse {
model,
thinking,
streaming: state.streaming.load(std::sync::atomic::Ordering::Acquire),
session_id: conv.session.id.clone(),
total_input_tokens: conv.total_input_tokens,
total_output_tokens: conv.total_output_tokens,
session_cost: conv.session_cost,
connected_clients: *state.client_count.read().await,
});
}
ClientMessage::History => {
let history = state.display_history.read().await;
let _ = state.broadcast_tx.send(ServerMessage::HistoryResponse {
messages: history.clone(),
});
}
}
}
/// Runs one user turn: records the message, streams the engine's response to
/// all clients, and saves the session when the turn ends.
///
/// If another turn is already streaming, an error is broadcast and the
/// message is discarded. The outer `'turn` loop restarts the stream when the
/// engine asks for a follow-up (a queued message or pending events).
async fn handle_user_message(content: String, state: &Arc<ServerState>) {
// `swap` returns the previous value, so `true` means a turn is already running.
if state
.streaming
.swap(true, std::sync::atomic::Ordering::AcqRel)
{
let _ = state.broadcast_tx.send(ServerMessage::Error {
message: "already streaming — cancel first or wait".to_string(),
});
return;
}
// Resets the `streaming` flag when this function exits, on any path.
let _streaming_guard = StreamingGuard {
state: Arc::clone(state),
};
let ts = ServerState::timestamp();
state
.push_history(HistoryEntry::User {
content: content.clone(),
time: ts,
})
.await;
{
let mut conv = state.conv.write().await;
conv.api_messages
.push(serde_json::json!({"role": "user", "content": content}));
}
let mut subagents: Vec<SubagentTracker> = Vec::new();
// The model name is read once here and used for all usage accounting in this turn.
let model = {
let rt = state.runtime.lock().await;
rt.model().to_string()
};
let broadcast = state.broadcast_tx.clone();
'turn: loop {
// Each stream gets a snapshot of the messages and a fresh cancellation token.
let messages = state.conv.read().await.api_messages.clone();
let cancel = CancellationToken::new();
*state.cancel_token.write().await = Some(cancel.clone());
let mut stream = {
let rt = state.runtime.lock().await;
rt.run_stream_with_messages(messages, cancel, None, None, state.auto_approve_confirms)
.await
};
while let Some(event) = stream.next().await {
let ts = ServerState::timestamp();
// The conversation write lock is held only while the event is folded into
// the state; it is released before history updates and broadcasting.
let (engine_event, completion) = {
let mut conv = state.conv.write().await;
let conv = &mut *conv;
stream::process_stream_event(
event,
&mut conv.api_messages,
&mut subagents,
&mut conv.queued_message,
&mut conv.pending_events,
)
};
apply_engine_event_side_effects(&engine_event, state, &model, &ts).await;
if let Some(msg) = engine_event_to_server_message(engine_event) {
let _ = broadcast.send(msg);
}
match completion {
StreamCompletion::Continue => {}
StreamCompletion::Done => {
state.save_session().await;
break 'turn;
}
StreamCompletion::Error(ref err_msg) => {
tracing::debug!(error = %err_msg, "stream completed with error");
state.save_session().await;
break 'turn;
}
// A message queued during the stream is appended as the next user
// message and the turn restarts with a new stream.
StreamCompletion::AutoSendQueued(queued) => {
{
let mut conv = state.conv.write().await;
conv.api_messages
.push(serde_json::json!({"role": "user", "content": queued}));
}
state.save_session().await;
continue 'turn;
}
StreamCompletion::AutoTriggerEvents => {
state.save_session().await;
continue 'turn;
}
}
}
// The stream ended without reporting a completion: save and finish the turn.
state.save_session().await;
break 'turn;
}
// No stream is in flight any more, so there is nothing left to cancel.
*state.cancel_token.write().await = None;
}
async fn apply_engine_event_side_effects(
event: &EngineStreamEvent,
state: &Arc<ServerState>,
model: &str,
ts: &str,
) {
match event {
EngineStreamEvent::Thinking(text) => {
let mut history = state.display_history.write().await;
if let Some(HistoryEntry::Thinking { content: c, .. }) = history.last_mut() {
c.push_str(text);
} else {
history.push(HistoryEntry::Thinking {
content: text.clone(),
time: ts.to_string(),
});
}
}
EngineStreamEvent::Text(text) => {
let mut history = state.display_history.write().await;
if let Some(HistoryEntry::Text { content: c, .. }) = history.last_mut() {
c.push_str(text);
} else {
history.push(HistoryEntry::Text {
content: text.clone(),
time: ts.to_string(),
});
}
}
EngineStreamEvent::ToolFinalized {
tool_name, input, ..
} => {
let input_str = serde_json::to_string(input).unwrap_or_default();
state
.push_history(HistoryEntry::ToolUse {
tool_name: tool_name.clone(),
input: input_str,
time: ts.to_string(),
})
.await;
}
EngineStreamEvent::ToolResult { result, .. } => {
state
.push_history(HistoryEntry::ToolResult {
result: result.clone(),
time: ts.to_string(),
})
.await;
}
EngineStreamEvent::Usage {
input_tokens,
output_tokens,
cache_read,
cache_creation,
cache_creation_5m,
cache_creation_1h,
model: _event_model,
} => {
state
.add_usage(
*input_tokens,
*output_tokens,
*cache_read,
*cache_creation,
*cache_creation_5m,
*cache_creation_1h,
model,
)
.await;
}
EngineStreamEvent::Error(err) => {
state
.push_history(HistoryEntry::Error {
content: err.clone(),
time: ts.to_string(),
})
.await;
}
EngineStreamEvent::Notice(text) => {
state
.push_history(HistoryEntry::System {
content: text.clone(),
time: ts.to_string(),
})
.await;
}
EngineStreamEvent::ToolStart { .. }
| EngineStreamEvent::ToolDelta { .. }
| EngineStreamEvent::ToolResultDelta { .. }
| EngineStreamEvent::SubagentStart { .. }
| EngineStreamEvent::SubagentUpdate { .. }
| EngineStreamEvent::SubagentDone { .. }
| EngineStreamEvent::SteeringDelivered { .. }
| EngineStreamEvent::Done
| EngineStreamEvent::Noop => {}
}
}
fn engine_event_to_server_message(event: EngineStreamEvent) -> Option<ServerMessage> {
match event {
EngineStreamEvent::Thinking(content) => Some(ServerMessage::Thinking { content }),
EngineStreamEvent::Text(content) => Some(ServerMessage::Text { content }),
EngineStreamEvent::ToolStart { tool_name, .. } => {
Some(ServerMessage::ToolUseStart { tool_name })
}
EngineStreamEvent::ToolDelta { delta, .. } => Some(ServerMessage::ToolUseDelta(delta)),
EngineStreamEvent::ToolFinalized {
tool_id,
tool_name,
input,
} => {
Some(ServerMessage::ToolUse {
tool_name,
tool_id,
input,
})
}
EngineStreamEvent::ToolResultDelta { tool_id, delta } => {
Some(ServerMessage::ToolResultDelta { tool_id, delta })
}
EngineStreamEvent::ToolResult { tool_id, result } => {
Some(ServerMessage::ToolResult { tool_id, result })
}
EngineStreamEvent::Usage {
input_tokens,
output_tokens,
cache_read: _cache_read,
cache_creation: _cache_creation,
cache_creation_5m,
cache_creation_1h,
model: _model,
} => Some(ServerMessage::Usage {
input_tokens,
output_tokens,
cache_creation_5m,
cache_creation_1h,
}),
EngineStreamEvent::Done => Some(ServerMessage::Done),
EngineStreamEvent::Error(message) => Some(ServerMessage::Error { message }),
EngineStreamEvent::Notice(text) => Some(ServerMessage::Notice { text }),
EngineStreamEvent::SubagentStart { .. }
| EngineStreamEvent::SubagentUpdate { .. }
| EngineStreamEvent::SubagentDone { .. }
| EngineStreamEvent::SteeringDelivered { .. }
| EngineStreamEvent::Noop => None,
}
}
async fn handle_command(name: &str, args: &str, state: &Arc<ServerState>) {
let broadcast = &state.broadcast_tx;
if name == "model" && args.is_empty() {
let rt = state.runtime.lock().await;
let _ = broadcast.send(ServerMessage::System {
message: format!("current model: {}", rt.model()),
});
return;
}
if name == "thinking" && args.is_empty() {
let rt = state.runtime.lock().await;
let _ = broadcast.send(ServerMessage::System {
message: format!(
"thinking: {} ({})",
rt.thinking_level(),
rt.thinking_budget()
),
});
return;
}
let engine_result = {
let mut rt = state.runtime.lock().await;
engine_commands::handle_engine_command(name, args, &mut rt)
};
if let Some(result) = engine_result {
match result {
CommandResult::ModelChanged { model } => {
let _ = broadcast.send(ServerMessage::System {
message: format!("model set to: {model}"),
});
}
CommandResult::ThinkingChanged { level, .. } => {
let _ = broadcast.send(ServerMessage::System {
message: format!("thinking set to: {level}"),
});
}
CommandResult::Quit => {
let _ = broadcast.send(ServerMessage::System {
message: "/quit ignored — server is long-lived; close the WebSocket instead"
.to_string(),
});
}
CommandResult::Compact { .. } => {
let _ = broadcast.send(ServerMessage::System {
message: "/compact not yet wired in server mode".to_string(),
});
}
CommandResult::Error(msg) => {
let _ = broadcast.send(ServerMessage::Error { message: msg });
}
other => {
tracing::debug!(?other, "engine command result not handled by server");
}
}
return;
}
match name {
"clear" => {
{
let rt = state.runtime.lock().await;
let mut conv = state.conv.write().await;
conv.clear(&rt).await;
}
state.display_history.write().await.clear();
let _ = broadcast.send(ServerMessage::System {
message: "session cleared".to_string(),
});
}
"system" => {
if args.is_empty() || args == "show" {
let rt = state.runtime.lock().await;
let prompt = rt.system_prompt().unwrap_or("(none)");
let _ = broadcast.send(ServerMessage::System {
message: format!("system prompt: {}", truncate_str(prompt, 200)),
});
} else {
let mut rt = state.runtime.lock().await;
rt.set_system_prompt(args.to_string());
let _ = broadcast.send(ServerMessage::System {
message: "system prompt updated".to_string(),
});
}
}
_ => {
let _ = broadcast.send(ServerMessage::Error {
message: format!("unknown command: {name}"),
});
}
}
}
/// Reconstructs the display history from stored API messages.
///
/// - `user` messages contribute an entry only when their `content` is a plain
///   string.
/// - `assistant` messages contribute one entry per `thinking`, `text` or
///   `tool_use` block when their `content` is an array.
/// - Everything else is skipped.
///
/// Rebuilt entries carry an empty `time`.
fn rebuild_history(api_messages: &[serde_json::Value]) -> Vec<HistoryEntry> {
    let mut entries = Vec::new();

    for message in api_messages {
        let body = &message["content"];
        match message["role"].as_str() {
            Some("user") => {
                if let Some(text) = body.as_str() {
                    entries.push(HistoryEntry::User {
                        content: text.to_string(),
                        time: String::new(),
                    });
                }
            }
            Some("assistant") => {
                let Some(blocks) = body.as_array() else {
                    continue;
                };
                entries.extend(blocks.iter().filter_map(|block| {
                    match block["type"].as_str()? {
                        "thinking" => {
                            block["thinking"].as_str().map(|text| HistoryEntry::Thinking {
                                content: text.to_string(),
                                time: String::new(),
                            })
                        }
                        "text" => block["text"].as_str().map(|text| HistoryEntry::Text {
                            content: text.to_string(),
                            time: String::new(),
                        }),
                        // A tool call is always recorded, even when its name
                        // is missing (the name is then empty).
                        "tool_use" => Some(HistoryEntry::ToolUse {
                            tool_name: block["name"].as_str().unwrap_or("").to_string(),
                            input: serde_json::to_string(&block["input"]).unwrap_or_default(),
                            time: String::new(),
                        }),
                        _ => None,
                    }
                }));
            }
            _ => {}
        }
    }

    entries
}