use std::sync::{
Arc,
atomic::{AtomicBool, Ordering},
};
use crate::{
gateway::preparse::{PreparseOrigin, try_preparse_locally},
ws::{
dispatch::{MethodCtx, MethodResult},
types::{ErrorShape, EventFrame},
},
};
use rsclaw_agent::AgentMessage;
use rsclaw_events::AgentEvent;
/// Handles a `chat.send` request: accepts a user message, routes it either to
/// the local preparse path or to the selected agent, and relays the resulting
/// event-bus events for the session back to this connection as `chat` frames.
///
/// Parameters read from `ctx.req.params`:
/// - `message` (alias `text`): required string, the user text.
/// - `sessionKey` (alias `key`): optional string; when absent a fresh
///   `ws:<uuid>` key is generated.
/// - `agentId`: optional string, defaults to `"default"`.
///
/// Returns `{ "runId", "sessionKey", "status" }` where `status` is
/// `"preparse"` when `try_preparse_locally` produced a reply and `"started"`
/// when the message was queued on the agent's channel.
///
/// # Errors
/// - `bad_request` when params or the message text are missing.
/// - `internal` when the default agent lookup fails or the agent channel
///   rejects the message.
/// - `not_found` when a named agent lookup fails.
pub async fn chat_send(ctx: MethodCtx) -> MethodResult {
    let params = ctx
        .req
        .params
        .as_ref()
        .ok_or_else(|| ErrorShape::bad_request("missing params"))?;
    let text = params
        .get("message")
        .or_else(|| params.get("text"))
        .and_then(|v| v.as_str())
        .ok_or_else(|| ErrorShape::bad_request("missing required param: message"))?
        .to_owned();
    let session_key = params
        .get("sessionKey")
        .or_else(|| params.get("key"))
        .and_then(|v| v.as_str())
        .map(|s| s.to_owned())
        .unwrap_or_else(|| format!("ws:{}", uuid::Uuid::new_v4()));
    let agent_id = params
        .get("agentId")
        .and_then(|v| v.as_str())
        .unwrap_or("default");
    let run_id = uuid::Uuid::new_v4().to_string();

    let agent = if agent_id == "default" {
        ctx.state
            .agents
            .default_agent()
            .map_err(|e| ErrorShape::internal(e.to_string()))?
    } else {
        ctx.state
            .agents
            .get(agent_id)
            .map_err(|e| ErrorShape::not_found(e.to_string()))?
    };

    // Subscribe BEFORE anything is published so the relay task spawned below
    // still receives events emitted prior to the spawn.
    let rx = ctx.state.event_bus.subscribe();
    let event_tx = ctx.conn.read().await.event_tx.clone();
    let conn = ctx.conn.clone();
    let sk = session_key.clone();
    let rid = run_id.clone();

    // Local preparse path: publish the reply as a text delta followed by a
    // terminating `done` event (which carries the reply's images).
    let handled_locally = if let Some(reply) =
        try_preparse_locally(&text, &agent, "ws", "ws-client", PreparseOrigin::User).await
    {
        let _ = ctx.state.event_bus.send(AgentEvent {
            session_id: session_key.clone(),
            agent_id: agent.id.clone(),
            delta: reply.text.clone(),
            done: false,
            files: vec![],
            images: vec![],
            tool_log: vec![],
            question: None,
            channel: None,
        });
        let _ = ctx.state.event_bus.send(AgentEvent {
            session_id: session_key.clone(),
            agent_id: agent.id.clone(),
            delta: String::new(),
            done: true,
            files: vec![],
            images: reply.images,
            tool_log: vec![],
            question: None,
            channel: None,
        });
        true
    } else {
        false
    };

    let status = if handled_locally {
        "preparse"
    } else {
        // Agent path: split file references out of the text and queue the
        // message on the agent's channel.
        let (text, file_images, file_files) = rsclaw_agent::registry::extract_file_refs(&text);
        // The reply receiver is dropped immediately; results reach the client
        // through the event bus relay instead.
        let (reply_tx, _reply_rx) = tokio::sync::oneshot::channel();
        let msg = AgentMessage {
            session_key: session_key.clone(),
            text,
            channel: "ws".to_owned(),
            peer_id: "ws-client".to_owned(),
            chat_id: String::new(),
            reply_tx,
            task_id: None,
            context_id: None,
            event_tx: None,
            cancel_token: None,
            input_request_tx: None,
            extra_tools: vec![],
            images: file_images,
            files: file_files,
            account: None,
        };
        tracing::info!(
            agent_id = %agent.id,
            session_key = %session_key,
            run_id = %run_id,
            tx_capacity = agent.tx.capacity(),
            tx_max_capacity = agent.tx.max_capacity(),
            "chat.send: dispatching to agent runtime"
        );
        if let Err(e) = agent.tx.send(msg).await {
            tracing::error!(
                agent_id = %agent.id,
                session_key = %session_key,
                error = %e,
                "chat.send: agent.tx.send failed"
            );
            return Err(ErrorShape::internal(e.to_string()));
        }
        tracing::info!(
            agent_id = %agent.id,
            session_key = %session_key,
            run_id = %run_id,
            "chat.send: dispatched ok, waiting for runtime"
        );
        "started"
    };

    // Single relay task shared by both paths. The in-flight guard is moved
    // into the task so it is held until the relay finishes.
    let inflight_guard = ctx.state.shutdown.begin_work();
    let shutdown_for_relay = ctx.state.shutdown.clone();
    tokio::spawn(async move {
        use futures::StreamExt;
        let _inflight_guard = inflight_guard;
        let mut stream = tokio_stream::wrappers::BroadcastStream::new(rx);
        loop {
            tokio::select! {
                () = shutdown_for_relay.notified() => break,
                next = stream.next() => {
                    let event = match next {
                        Some(Ok(event)) => event,
                        // A lag error means this receiver fell behind and some
                        // events were dropped; the stream is still usable.
                        // Keep relaying so the client can still receive the
                        // terminating `done` frame.
                        Some(Err(e)) => {
                            tracing::warn!(
                                session_key = %sk,
                                run_id = %rid,
                                error = %e,
                                "chat.send: relay lagged behind event bus; some events were dropped"
                            );
                            continue;
                        }
                        // Event bus closed: nothing more will arrive.
                        None => break,
                    };
                    // The bus is shared; ignore events for other sessions.
                    if event.session_id != sk {
                        continue;
                    }
                    let conn_seq = conn.write().await.next_seq();
                    let payload = if event.done {
                        serde_json::json!({
                            "runId": rid,
                            "sessionKey": sk,
                            "type": "done",
                            "role": "assistant",
                            "files": event.files,
                            "images": event.images,
                            "toolLog": event.tool_log,
                        })
                    } else {
                        serde_json::json!({
                            "runId": rid,
                            "sessionKey": sk,
                            "type": "text_delta",
                            "delta": event.delta,
                            "role": "assistant",
                        })
                    };
                    let frame = EventFrame::new("chat", payload, conn_seq);
                    // Serialization failure falls back to an empty string.
                    let json = serde_json::to_string(&frame).unwrap_or_default();
                    // A send error means the connection's receiver is gone.
                    if event_tx.send(json).await.is_err() {
                        break;
                    }
                    if event.done {
                        break;
                    }
                }
            }
        }
    });

    Ok(serde_json::json!({
        "runId": run_id,
        "sessionKey": session_key,
        "status": status,
    }))
}
/// Appends a single message to a session's stored history without running an
/// agent.
///
/// Parameters read from `ctx.req.params`:
/// - `sessionKey` (alias `key`): required string.
/// - `role`: optional string, defaults to `"assistant"`.
/// - `content` (alias `message`): optional string, defaults to `""`.
///
/// Returns `{ "ok": true }` on success.
///
/// # Errors
/// - `bad_request` when params or the session key are missing.
/// - `internal` when the store rejects the append.
pub async fn chat_inject(ctx: MethodCtx) -> MethodResult {
    let Some(params) = ctx.req.params.as_ref() else {
        return Err(ErrorShape::bad_request("missing params"));
    };

    // The alias is consulted only when `sessionKey` is absent altogether; a
    // non-string `sessionKey` is still an error.
    let key_value = match params.get("sessionKey") {
        Some(value) => Some(value),
        None => params.get("key"),
    };
    let Some(session_key) = key_value.and_then(|v| v.as_str()) else {
        return Err(ErrorShape::bad_request("missing required param: sessionKey"));
    };

    let role = match params.get("role").and_then(|v| v.as_str()) {
        Some(given) => given,
        None => "assistant",
    };

    let content = params
        .get("content")
        .or_else(|| params.get("message"))
        .and_then(|v| v.as_str())
        .unwrap_or_default();

    let record = serde_json::json!({
        "role": role,
        "content": content,
    });

    match ctx.state.store.db.append_message(session_key, &record) {
        Ok(_) => Ok(serde_json::json!({ "ok": true })),
        Err(e) => Err(ErrorShape::internal(e.to_string())),
    }
}
/// Returns the most recent stored messages for a session.
///
/// Parameters read from `ctx.req.params`:
/// - `sessionKey`: required string.
/// - `limit`: optional unsigned integer, defaults to 200; the maximum number
///   of messages returned.
///
/// Messages for which `is_compaction_message` is true are dropped, and each
/// remaining message is passed through
/// `rsclaw_provider::redact_rsclaw_hidden_value`. The limit is applied after
/// filtering and keeps the tail of the list.
///
/// Returns `{ "sessionKey", "messages" }`.
///
/// # Errors
/// - `bad_request` when params or the session key are missing.
/// - `internal` when loading from the store fails.
pub async fn chat_history(ctx: MethodCtx) -> MethodResult {
    let params = ctx
        .req
        .params
        .as_ref()
        .ok_or_else(|| ErrorShape::bad_request("missing params"))?;
    let sk = params
        .get("sessionKey")
        .and_then(|v| v.as_str())
        .ok_or_else(|| ErrorShape::bad_request("missing required param: sessionKey"))?;
    // Saturate instead of truncating when the requested limit does not fit in
    // `usize` (possible on 32-bit targets).
    let limit = params
        .get("limit")
        .and_then(|v| v.as_u64())
        .map_or(200, |n| usize::try_from(n).unwrap_or(usize::MAX));

    let mut messages: Vec<_> = ctx
        .state
        .store
        .db
        .load_messages(sk)
        .map_err(|e| ErrorShape::internal(e.to_string()))?
        .into_iter()
        .filter(|v| !is_compaction_message(v))
        .map(rsclaw_provider::redact_rsclaw_hidden_value)
        .collect();

    // Keep only the last `limit` messages. Draining the excess prefix in
    // place avoids cloning every retained message.
    let excess = messages.len().saturating_sub(limit);
    messages.drain(..excess);

    Ok(serde_json::json!({
        "sessionKey": sk,
        "messages": messages
    }))
}
/// Requests cancellation of whatever is running for a session, across every
/// registered agent.
///
/// Reads the optional `sessionKey` string from `ctx.req.params`; when params
/// or the key are absent the empty string is used as the key.
///
/// For each agent, the session's abort flag is set (created if missing) and
/// the session's cancel token, if one is registered, is cancelled. A lock that
/// cannot be acquired (`Err` from `write()` / `read()`) is skipped.
///
/// Always returns `{ "aborted": true, "sessionKey": <key> }`.
pub async fn chat_abort(ctx: MethodCtx) -> MethodResult {
    let sk = match ctx.req.params.as_ref() {
        Some(p) => p.get("sessionKey").and_then(|v| v.as_str()).unwrap_or(""),
        None => "",
    };

    for agent in ctx.state.agents.all() {
        // Raise the abort flag, inserting a fresh one when the session has
        // none yet. The write guard is released before the tokens are read.
        if let Ok(mut abort_map) = agent.abort_flags.write() {
            abort_map
                .entry(sk.to_owned())
                .or_insert_with(|| Arc::new(AtomicBool::new(false)))
                .store(true, Ordering::SeqCst);
        }

        // Cancel the session's token when one exists.
        if let Ok(token_map) = agent.cancel_tokens.read() {
            if let Some(cancel) = token_map.get(sk) {
                cancel.cancel();
            }
        }
    }

    let response = serde_json::json!({
        "aborted": true,
        "sessionKey": sk
    });
    Ok(response)
}
/// Applies a user's answer to a pending computer-permission request.
///
/// Parameters read from `ctx.req.params`:
/// - `requestId` (alias `request_id`): required string.
/// - `decision`: required string, one of `allow_once`, `allow_session`,
///   `allow_always`, `deny`.
/// - `agentId`: optional string, defaults to `"default"`.
/// - `app`: optional string, defaults to `""`.
///
/// The decision is first recorded for `(agentId, app)`; a failure there is
/// logged as a warning and does not fail the request. The pending request is
/// then resolved with the decision.
///
/// Returns `{ "resolved": <result of resolve_pending_request>, "requestId" }`.
///
/// # Errors
/// `bad_request` when params, `requestId` or `decision` are missing, or when
/// `decision` is not one of the accepted values.
pub async fn chat_permission_response(ctx: MethodCtx) -> MethodResult {
    use rsclaw_computer::permission::{PermissionDecision, PermissionStore as _};

    let Some(params) = ctx.req.params.as_ref() else {
        return Err(ErrorShape::bad_request("missing params"));
    };

    // The alias is consulted only when `requestId` is absent altogether.
    let id_value = match params.get("requestId") {
        Some(value) => Some(value),
        None => params.get("request_id"),
    };
    let Some(request_id) = id_value.and_then(|v| v.as_str()).map(str::to_owned) else {
        return Err(ErrorShape::bad_request("missing required param: requestId"));
    };

    let Some(decision_str) = params.get("decision").and_then(|v| v.as_str()) else {
        return Err(ErrorShape::bad_request("missing required param: decision"));
    };

    let decision = match decision_str {
        "allow_once" => PermissionDecision::AllowOnce,
        "allow_session" => PermissionDecision::AllowSession,
        "allow_always" => PermissionDecision::AllowAlways,
        "deny" => PermissionDecision::Deny,
        other => {
            return Err(ErrorShape::bad_request(format!(
                "invalid decision `{other}` (expected: allow_once, allow_session, allow_always, deny)"
            )));
        }
    };

    let agent_id = match params.get("agentId").and_then(|v| v.as_str()) {
        Some(id) => id,
        None => "default",
    };
    let app = params
        .get("app")
        .and_then(|v| v.as_str())
        .unwrap_or_default();

    // Recording is best-effort: a failure is logged and the pending request is
    // still resolved below.
    let recorded = ctx
        .state
        .computer_permission
        .record(agent_id, app, decision)
        .await;
    if let Err(e) = recorded {
        tracing::warn!(error = %e, "computer_permission.record failed");
    }

    let resolved = ctx
        .state
        .computer_permission
        .resolve_pending_request(&request_id, decision)
        .await;

    Ok(serde_json::json!({
        "resolved": resolved,
        "requestId": request_id,
    }))
}
use rsclaw_agent::compaction::is_compaction_message;