use std::{
collections::HashMap,
sync::{Arc, Mutex},
time::Duration,
};
use anyhow::{Context, Result};
use futures::{StreamExt, TryStreamExt, future::BoxFuture};
use reqwest::StatusCode;
use serde::{Deserialize, Serialize};
use serde_json::{Value, json};
use super::{
AgentEndpoint, ContentPart, LlmProvider, LlmRequest, LlmStream, Message, MessageContent,
RecallMetadata, Role, StreamEvent, TokenUsage,
};
/// Default public endpoint of the rsclaw API, without the `/agent` suffix.
pub const RSCLAW_DEFAULT_BASE: &str = "https://api.rsclaw.ai/v1";
/// Default server-side prefix id; overrides via `with_prefix_id` must contain exactly one `/`.
pub const RSCLAW_DEFAULT_PREFIX_ID: &str = "rsclaw/2026.6.18";
/// Default request timeout, in seconds, for a `/sessions/{id}/compact` splice.
pub const RSCLAW_DEFAULT_COMPACT_TIMEOUT_SECS: u64 = 180;
/// Fully-qualified id of the default flash model (dispatched to `/fastshot`).
pub const RSCLAW_DEFAULT_FLASH: &str = "rsclaw/rsclaw-flash-v1";
/// Fully-qualified id of the default vision model (dispatched to `/vision`).
pub const RSCLAW_DEFAULT_VISION: &str = "rsclaw/rsclaw-vision-v1";

/// How long turn and one-shot requests wait for response headers.
const TURN_HEADERS_TIMEOUT: Duration = Duration::from_secs(60);
/// Longest silence, in seconds, tolerated between SSE chunks before a stream counts as stalled.
const STREAM_READ_IDLE_SECS: u64 = 45;
/// Number of cached gateway sessions above which a batch eviction runs.
const MAX_SESSIONS: usize = 10_000;
/// LLM provider backed by the rsclaw agent API.
///
/// Stateless requests go to the one-shot endpoints; stateful requests (`kv_cache_mode == 2`
/// with a `session_key`) are mapped onto long-lived server sessions cached in `sessions`.
pub struct RsclawProvider {
    /// HTTP transport wrapping a `reqwest::Client` built with redirects disabled.
    fleet: rsclaw_embed::FleetHttp,
    /// API root with trailing `/` and `/agent` removed; `/agent{path}` is appended per call.
    base_url: String,
    /// Bearer token, `None` when absent or blank.
    bearer: Option<String>,
    /// Prefix id sent when opening or replaying sessions.
    prefix_id: String,
    /// Request timeout used for compact-splice calls.
    compact_timeout: Duration,
    /// When set, turns that offer tools ask the server to constrain tool-call output.
    constrain_tool_calls: bool,
    /// Gateway `session_key` -> cached server session. Shared (via `Arc`) with returned
    /// streams so they can drop their entry when they see an error.
    sessions: Arc<Mutex<HashMap<String, SessionEntry>>>,
}
async fn read_sse_terminal_event(resp: reqwest::Response) -> Result<(String, String)> {
let mut stream = resp.bytes_stream();
let mut buf: Vec<u8> = Vec::new();
let mut parser = SseTerminalParser::default();
loop {
let chunk = tokio::time::timeout(Duration::from_secs(STREAM_READ_IDLE_SECS), stream.next())
.await
.map_err(|_| {
anyhow::anyhow!(
"rsclaw replay: SSE idle for {STREAM_READ_IDLE_SECS}s (heartbeats stopped)"
)
})?;
let Some(chunk) = chunk else {
anyhow::bail!("rsclaw replay: SSE stream ended without a terminal event");
};
let chunk = chunk.context("rsclaw replay: SSE read error")?;
buf.extend_from_slice(&chunk);
while let Some(pos) = buf.iter().position(|&b| b == b'\n') {
let line_bytes: Vec<u8> = buf.drain(..=pos).collect();
let line_owned = String::from_utf8_lossy(&line_bytes).into_owned();
if let Some(terminal) = parser.push_line(line_owned.trim_end_matches(['\n', '\r'])) {
return Ok(terminal);
}
}
}
}
/// Minimal line-oriented SSE parser that only surfaces terminal `result` / `error` events.
///
/// Feed it one line at a time, without the line terminator. Comment lines (leading `:`) and
/// fields other than `event:` / `data:` are ignored; several `data:` lines are joined with `\n`.
/// A blank line ends the current event: a terminal event is returned, anything else is dropped.
#[derive(Default)]
struct SseTerminalParser {
    /// `event:` value of the event being assembled (whitespace-trimmed).
    event_name: String,
    /// Accumulated `data:` payload of the event being assembled.
    data: String,
}

impl SseTerminalParser {
    /// Consumes one line; returns `Some((event, data))` when it completes a terminal event.
    fn push_line(&mut self, line: &str) -> Option<(String, String)> {
        if line.is_empty() {
            return self.finish_event();
        }
        if line.starts_with(':') {
            // SSE comment, e.g. a heartbeat.
            return None;
        }
        if let Some(value) = line.strip_prefix("event:") {
            self.event_name = value.trim().to_owned();
        } else if let Some(value) = line.strip_prefix("data:") {
            // SSE strips exactly one leading space from a field value.
            let value = value.strip_prefix(' ').unwrap_or(value);
            if !self.data.is_empty() {
                self.data.push('\n');
            }
            self.data.push_str(value);
        }
        None
    }

    /// Handles the blank line that terminates an event: hands back terminal events and resets
    /// state for everything else.
    fn finish_event(&mut self) -> Option<(String, String)> {
        let is_terminal = matches!(self.event_name.as_str(), "result" | "error");
        if is_terminal {
            let name = std::mem::take(&mut self.event_name);
            let payload = std::mem::take(&mut self.data);
            Some((name, payload))
        } else {
            self.event_name.clear();
            self.data.clear();
            None
        }
    }
}
/// Gateway-side record of the server session that serves one `session_key`.
#[derive(Clone, Debug)]
struct SessionEntry {
    /// Server-assigned id used in `/sessions/{id}/...` paths.
    session_id: String,
    /// Prefix id the session was created with; a different id on lookup forces a rebuild.
    prefix_id: String,
    /// Caller's message-list length when last recorded; a shorter list on a later lookup means
    /// history was rewritten and the session is rebuilt.
    last_seen_msgs_len: usize,
}
impl RsclawProvider {
    /// Creates a provider for `base_url` using the crate's default user agent.
    pub fn new(base_url: impl Into<String>, bearer: Option<String>) -> Self {
        Self::with_user_agent(base_url, bearer, None)
    }

    /// Creates a provider with an optional `User-Agent` override.
    ///
    /// `base_url` is trimmed, then trailing `/` characters and a trailing `/agent` segment are
    /// removed (each request re-appends `/agent{path}`). A blank `bearer` is treated as `None`.
    ///
    /// # Panics
    ///
    /// Panics if the underlying `reqwest::Client` cannot be built.
    pub fn with_user_agent(
        base_url: impl Into<String>,
        bearer: Option<String>,
        user_agent: Option<String>,
    ) -> Self {
        let base_url = base_url.into().trim().trim_end_matches('/').to_string();
        let base_url = base_url
            .strip_suffix("/agent")
            .map(|s| s.trim_end_matches('/').to_string())
            .unwrap_or(base_url);
        let bearer = bearer
            .map(|b| b.trim().to_string())
            .filter(|b| !b.is_empty());
        // Redirects are disabled on the client itself; requests go through
        // `FleetHttp::post_following_redirects` (presumably it follows them explicitly — confirm
        // in rsclaw_embed).
        let client = reqwest::Client::builder()
            .user_agent(user_agent.as_deref().unwrap_or(super::DEFAULT_USER_AGENT))
            .redirect(reqwest::redirect::Policy::none())
            .connect_timeout(Duration::from_secs(30))
            .pool_idle_timeout(Duration::from_secs(30))
            .tcp_keepalive(Duration::from_secs(30))
            .build()
            .expect("failed to build rsclaw HTTP client");
        Self {
            fleet: rsclaw_embed::FleetHttp::from_client(client),
            base_url,
            bearer,
            prefix_id: RSCLAW_DEFAULT_PREFIX_ID.to_owned(),
            compact_timeout: Duration::from_secs(RSCLAW_DEFAULT_COMPACT_TIMEOUT_SECS),
            constrain_tool_calls: false,
            sessions: Arc::new(Mutex::new(HashMap::new())),
        }
    }

    /// Enables or disables asking the server to constrain tool-call output on turns that
    /// offer tools.
    pub fn with_constrain_tool_calls(mut self, enabled: bool) -> Self {
        self.constrain_tool_calls = enabled;
        self
    }

    /// Overrides the compact-splice request timeout; `0` keeps the current value.
    pub fn with_compact_timeout_secs(mut self, secs: u64) -> Self {
        if secs > 0 {
            self.compact_timeout = Duration::from_secs(secs);
        }
        self
    }

    /// Overrides the server prefix id.
    ///
    /// Blank input is ignored. A value without exactly one `/` is logged and ignored, keeping
    /// the current id.
    pub fn with_prefix_id(mut self, prefix_id: impl Into<String>) -> Self {
        let s = prefix_id.into().trim().to_owned();
        if s.is_empty() {
            return self;
        }
        let slash_count = s.matches('/').count();
        if slash_count != 1 {
            tracing::warn!(
                requested_prefix_id = %s,
                slash_count,
                default_prefix_id = RSCLAW_DEFAULT_PREFIX_ID,
                "rsclaw with_prefix_id: ignoring override that violates §2.10.1 \
                 (need exactly one '/' separator); falling back to default"
            );
            return self;
        }
        self.prefix_id = s;
        self
    }

    /// Locks the session map, recovering the data if the mutex is poisoned.
    ///
    /// The poisoning error is logged at most once per process: `LOGGED` is a function-local
    /// static shared by every provider instance.
    fn lock_sessions(&self) -> std::sync::MutexGuard<'_, HashMap<String, SessionEntry>> {
        match self.sessions.lock() {
            Ok(g) => g,
            Err(p) => {
                use std::sync::OnceLock;
                static LOGGED: OnceLock<()> = OnceLock::new();
                if LOGGED.set(()).is_ok() {
                    tracing::error!(
                        "rsclaw: sessions mutex poisoned — a prior thread \
                         panicked while holding it. Recovering inner data \
                         and continuing; expect possible session-state \
                         drift until restart."
                    );
                }
                p.into_inner()
            }
        }
    }

    /// Returns a copy of the cached session for `session_key`, if it is still usable.
    ///
    /// The entry is usable when it was created with `prefix_id` and `msgs_len` has not shrunk
    /// below the last recorded length; in that case `msgs_len` is recorded. Otherwise returns
    /// `None` and leaves the map untouched (the caller decides whether to forget it).
    fn lookup_and_bump(
        &self,
        session_key: &str,
        prefix_id: &str,
        msgs_len: usize,
    ) -> Option<SessionEntry> {
        let mut map = self.lock_sessions();
        let entry = map.get_mut(session_key)?;
        if entry.prefix_id != prefix_id {
            return None;
        }
        if msgs_len < entry.last_seen_msgs_len {
            return None;
        }
        entry.last_seen_msgs_len = msgs_len;
        Some(entry.clone())
    }

    /// Caches `entry` under `session_key`, replacing any previous entry, then runs batch
    /// eviction if the cache is over `MAX_SESSIONS`.
    ///
    /// Eviction picks victims in `HashMap` iteration order, which is arbitrary, so it can drop
    /// the entry just inserted — discarding a session that was opened or replayed moments
    /// ago and forcing another full replay on the next turn. The fresh entry is therefore
    /// restored if it did not survive.
    fn store(&self, session_key: &str, entry: SessionEntry) {
        let mut map = self.lock_sessions();
        map.insert(session_key.to_string(), entry.clone());
        evict_if_oversized(&mut map);
        if !map.contains_key(session_key) {
            map.insert(session_key.to_string(), entry);
        }
    }

    /// Drops any cached session for `session_key`.
    fn forget(&self, session_key: &str) {
        let mut map = self.lock_sessions();
        map.remove(session_key);
    }
}
/// Batch-evicts cached sessions once the map grows past `MAX_SESSIONS`.
///
/// Entries are removed in `HashMap` iteration order (arbitrary, not LRU) until only
/// `MAX_SESSIONS / 2` remain, so evictions happen in large, infrequent batches. Each batch is
/// logged.
fn evict_if_oversized(map: &mut HashMap<String, SessionEntry>) {
    let size = map.len();
    if size <= MAX_SESSIONS {
        return;
    }
    let survivors = MAX_SESSIONS / 2;
    let victims: Vec<String> = map.keys().take(size - survivors).cloned().collect();
    for key in &victims {
        map.remove(key);
    }
    tracing::info!(
        cap = MAX_SESSIONS,
        dropped = victims.len(),
        remaining = map.len(),
        "rsclaw: sessions cache over cap, evicted batch"
    );
}
/// Endpoint family chosen by `dispatch_decision`.
#[derive(Debug, PartialEq, Eq)]
enum DispatchRoute {
    /// Stateless call to the given `/agent` sub-path (`/fastshot`, `/vision` or `/oneshot`).
    OneShot(&'static str),
    /// Stateful call through a cached server session under `/sessions`.
    Sessions,
}
/// Chooses the endpoint for `req`. Rules are applied in this order:
///
/// 1. `kv_cache_mode == 2` without a `session_key` is an error.
/// 2. `rsclaw-flash-*` / `rsclaw-vision-*` models (optionally `rsclaw/`-prefixed) always go to
///    `/fastshot` / `/vision`.
/// 3. `rsclaw-agent-*` models go to `/oneshot` without a session key. Otherwise they ignore the
///    endpoint hint, logging a warning when the hint is not `Primary`.
/// 4. Any other model honours a `Flash` / `Vision` endpoint hint.
/// 5. Remaining requests without a session key go to `/oneshot`.
/// 6. Session traffic must use `kv_cache_mode == 2`; anything else is an error.
fn dispatch_decision(req: &LlmRequest) -> Result<DispatchRoute> {
    let model = req.model.strip_prefix("rsclaw/").unwrap_or(&req.model);
    let has_session = req.session_key.is_some();

    if req.kv_cache_mode == 2 && !has_session {
        anyhow::bail!(
            "rsclaw kv_cache_mode=2 requires session_key (got None); \
             set session_key=Some(...) for stateful traffic or \
             kv_cache_mode=0 + session_key=None for /oneshot"
        );
    }

    if model.starts_with("rsclaw-flash-") {
        return Ok(DispatchRoute::OneShot("/fastshot"));
    }
    if model.starts_with("rsclaw-vision-") {
        return Ok(DispatchRoute::OneShot("/vision"));
    }

    if model.starts_with("rsclaw-agent-") {
        if !has_session {
            return Ok(DispatchRoute::OneShot("/oneshot"));
        }
        if !matches!(req.endpoint, AgentEndpoint::Primary) {
            tracing::warn!(
                model = %req.model,
                endpoint = ?req.endpoint,
                "rsclaw dispatch: agent-* model overrides endpoint hint; routing to /sessions"
            );
        }
    } else if matches!(req.endpoint, AgentEndpoint::Flash) {
        return Ok(DispatchRoute::OneShot("/fastshot"));
    } else if matches!(req.endpoint, AgentEndpoint::Vision) {
        return Ok(DispatchRoute::OneShot("/vision"));
    }

    if !has_session {
        return Ok(DispatchRoute::OneShot("/oneshot"));
    }
    if req.kv_cache_mode != 2 {
        anyhow::bail!(
            "rsclaw session-mode call requires kv_cache_mode=2 (got {}); \
             pass session_key=None to route to /oneshot instead",
            req.kv_cache_mode
        );
    }
    Ok(DispatchRoute::Sessions)
}
impl LlmProvider for RsclawProvider {
    /// Provider name used for identification.
    fn name(&self) -> &str {
        "rsclaw"
    }

    /// Streams a completion for `req`.
    ///
    /// One-shot routes (per `dispatch_decision`) go straight to `stream_oneshot`. Session
    /// traffic reuses the cached server session for `session_key` when its prefix id matches
    /// and the message list has not shrunk. Otherwise the session is rebuilt — opened empty,
    /// or replayed from history — before the turn delta is sent. If the server reports the
    /// session gone, it is replayed and the turn is retried once. Every failure path drops the
    /// cached entry, and the returned stream drops it too on the first error it yields.
    fn stream(&self, mut req: LlmRequest) -> BoxFuture<'_, Result<LlmStream>> {
        Box::pin(async move {
            if let DispatchRoute::OneShot(path) = dispatch_decision(&req)? {
                return self.stream_oneshot(req, path).await;
            }

            let session_key = req
                .session_key
                .clone()
                .context("rsclaw kv_cache_mode=2 requires session_key on the request")?;
            normalize_trailing_system(&mut req.messages);
            let split = split_request(&req, &self.prefix_id, self.constrain_tool_calls)?;
            let msg_count = req.messages.len();

            let cached = self.lookup_and_bump(&session_key, &split.prefix_id, msg_count);
            let entry = if let Some(hit) = cached {
                hit
            } else {
                // Stale or missing: discard whatever is cached and build a fresh session.
                self.forget(&session_key);
                let history = history_for_replay(&req.messages);
                let created = if history.is_empty() {
                    self.open(&split).await?
                } else {
                    self.replay(&split, history).await?
                };
                let fresh = SessionEntry {
                    session_id: created.session_id,
                    prefix_id: split.prefix_id.clone(),
                    last_seen_msgs_len: msg_count,
                };
                self.store(&session_key, fresh.clone());
                fresh
            };

            let delta = TurnDelta::from_request(&req)?;
            if std::env::var("RSCLAW_DUMP_TURN").is_ok() {
                dump_turn_for_debug(&session_key, &entry, &split, &delta, &req);
            }

            let first_try = self
                .turn(&entry.session_id, &delta, &req)
                .await
                .map_err(|e| {
                    self.forget(&session_key);
                    e
                })?;

            let live = match first_try {
                TurnOutcome::Stream(s) => s,
                TurnOutcome::SessionNotFound => {
                    // The server dropped the session: rebuild it from full history and retry
                    // the same delta exactly once.
                    self.forget(&session_key);
                    let rebuilt = self
                        .replay(&split, history_for_replay(&req.messages))
                        .await?;
                    let retry_entry = SessionEntry {
                        session_id: rebuilt.session_id,
                        prefix_id: split.prefix_id.clone(),
                        last_seen_msgs_len: msg_count,
                    };
                    self.store(&session_key, retry_entry.clone());
                    match self.turn(&retry_entry.session_id, &delta, &req).await {
                        Ok(TurnOutcome::Stream(s)) => s,
                        Ok(TurnOutcome::SessionNotFound) => {
                            self.forget(&session_key);
                            anyhow::bail!(
                                "rsclaw: session vanished immediately after replay (id={})",
                                retry_entry.session_id
                            );
                        }
                        Err(e) => {
                            self.forget(&session_key);
                            return Err(e);
                        }
                    }
                }
            };

            Ok(invalidate_on_error(
                live,
                Arc::clone(&self.sessions),
                session_key,
            ))
        })
    }

    /// Replaces the middle of the cached session for `session_key` with `summary`, keeping
    /// `keep_head_messages` leading and `keep_tail_messages` trailing messages server-side.
    /// Returns the server's message count after the splice.
    ///
    /// On a 409 count mismatch the tail is widened by the number of messages the server holds
    /// beyond the expected count, and the splice is retried (at most three attempts in total).
    /// Errors — including a missing cached session — tell the caller to fall back to a replay.
    fn compact_splice<'a>(
        &'a self,
        session_key: &'a str,
        keep_head_messages: usize,
        summary: &'a str,
        keep_tail_messages: usize,
        expected_msgs_count: Option<usize>,
    ) -> BoxFuture<'a, Result<usize>> {
        Box::pin(async move {
            // Copy the id out so the std mutex guard is released before any await.
            let cached_id = {
                let sessions = self.lock_sessions();
                sessions.get(session_key).map(|e| e.session_id.clone())
            };
            let Some(session_id) = cached_id else {
                anyhow::bail!(
                    "rsclaw compact splice: no cached session for key {session_key} — \
                     falling back to replay"
                );
            };

            const MAX_SPLICE_ATTEMPTS: usize = 3;
            let mut expected = expected_msgs_count;
            let mut keep_tail = keep_tail_messages;
            let mut attempt = 0usize;
            let resp = loop {
                attempt += 1;
                let outcome = self
                    .compact_splice_inner(
                        &session_id,
                        keep_head_messages,
                        summary,
                        keep_tail,
                        expected,
                    )
                    .await?;
                let current = match outcome {
                    SpliceOutcome::Done(done) => break done,
                    SpliceOutcome::CountMismatch { current } => current,
                };

                let prev = expected.unwrap_or(current);
                if current < prev {
                    anyhow::bail!(
                        "rsclaw compact splice: server count {current} < local \
                         {prev}; falling back to replay rather than dropping \
                         unsummarized messages"
                    );
                }
                // Messages the server gained since the caller counted are recent, so keep them
                // as part of the tail.
                keep_tail += current - prev;
                expected = Some(current);
                if keep_head_messages + keep_tail >= current {
                    anyhow::bail!(
                        "rsclaw compact splice: realigned keep ranges (head \
                         {keep_head_messages} + tail {keep_tail}) leave no middle \
                         to drop against server count {current}; falling back to \
                         replay"
                    );
                }
                if attempt >= MAX_SPLICE_ATTEMPTS {
                    anyhow::bail!(
                        "rsclaw compact splice: still 409 msg_count_mismatch \
                         after {attempt} attempts (count drifting under \
                         concurrent load); falling back to replay"
                    );
                }
                tracing::info!(
                    session_key,
                    current,
                    new_keep_tail = keep_tail,
                    attempt,
                    "rsclaw compact splice: 409 msg_count_mismatch — realigned \
                     expected to server count and retrying"
                );
            };

            let local_msgs_count = keep_head_messages + 1 + keep_tail;
            if resp.msgs_count != local_msgs_count {
                tracing::warn!(
                    session_key,
                    server_count = resp.msgs_count,
                    local_count = local_msgs_count,
                    "rsclaw compact splice: server msgs_count diverges from gateway computation"
                );
            }
            tracing::info!(
                session_key,
                msgs_count = resp.msgs_count,
                tokens_count = resp.tokens_count,
                "rsclaw compact splice: server-side splice complete"
            );
            {
                let mut sessions = self.lock_sessions();
                if let Some(cached) = sessions.get_mut(session_key) {
                    cached.last_seen_msgs_len = local_msgs_count;
                }
            }
            Ok(resp.msgs_count)
        })
    }
}
fn invalidate_on_error(
inner: LlmStream,
sessions: Arc<Mutex<HashMap<String, SessionEntry>>>,
session_key: String,
) -> LlmStream {
use futures::StreamExt;
let mut errored = false;
Box::pin(inner.inspect(move |item| {
if errored {
return;
}
let invalidate = match item {
Err(_) => true,
Ok(StreamEvent::Error(_)) => true,
_ => false,
};
if !invalidate {
return;
}
errored = true;
match sessions.lock() {
Ok(mut map) => {
map.remove(&session_key);
}
Err(p) => {
p.into_inner().remove(&session_key);
}
}
}))
}
impl RsclawProvider {
async fn send_following_redirects<B: Serialize>(
&self,
path: &str,
body: &B,
builder_timeout: Option<Duration>,
idempotency_key: Option<&str>,
accept_sse: bool,
) -> Result<reqwest::Response> {
let url = format!("{}/agent{}", self.base_url, path);
self.fleet
.post_following_redirects(
&url,
body,
self.bearer.as_deref(),
accept_sse,
idempotency_key,
builder_timeout,
)
.await
}
async fn open(&self, split: &SplitRequest<'_>) -> Result<CreateSessionResp> {
let (prefix_id, dynamic_prefix, top_level_user_tools) = prefix_fields(
&split.prefix_id,
DynamicPrefixWire {
system: split.dynamic_system,
tools: &split.dynamic_tools,
user_tools: &split.dynamic_user_tools,
user_system: split.dynamic_user_system,
},
);
let body = CreateSessionReq {
prefix_id,
model: &split.model,
dynamic_prefix,
user_tools: top_level_user_tools,
options: Some(split.options.clone()),
};
let resp = self
.send_following_redirects(
"/sessions",
&body,
Some(Duration::from_secs(180)),
None,
false,
)
.await?;
let status = resp.status();
if !status.is_success() {
let body = resp.text().await.unwrap_or_default();
anyhow::bail!("rsclaw open session failed {status}: {body}");
}
resp.json::<CreateSessionResp>()
.await
.context("rsclaw open: parse response")
}
async fn replay(
&self,
split: &SplitRequest<'_>,
messages: &[Message],
) -> Result<CreateSessionResp> {
let (filtered, extra_suffix) = split_system_messages(messages);
let user_system_owned: String = if extra_suffix.is_empty() {
String::new()
} else if split.dynamic_user_system.is_empty() {
extra_suffix
} else {
format!("{}\n\n{}", split.dynamic_user_system, extra_suffix)
};
let user_system: &str = if user_system_owned.is_empty() {
split.dynamic_user_system
} else {
&user_system_owned
};
let history: Vec<Value> = serialize_replay_history(&filtered);
let (prefix_id, dynamic_prefix, top_level_user_tools) = prefix_fields(
&split.prefix_id,
DynamicPrefixWire {
system: split.dynamic_system,
tools: &split.dynamic_tools,
user_tools: &split.dynamic_user_tools,
user_system,
},
);
let body = ReplayReq {
prefix_id,
model: &split.model,
dynamic_prefix,
user_tools: top_level_user_tools,
history,
options: Some(split.options.clone()),
};
let idem_key = uuid::Uuid::new_v4().to_string();
let resp = tokio::time::timeout(
Duration::from_secs(60),
self.send_following_redirects("/sessions/replay", &body, None, Some(&idem_key), true),
)
.await
.map_err(|_| anyhow::anyhow!("rsclaw replay: no response headers within 60s"))??;
let status = resp.status();
let is_sse = resp
.headers()
.get(reqwest::header::CONTENT_TYPE)
.and_then(|v| v.to_str().ok())
.map(|v| v.contains("text/event-stream"))
.unwrap_or(false);
if !is_sse {
if !status.is_success() {
let body = resp.text().await.unwrap_or_default();
anyhow::bail!("rsclaw replay failed {status}: {body}");
}
return tokio::time::timeout(
Duration::from_secs(300),
resp.json::<CreateSessionResp>(),
)
.await
.map_err(|_| anyhow::anyhow!("rsclaw replay: body read timed out after 300s"))?
.context("rsclaw replay: parse response");
}
let (event, data) =
tokio::time::timeout(Duration::from_secs(30 * 60), read_sse_terminal_event(resp))
.await
.map_err(|_| {
anyhow::anyhow!("rsclaw replay: SSE total deadline (30min) exceeded")
})??;
if event == "result" {
return serde_json::from_str::<CreateSessionResp>(&data)
.context("rsclaw replay: parse SSE result event");
}
let parsed: Value = serde_json::from_str(&data).unwrap_or(Value::Null);
let code = parsed.get("status").and_then(Value::as_u64).unwrap_or(0);
let detail = parsed.get("body").map(Value::to_string).unwrap_or(data);
anyhow::bail!("rsclaw replay failed {code}: {detail}");
}
async fn compact_splice_inner(
&self,
session_id: &str,
keep_head_messages: usize,
summary: &str,
keep_tail_messages: usize,
expected_msgs_count: Option<usize>,
) -> Result<SpliceOutcome> {
let path = format!("/sessions/{}/compact", session_id);
let body = CompactSpliceReq {
keep_head_messages,
summary,
keep_tail_messages,
expected_msgs_count,
};
let resp = self
.send_following_redirects(&path, &body, Some(self.compact_timeout), None, false)
.await?;
let status = resp.status();
if status.as_u16() == 409 {
let text = resp.text().await.unwrap_or_default();
let parsed: CompactSplice409 = serde_json::from_str(&text)
.with_context(|| format!("rsclaw compact: parse 409 body: {text}"))?;
return Ok(SpliceOutcome::CountMismatch {
current: parsed.error.current,
});
}
if !status.is_success() {
let body = resp.text().await.unwrap_or_default();
anyhow::bail!("rsclaw compact splice failed {status}: {body}");
}
let resp = resp
.json::<CompactSpliceResp>()
.await
.context("rsclaw compact: parse response")?;
Ok(SpliceOutcome::Done(resp))
}
async fn stream_oneshot(&self, req: LlmRequest, path: &'static str) -> Result<LlmStream> {
use futures::StreamExt;
let prompt = flatten_prompt_for_oneshot(&req);
if prompt.trim().is_empty() {
anyhow::bail!("rsclaw {path}: empty prompt after flattening req.messages");
}
let mut body = serde_json::Map::new();
body.insert("prompt".to_owned(), Value::String(prompt));
let use_stream = path != "/vision";
body.insert("stream".to_owned(), Value::Bool(use_stream));
if let Some(mt) = req.max_tokens {
body.insert("max_tokens".to_owned(), Value::from(mt));
}
let canonical_model = match path {
"/fastshot" => "rsclaw-flash-v1",
"/vision" => "rsclaw-vision-v1",
_ => "rsclaw-agent-v1", };
body.insert(
"model".to_owned(),
Value::String(canonical_model.to_owned()),
);
let mut options = serde_json::Map::new();
if let Some(t) = req.temperature {
options.insert("temperature".to_owned(), super::json_f32(t));
}
if let Some(budget) = req.thinking_budget {
options.insert("enable_thinking".to_owned(), Value::Bool(budget > 0));
}
if !options.is_empty() {
body.insert("options".to_owned(), Value::Object(options));
}
if path == "/vision" {
let images = extract_images_for_oneshot(&req);
if images.is_empty() {
anyhow::bail!("rsclaw /vision: request has no image content");
}
body.insert(
"images".to_owned(),
Value::Array(images.into_iter().map(Value::String).collect()),
);
}
let body = Value::Object(body);
let send_fut = self.send_following_redirects(path, &body, None, None, false);
let resp = match tokio::time::timeout(TURN_HEADERS_TIMEOUT, send_fut).await {
Ok(r) => r?,
Err(_) => anyhow::bail!(
"rsclaw {path}: timed out waiting for response headers after {}s ({}/agent{})",
TURN_HEADERS_TIMEOUT.as_secs(),
self.base_url,
path,
),
};
let status = resp.status();
if !status.is_success() {
let body = resp.text().await.unwrap_or_default();
anyhow::bail!("rsclaw {path} failed {status}: {body}");
}
if !use_stream {
let json: Value = resp
.json()
.await
.map_err(|e| anyhow::anyhow!("rsclaw {path}: decode non-stream body: {e}"))?;
if let Some(err) = json.get("error").and_then(|v| v.as_str()) {
return Ok(Box::pin(futures::stream::iter(vec![Ok(StreamEvent::Error(
err.to_owned(),
))])) as LlmStream);
}
let content = json
.get("content")
.and_then(|v| v.as_str())
.or_else(|| json.get("text").and_then(|v| v.as_str()))
.or_else(|| {
json.pointer("/choices/0/message/content")
.and_then(|v| v.as_str())
})
.unwrap_or_default()
.to_owned();
let events = vec![
Ok(StreamEvent::TextDelta(content)),
Ok(StreamEvent::Done { usage: None }),
];
return Ok(Box::pin(futures::stream::iter(events)) as LlmStream);
}
let path_owned = path.to_string();
let byte_stream = tokio_stream::StreamExt::timeout(
resp.bytes_stream(),
Duration::from_secs(STREAM_READ_IDLE_SECS),
)
.map(move |r| match r {
Ok(Ok(bytes)) => Ok(bytes),
Ok(Err(e)) => Err(anyhow::anyhow!("stream read error: {e}")),
Err(_) => Err(anyhow::anyhow!(
"rsclaw {path_owned}: SSE idle for {STREAM_READ_IDLE_SECS}s (worker stalled mid-generation)"
)),
});
let line_buffer = Arc::new(tokio::sync::Mutex::new(String::new()));
let utf8_remainder = Arc::new(tokio::sync::Mutex::new(Vec::<u8>::new()));
let event_stream = byte_stream
.then(move |chunk| {
let line_buffer = line_buffer.clone();
let utf8_remainder = utf8_remainder.clone();
async move { parse_oneshot_sse_chunk(chunk, &line_buffer, &utf8_remainder).await }
})
.flat_map(|events| futures::stream::iter(events));
Ok(Box::pin(event_stream) as LlmStream)
}
async fn turn(
&self,
session_id: &str,
delta: &TurnDelta,
req: &LlmRequest,
) -> Result<TurnOutcome> {
let path = format!("/sessions/{}/turn", session_id);
let recall = req.recall.as_ref().filter(|r| !r.context.trim().is_empty());
let body = TurnReq {
delta,
recall_context: recall.map(|r| r.context.as_str()),
recall: recall.map(|r| &r.metadata),
options: Some(TurnOptions::from_request(req, self.constrain_tool_calls)),
stream: true,
};
const RETRY_BACKOFFS: [Duration; 2] = [Duration::from_millis(500), Duration::from_secs(2)];
let mut attempt: usize = 0;
let resp = loop {
let send_fut = self.send_following_redirects(&path, &body, None, None, false);
let resp = match tokio::time::timeout(TURN_HEADERS_TIMEOUT, send_fut).await {
Ok(r) => r?,
Err(_) => anyhow::bail!(
"rsclaw turn: timed out waiting for response headers after {}s ({}/agent{})",
TURN_HEADERS_TIMEOUT.as_secs(),
self.base_url,
path,
),
};
let status = resp.status();
if status.is_success() {
break resp;
}
let body_text = resp.text().await.unwrap_or_default();
if is_session_evicted(status, &body_text) {
return Ok(TurnOutcome::SessionNotFound);
}
if status == StatusCode::SERVICE_UNAVAILABLE && attempt < RETRY_BACKOFFS.len() {
let delay = RETRY_BACKOFFS[attempt];
tracing::warn!(
status = %status,
attempt = attempt + 1,
delay_ms = delay.as_millis() as u64,
"rsclaw turn: 503 from upstream after server-side auto-replay; retrying"
);
tokio::time::sleep(delay).await;
attempt += 1;
continue;
}
anyhow::bail!("rsclaw turn failed {status}: {body_text}");
};
let byte_stream = resp.bytes_stream();
let line_buffer = Arc::new(tokio::sync::Mutex::new(String::new()));
let utf8_remainder = Arc::new(tokio::sync::Mutex::new(Vec::<u8>::new()));
let block_state = Arc::new(tokio::sync::Mutex::new(SseState::default()));
let event_stream =
byte_stream
.map_err(|e| anyhow::anyhow!("stream read error: {e}"))
.then(move |chunk| {
let line_buffer = line_buffer.clone();
let utf8_remainder = utf8_remainder.clone();
let block_state = block_state.clone();
async move {
parse_sse_chunk(chunk, &line_buffer, &utf8_remainder, &block_state).await
}
})
.flat_map(futures::stream::iter);
Ok(TurnOutcome::Stream(Box::pin(event_stream)))
}
}
/// Result of a single `/sessions/{id}/turn` call.
enum TurnOutcome {
    /// Turn accepted; events stream from the server.
    Stream(LlmStream),
    /// The server no longer has the session (per `is_session_evicted`); the caller should
    /// rebuild it and retry.
    SessionNotFound,
}
/// Inline session prefix: system prompts and tool schemas, sent when no named prefix id is
/// used. Empty `system`, `user_tools` and `user_system` are omitted on the wire; `tools` is
/// always serialised. Field order is the JSON key order.
#[derive(Debug, Serialize)]
struct DynamicPrefixWire<'a> {
    /// System prompt; omitted when empty.
    #[serde(skip_serializing_if = "str::is_empty")]
    system: &'a str,
    /// Built-in tool schemas.
    tools: &'a [Value],
    /// Non-built-in tool schemas; omitted when empty.
    #[serde(skip_serializing_if = "slice_ref_is_empty")]
    user_tools: &'a [Value],
    /// Additional system text; omitted when empty.
    #[serde(skip_serializing_if = "str::is_empty")]
    user_system: &'a str,
}
/// `skip_serializing_if` predicate for borrowed-slice fields; serde passes `&field`, which for
/// a `&[T]` field is `&&[T]`.
fn slice_ref_is_empty<T>(s: &&[T]) -> bool {
    <[T]>::is_empty(s)
}
/// Body of `POST /sessions` (open an empty session).
///
/// Exactly one of `prefix_id` / `dynamic_prefix` is set (as produced by `prefix_fields`);
/// top-level `user_tools` is non-empty only alongside a named `prefix_id`.
#[derive(Debug, Serialize)]
struct CreateSessionReq<'a> {
    #[serde(skip_serializing_if = "Option::is_none")]
    prefix_id: Option<&'a str>,
    /// Model id without the `rsclaw/` prefix.
    model: &'a str,
    #[serde(skip_serializing_if = "Option::is_none")]
    dynamic_prefix: Option<DynamicPrefixWire<'a>>,
    #[serde(skip_serializing_if = "slice_ref_is_empty")]
    user_tools: &'a [Value],
    #[serde(skip_serializing_if = "Option::is_none")]
    options: Option<TurnOptions>,
}

/// Body of `POST /sessions/replay`: the same prefix fields as [`CreateSessionReq`] plus the
/// serialised conversation `history` to rebuild the session from.
#[derive(Debug, Serialize)]
struct ReplayReq<'a> {
    #[serde(skip_serializing_if = "Option::is_none")]
    prefix_id: Option<&'a str>,
    /// Model id without the `rsclaw/` prefix.
    model: &'a str,
    #[serde(skip_serializing_if = "Option::is_none")]
    dynamic_prefix: Option<DynamicPrefixWire<'a>>,
    #[serde(skip_serializing_if = "slice_ref_is_empty")]
    user_tools: &'a [Value],
    history: Vec<Value>,
    #[serde(skip_serializing_if = "Option::is_none")]
    options: Option<TurnOptions>,
}
/// Decides how the session prefix is put on the wire.
///
/// - Non-empty `prefix_id`: send only the id. The dynamic prefix is not sent; its `user_tools`
///   move to the request's top-level `user_tools` field.
/// - Empty `prefix_id`: send the whole dynamic prefix inline (user tools included) and leave
///   the top-level `user_tools` empty.
///
/// NOTE(review): with a named prefix, `dynamic.system` and `dynamic.user_system` are not sent,
/// including any history system text that `replay` folds into `user_system` — confirm intended.
fn prefix_fields<'a>(
    prefix_id: &'a str,
    dynamic: DynamicPrefixWire<'a>,
) -> (Option<&'a str>, Option<DynamicPrefixWire<'a>>, &'a [Value]) {
    match prefix_id {
        "" => (None, Some(dynamic), &[]),
        named => (Some(named), None, dynamic.user_tools),
    }
}
/// Body of `POST /sessions/{id}/compact`: keep a head and a tail, replace the middle with
/// `summary`.
#[derive(Debug, Serialize)]
struct CompactSpliceReq<'a> {
    /// Leading messages kept verbatim.
    keep_head_messages: usize,
    /// Text that replaces the dropped middle.
    summary: &'a str,
    /// Trailing messages kept verbatim.
    keep_tail_messages: usize,
    /// Message count the caller believes the session holds; on disagreement the server
    /// answers 409 with its current count. Omitted when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    expected_msgs_count: Option<usize>,
}

/// Successful compact-splice response.
#[derive(Debug, Deserialize, Clone)]
struct CompactSpliceResp {
    // Deserialised but never read, hence the lint allowance.
    #[allow(dead_code)] session_id: String,
    /// Server message count after the splice.
    msgs_count: usize,
    /// Server token count after the splice (logged only).
    tokens_count: usize,
}

/// Outcome of one compact-splice attempt.
enum SpliceOutcome {
    /// Splice applied.
    Done(CompactSpliceResp),
    /// Server rejected with 409 and reported its actual message count.
    CountMismatch { current: usize },
}

/// 409 body; only `error.current` is read.
#[derive(Debug, Deserialize)]
struct CompactSplice409 {
    error: CompactSplice409Error,
}

/// `error` object of a 409 compact response.
#[derive(Debug, Deserialize)]
struct CompactSplice409Error {
    /// Message count the server actually holds.
    current: usize,
}
/// Response of session open or replay. Only `session_id` is consumed.
#[derive(Debug, Deserialize, Clone)]
struct CreateSessionResp {
    /// Server-assigned session id.
    session_id: String,
    // Optional echo of the prefix id; deserialised but unused.
    #[serde(default)]
    #[allow(dead_code)]
    prefix_id: Option<String>,
}
/// Body of `POST /sessions/{id}/turn`.
#[derive(Debug, Serialize)]
struct TurnReq<'a> {
    /// Flattened into the body, contributing either `user_message` or `tool_results`.
    #[serde(flatten)]
    delta: &'a TurnDelta,
    /// Recall text for this turn; omitted when absent.
    #[serde(skip_serializing_if = "Option::is_none")]
    recall_context: Option<&'a str>,
    /// Metadata accompanying the recall text; omitted when absent.
    #[serde(skip_serializing_if = "Option::is_none")]
    recall: Option<&'a RecallMetadata>,
    #[serde(skip_serializing_if = "Option::is_none")]
    options: Option<TurnOptions>,
    /// Set to `true` by the callers in this file; the reply is consumed as SSE.
    stream: bool,
}

/// New input for a turn. Untagged, so it serialises as `{"user_message": ...}` or
/// `{"tool_results": [...]}`.
#[derive(Debug, Serialize)]
#[serde(untagged)]
enum TurnDelta {
    User { user_message: String },
    Tools { tool_results: Vec<ToolResultDelta> },
}
impl TurnDelta {
    /// Builds the turn delta from the tail of `req.messages`.
    ///
    /// - A trailing run of `Tool` messages becomes `Tools`, carrying every `ToolResult` part of
    ///   that run in order; other parts are ignored.
    /// - A trailing `User` message becomes `User`. Its text is the `Text` content, or the
    ///   message's `Text` parts concatenated with no separator.
    ///
    /// # Errors
    ///
    /// Empty `messages`, a last message with any other role, a tool run without tool results,
    /// or a user message without text.
    fn from_request(req: &LlmRequest) -> Result<Self> {
        let last = req
            .messages
            .last()
            .context("rsclaw: empty messages, no delta to send")?;
        if matches!(last.role, Role::Tool) {
            return Self::from_trailing_tools(&req.messages);
        }
        if !matches!(last.role, Role::User) {
            anyhow::bail!(
                "rsclaw: last message must be User or Tool, got {:?}",
                last.role
            );
        }
        Self::from_user_message(last)
    }

    /// Collects the `ToolResult` parts of the trailing run of `Tool` messages, oldest first.
    fn from_trailing_tools(messages: &[Message]) -> Result<Self> {
        let run_len = messages
            .iter()
            .rev()
            .take_while(|m| matches!(m.role, Role::Tool))
            .count();
        let run = &messages[messages.len() - run_len..];
        let mut tool_results: Vec<ToolResultDelta> = Vec::new();
        for message in run {
            let MessageContent::Parts(parts) = &message.content else {
                continue;
            };
            tool_results.extend(parts.iter().filter_map(|part| match part {
                ContentPart::ToolResult {
                    tool_use_id,
                    content,
                    is_error,
                } => Some(ToolResultDelta {
                    tool_use_id: tool_use_id.clone(),
                    content: content.clone(),
                    is_error: is_error.unwrap_or(false),
                }),
                _ => None,
            }));
        }
        if tool_results.is_empty() {
            anyhow::bail!("rsclaw: trailing Tool message(s) carried no tool_result parts");
        }
        Ok(TurnDelta::Tools { tool_results })
    }

    /// Concatenates the text of a `User` message into a `User` delta.
    fn from_user_message(last: &Message) -> Result<Self> {
        let mut user_message = String::new();
        match &last.content {
            MessageContent::Text(t) => user_message.push_str(t),
            MessageContent::Parts(parts) => {
                for part in parts {
                    if let ContentPart::Text { text } = part {
                        user_message.push_str(text);
                    }
                }
            }
        }
        if user_message.is_empty() {
            anyhow::bail!("rsclaw: last message has no usable content for delta")
        }
        Ok(TurnDelta::User { user_message })
    }
}
#[derive(Debug, Serialize)]
struct ToolResultDelta {
tool_use_id: String,
content: String,
#[serde(default)]
is_error: bool,
}
/// `serialize_with` helper for `Option<f32>` fields.
///
/// `Some` values go through `super::json_f32` instead of the raw `f32` serialisation; `None`
/// is written with `serialize_none`.
fn ser_opt_f32<S: serde::Serializer>(
    v: &Option<f32>,
    s: S,
) -> std::result::Result<S::Ok, S::Error> {
    if let Some(f) = v {
        super::json_f32(*f).serialize(s)
    } else {
        s.serialize_none()
    }
}
/// Options attached to session open, replay and turn requests.
///
/// `None` fields are omitted from the JSON body; float fields go through `ser_opt_f32`. Field
/// order is the JSON key order.
#[derive(Debug, Serialize, Clone, Default)]
struct TurnOptions {
    /// Copied from `LlmRequest::max_tokens`.
    #[serde(skip_serializing_if = "Option::is_none")]
    max_tokens: Option<u32>,
    /// Copied from `LlmRequest::temperature`.
    #[serde(
        skip_serializing_if = "Option::is_none",
        serialize_with = "ser_opt_f32"
    )]
    temperature: Option<f32>,
    /// Never set by `from_request`.
    #[serde(
        skip_serializing_if = "Option::is_none",
        serialize_with = "ser_opt_f32"
    )]
    top_p: Option<f32>,
    /// `thinking_budget > 0`, when a budget is given.
    #[serde(skip_serializing_if = "Option::is_none")]
    enable_thinking: Option<bool>,
    /// Never set by `from_request`.
    #[serde(skip_serializing_if = "Option::is_none")]
    stop: Option<Vec<String>>,
    /// Never set by `from_request`; presumably a server-side session idle TTL — confirm.
    #[serde(skip_serializing_if = "Option::is_none")]
    idle_ttl_secs: Option<u32>,
    /// `Some(true)` or omitted; never `Some(false)` from `from_request`.
    #[serde(skip_serializing_if = "Option::is_none")]
    constrain_tool_calls: Option<bool>,
}

impl TurnOptions {
    /// Derives options from `req`.
    ///
    /// Copies `max_tokens` and `temperature` and maps `thinking_budget` to `enable_thinking`
    /// (a budget above zero enables it). Sets `constrain_tool_calls` only when the provider flag
    /// is on *and* the request offers tools. Everything else stays unset.
    fn from_request(req: &LlmRequest, constrain_tool_calls: bool) -> Self {
        let constrain = constrain_tool_calls && !req.tools.is_empty();
        Self {
            max_tokens: req.max_tokens,
            temperature: req.temperature,
            enable_thinking: req.thinking_budget.map(|budget| budget > 0),
            constrain_tool_calls: if constrain { Some(true) } else { None },
            ..Self::default()
        }
    }
}
/// A session-mode request split by `split_request` into the pieces that define the server-side
/// prefix and the options sent with open/replay.
struct SplitRequest<'a> {
    /// Prefix id copied from the provider's configuration.
    prefix_id: String,
    /// Model id with any leading `rsclaw/` removed; `split_request` rejects an empty one.
    model: String,
    /// Sent as `system` in the dynamic prefix.
    dynamic_system: &'a str,
    /// Sent as `user_system` in the dynamic prefix (replay may append history system text).
    dynamic_user_system: &'a str,
    /// Schemas of tools listed in `rsclaw_types::BUILTIN_TOOL_NAMES`.
    dynamic_tools: Vec<Value>,
    /// Schemas of every other tool.
    dynamic_user_tools: Vec<Value>,
    /// Options attached to open and replay requests.
    options: TurnOptions,
}
fn dump_turn_for_debug(
session_key: &str,
entry: &SessionEntry,
split: &SplitRequest<'_>,
delta: &TurnDelta,
req: &LlmRequest,
) {
use std::time::{SystemTime, UNIX_EPOCH};
let now_ms = SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|d| d.as_millis() as u64)
.unwrap_or(0);
let history_owned: Vec<&Message> = history_for_replay(&req.messages).iter().collect();
let history_values = serialize_replay_history(&history_owned);
let openai_model = req.model.strip_prefix("rsclaw/").unwrap_or(&req.model);
let openai_messages: Vec<Value> = req
.messages
.iter()
.filter_map(|m| serde_json::to_value(m).ok())
.collect();
let openai_tools: Vec<Value> = req
.tools
.iter()
.map(|t| {
json!({
"type": "function",
"function": {
"name": t.name,
"description": t.description,
"parameters": t.parameters,
}
})
})
.collect();
let opts = split.options.clone();
let recall = req.recall.as_ref().filter(|r| !r.context.trim().is_empty());
let turn_body = to_canonical_value(
serde_json::to_value(&TurnReq {
delta,
recall_context: recall.map(|r| r.context.as_str()),
recall: recall.map(|r| &r.metadata),
options: Some(opts.clone()),
stream: true,
})
.unwrap_or(Value::Null),
);
let mut dynamic_prefix_dump = serde_json::Map::new();
dynamic_prefix_dump.insert("system".to_owned(), json!(split.dynamic_system));
dynamic_prefix_dump.insert("tools".to_owned(), json!(split.dynamic_tools));
if !split.dynamic_user_tools.is_empty() {
dynamic_prefix_dump.insert("user_tools".to_owned(), json!(split.dynamic_user_tools));
}
if !split.dynamic_user_system.is_empty() {
dynamic_prefix_dump.insert("user_system".to_owned(), json!(split.dynamic_user_system));
}
let replay_body = to_canonical_value(json!({
"prefix_id": split.prefix_id,
"dynamic_prefix": Value::Object(dynamic_prefix_dump),
"history": history_values,
"options": serde_json::to_value(&opts).unwrap_or(Value::Null),
}));
let dump = json!({
"schema_version": 1,
"timestamp_ms": now_ms,
"session_key": session_key,
"model": req.model,
"rsclaw_session": {
"session_id": entry.session_id,
"prefix_id": entry.prefix_id,
"last_seen_msgs_len": entry.last_seen_msgs_len,
},
"llm_request_summary": {
"msg_count": req.messages.len(),
"tool_count": req.tools.len(),
"system_len": req.system.as_deref().map(|s| s.len()).unwrap_or(0),
"max_tokens": req.max_tokens,
"temperature": req.temperature,
"kv_cache_mode": req.kv_cache_mode,
},
"rsclaw_turn_body": turn_body,
"rsclaw_replay_body": replay_body,
"openai_chat_completions_body": {
"model": openai_model,
"messages": openai_messages,
"tools": openai_tools,
"temperature": req.temperature,
"max_tokens": req.max_tokens,
"stream": true,
},
"replay_instructions": [
"Pick ONE of the three replay paths and POST against the worker:",
" A. Stateful turn against a LIVE session (only works while session is alive):",
" curl -X POST $BASE/sessions/<session_id>/turn -d @<this-file>[.rsclaw_turn_body]",
" B. Re-hydrate then turn — recreates session deterministically:",
" curl -X POST $BASE/sessions/replay -d @<this-file>[.rsclaw_replay_body]",
" curl -X POST $BASE/sessions/<new_session_id>/turn -d @<this-file>[.rsclaw_turn_body]",
" C. Stateless OpenAI-compat for comparison (no session, full history each time):",
" curl -X POST $BASE/v1/chat/completions -d @<this-file>[.openai_chat_completions_body]"
]
});
let sess_suffix: String = entry
.session_id
.rsplit('_')
.next()
.unwrap_or("unknown")
.chars()
.take(8)
.collect();
let dir = rsclaw_config::loader::base_dir().join("debug");
if let Err(e) = std::fs::create_dir_all(&dir) {
tracing::warn!(error = %e, "RSCLAW_DUMP_TURN: create_dir_all failed");
return;
}
let path = dir.join(format!("turn-{now_ms}-{sess_suffix}.json"));
match serde_json::to_string_pretty(&dump) {
Ok(s) => match std::fs::write(&path, s) {
Ok(_) => tracing::info!(path = %path.display(), "RSCLAW_DUMP_TURN: turn dumped"),
Err(e) => {
tracing::warn!(error = %e, path = %path.display(), "RSCLAW_DUMP_TURN: write failed")
}
},
Err(e) => tracing::warn!(error = %e, "RSCLAW_DUMP_TURN: serialize failed"),
}
}
/// Rebuilds a JSON value so every object (at any depth) lists its keys in
/// ascending lexicographic order; arrays keep their element order and scalars
/// pass through untouched. Used to make request bodies byte-stable regardless
/// of how the source maps were populated.
fn to_canonical_value(v: serde_json::Value) -> serde_json::Value {
    match v {
        serde_json::Value::Array(items) => serde_json::Value::Array(
            items.into_iter().map(to_canonical_value).collect(),
        ),
        serde_json::Value::Object(fields) => {
            // Canonicalize children first, then order the (unique) keys.
            let mut entries: Vec<(String, serde_json::Value)> = fields
                .into_iter()
                .map(|(key, inner)| (key, to_canonical_value(inner)))
                .collect();
            entries.sort_unstable_by(|left, right| left.0.cmp(&right.0));
            serde_json::Value::Object(entries.into_iter().collect())
        }
        scalar => scalar,
    }
}
/// Splits an `LlmRequest` into the pieces needed to open/drive an rsclaw
/// session: the bare model id (without an `rsclaw/` prefix), the system text
/// split into shared vs. user-specific parts, and the tool list split into
/// built-in tools (names in `rsclaw_types::BUILTIN_TOOL_NAMES`) vs. user tools,
/// each rendered as canonical `{name, description, input_schema}` JSON.
///
/// # Errors
/// Fails when the model id is empty after stripping the `rsclaw/` prefix.
fn split_request<'a>(
    req: &'a LlmRequest,
    prefix_id: &str,
    constrain_tool_calls: bool,
) -> Result<SplitRequest<'a>> {
    let model = match req.model.strip_prefix("rsclaw/") {
        Some(bare) => bare.to_owned(),
        None => req.model.clone(),
    };
    if model.is_empty() {
        anyhow::bail!("rsclaw: req.model is empty; cannot open session without a model id");
    }

    let describe = |tool: &super::ToolDef| {
        to_canonical_value(json!({
            "name": tool.name,
            "description": tool.description,
            "input_schema": tool.parameters,
        }))
    };
    // Built-ins and user tools travel in separate prefix sections; order
    // within each section follows the request's tool order.
    let (builtin, custom): (Vec<_>, Vec<_>) = req
        .tools
        .iter()
        .partition(|tool| rsclaw_types::BUILTIN_TOOL_NAMES.contains(&tool.name.as_str()));
    let dynamic_tools: Vec<Value> = builtin.into_iter().map(|tool| describe(tool)).collect();
    let dynamic_user_tools: Vec<Value> = custom.into_iter().map(|tool| describe(tool)).collect();

    // When either split field is provided, use the split form; otherwise the
    // whole legacy `system` prompt counts as shared and the user part is empty.
    let (dynamic_system, dynamic_user_system) =
        match (req.system_shared.as_deref(), req.user_system.as_deref()) {
            (None, None) => (req.system.as_deref().unwrap_or(""), ""),
            (shared, user) => (shared.unwrap_or(""), user.unwrap_or("")),
        };

    Ok(SplitRequest {
        prefix_id: prefix_id.to_owned(),
        model,
        dynamic_system,
        dynamic_user_system,
        dynamic_tools,
        dynamic_user_tools,
        options: TurnOptions::from_request(req, constrain_tool_calls),
    })
}
/// Collapses a request into a single plain-text prompt: the system prompt
/// followed by every text segment of every message (roles are not labelled),
/// each trimmed, with empty segments dropped and the rest separated by a
/// blank line. Non-text content parts are ignored.
fn flatten_prompt_for_oneshot(req: &LlmRequest) -> String {
    let mut sections: Vec<&str> = Vec::new();
    sections.extend(req.system.as_deref().map(str::trim));
    for msg in &req.messages {
        match &msg.content {
            MessageContent::Text(t) => sections.push(t.trim()),
            MessageContent::Parts(content_parts) => {
                sections.extend(content_parts.iter().filter_map(|part| match part {
                    ContentPart::Text { text } => Some(text.trim()),
                    _ => None,
                }));
            }
        }
    }
    sections.retain(|section| !section.is_empty());
    sections.join("\n\n")
}
async fn parse_oneshot_sse_chunk(
chunk: anyhow::Result<bytes::Bytes>,
line_buffer: &tokio::sync::Mutex<String>,
utf8_remainder: &tokio::sync::Mutex<Vec<u8>>,
) -> Vec<anyhow::Result<StreamEvent>> {
let bytes = match chunk {
Ok(b) => b,
Err(e) => return vec![Err(e)],
};
let mut remainder = utf8_remainder.lock().await;
let combined = if remainder.is_empty() {
bytes.to_vec()
} else {
let mut c = std::mem::take(&mut *remainder);
c.extend_from_slice(&bytes);
c
};
let text: String = match std::str::from_utf8(&combined) {
Ok(t) => {
drop(remainder);
t.to_owned()
}
Err(e) => {
let valid_up_to = e.valid_up_to();
*remainder = combined[valid_up_to..].to_vec();
drop(remainder);
if valid_up_to == 0 {
return Vec::new();
}
unsafe { std::str::from_utf8_unchecked(&combined[..valid_up_to]) }.to_owned()
}
};
let mut buffer = line_buffer.lock().await;
buffer.push_str(&text);
let Some(last_newline) = buffer.rfind('\n') else {
return Vec::new();
};
let complete = buffer[..last_newline].to_owned();
let leftover = buffer[last_newline + 1..].to_owned();
buffer.clear();
buffer.push_str(&leftover);
drop(buffer);
let mut events: Vec<anyhow::Result<StreamEvent>> = Vec::new();
for line in complete.lines() {
let Some(payload) = line.strip_prefix("data:") else {
continue;
};
let payload = payload.trim_start();
if payload.is_empty() {
continue;
}
if payload == "[DONE]" {
continue;
}
let val: Value = match serde_json::from_str(payload) {
Ok(v) => v,
Err(_) => {
tracing::debug!(payload, "rsclaw fastshot: ignoring unparseable SSE line");
continue;
}
};
let ty = val.get("type").and_then(Value::as_str).unwrap_or("");
match ty {
"delta" => {
if let Some(content) = val.get("content").and_then(Value::as_str) {
if !content.is_empty() {
events.push(Ok(StreamEvent::TextDelta(content.to_owned())));
}
}
}
"block_delta" => {
let piece = val
.get("delta")
.and_then(Value::as_str)
.or_else(|| val.get("content").and_then(Value::as_str))
.or_else(|| val.get("text").and_then(Value::as_str));
if let Some(t) = piece {
if !t.is_empty() {
events.push(Ok(StreamEvent::TextDelta(t.to_owned())));
}
}
}
"start" | "block_start" | "block_stop" | "ping" => {}
"done" => {
let usage = val
.get("usage")
.and_then(Value::as_object)
.map(|u| TokenUsage {
input: extract_usage_count(u, &["input_tokens", "prompt_tokens", "input"]),
output: extract_usage_count(
u,
&["output_tokens", "completion_tokens", "output"],
),
cache_creation: extract_usage_count(
u,
&["cache_creation_input_tokens", "cache_creation_tokens"],
),
cache_read: extract_usage_count(
u,
&[
"cache_read_input_tokens",
"cached_tokens",
"cache_read_tokens",
],
),
recall_tokens: extract_usage_count(u, &["recall_tokens"]),
recall_doc_ids: extract_usage_string_array(u, "recall_doc_ids"),
recall_hash: u
.get("recall_hash")
.and_then(Value::as_str)
.map(str::to_owned),
recall_truncated: u
.get("recall_truncated")
.and_then(Value::as_bool)
.unwrap_or(false),
});
events.push(Ok(StreamEvent::Done { usage }));
}
"error" => {
let err = val.get("error");
let code = err
.and_then(|e| e.get("code"))
.and_then(Value::as_str)
.unwrap_or("");
let detail = err
.and_then(|e| e.get("message"))
.and_then(Value::as_str)
.unwrap_or("");
let msg = match (code.is_empty(), detail.is_empty()) {
(false, false) => format!("rsclaw stream error [{code}]: {detail}"),
(false, true) => format!("rsclaw stream error [{code}]"),
(true, false) => format!("rsclaw stream error: {detail}"),
(true, true) => "rsclaw stream error".to_string(),
};
events.push(Ok(StreamEvent::Error(msg)));
}
"thinking" => {
if let Some(s) = val.get("content").and_then(Value::as_str)
&& !s.is_empty()
{
events.push(Ok(StreamEvent::ReasoningDelta(s.to_string())));
}
}
"tool_call" => {
let id = val
.get("id")
.and_then(Value::as_str)
.unwrap_or("")
.to_string();
let name = val
.get("name")
.and_then(Value::as_str)
.unwrap_or("")
.to_string();
let input = val
.get("input")
.cloned()
.filter(Value::is_object)
.unwrap_or(Value::Object(Default::default()));
events.push(Ok(StreamEvent::ToolCall { id, name, input }));
}
other => {
tracing::debug!(ty = other, payload, "rsclaw fastshot: unknown event type");
}
}
}
events
}
/// Gathers the URLs of all non-empty image parts across the request's
/// messages, in message order and then part order. Plain-text messages
/// contribute nothing.
fn extract_images_for_oneshot(req: &LlmRequest) -> Vec<String> {
    req.messages
        .iter()
        .filter_map(|msg| match &msg.content {
            MessageContent::Parts(parts) => Some(parts),
            _ => None,
        })
        .flat_map(|parts| parts.iter())
        .filter_map(|part| match part {
            ContentPart::Image { url } if !url.is_empty() => Some(url.clone()),
            _ => None,
        })
        .collect()
}
/// Returns the leading slice of `messages` to use as replay history.
///
/// - If the conversation ends in one or more `Role::Tool` messages, that whole
///   trailing run of tool messages is excluded.
/// - Otherwise exactly the final message is excluded (presumably the turn about
///   to be sent separately — NOTE(review): confirm against the caller).
///
/// An empty input yields an empty slice.
fn history_for_replay(messages: &[Message]) -> &[Message] {
    let Some((last, all_but_last)) = messages.split_last() else {
        return messages;
    };
    if !matches!(last.role, Role::Tool) {
        return all_but_last;
    }
    // Cut right after the last non-tool message (or at 0 if there is none).
    let cut = messages
        .iter()
        .rposition(|m| !matches!(m.role, Role::Tool))
        .map_or(0, |idx| idx + 1);
    &messages[..cut]
}
/// Separates `Role::System` messages from the rest of the conversation.
///
/// Returns the non-system messages (original order preserved) plus the text of
/// all system messages. For multi-part system messages, the text parts are
/// concatenated with no separator. Non-empty system texts are joined with a
/// blank line between them.
fn split_system_messages(messages: &[Message]) -> (Vec<&Message>, String) {
    let (system_msgs, conversation): (Vec<&Message>, Vec<&Message>) = messages
        .iter()
        .partition(|m| matches!(m.role, Role::System));
    let system_texts: Vec<String> = system_msgs
        .into_iter()
        .map(|m| match &m.content {
            MessageContent::Text(t) => t.clone(),
            MessageContent::Parts(parts) => parts.iter().fold(String::new(), |mut acc, part| {
                if let ContentPart::Text { text } = part {
                    acc.push_str(text);
                }
                acc
            }),
        })
        .filter(|txt| !txt.is_empty())
        .collect();
    (conversation, system_texts.join("\n\n"))
}
/// Removes every `Role::System` message at the end of `messages` and folds
/// their text into the message that then ends the conversation.
///
/// - The texts of the removed messages are kept in their original order. Each
///   multi-part message's text parts are concatenated with no separator.
///   Non-empty texts are joined with a blank line.
/// - If the new last message is `Role::User`, the combined text is appended:
///   - to a text message: after a blank line, or as the whole text if it was
///     empty;
///   - to a multi-part message: as an extra text part.
/// - NOTE(review): if the new last message is not a user message (or nothing
///   remains), the trailing system text is discarded without any log —
///   confirm this is intended.
fn normalize_trailing_system(messages: &mut Vec<Message>) {
    let first_trailing = messages
        .iter()
        .rposition(|m| !matches!(m.role, Role::System))
        .map_or(0, |idx| idx + 1);
    let trailing_msgs = messages.split_off(first_trailing);
    let texts: Vec<String> = trailing_msgs
        .into_iter()
        .map(|m| match m.content {
            MessageContent::Text(t) => t,
            MessageContent::Parts(parts) => parts
                .into_iter()
                .filter_map(|p| match p {
                    ContentPart::Text { text } => Some(text),
                    _ => None,
                })
                .collect::<String>(),
        })
        .filter(|txt| !txt.is_empty())
        .collect();
    if texts.is_empty() {
        return;
    }
    let combined = texts.join("\n\n");
    let Some(last) = messages.last_mut() else {
        return;
    };
    if !matches!(last.role, Role::User) {
        return;
    }
    match &mut last.content {
        MessageContent::Text(t) if t.is_empty() => *t = combined,
        MessageContent::Text(t) => {
            t.push_str("\n\n");
            t.push_str(&combined);
        }
        MessageContent::Parts(parts) => parts.push(ContentPart::Text { text: combined }),
    }
}
/// Renders one history message as `{"role", "content"[, "rsclaw_hidden"]}`.
///
/// - Tool messages are emitted with role `"user"`.
/// - Text content stays a JSON string.
/// - Multi-part content becomes an array built by `serialize_history_part`.
/// - An `rsclaw_hidden` payload is attached when present; if it fails to
///   serialize, it is attached as `null`.
fn serialize_history_message(msg: &Message) -> Value {
    let role = match msg.role {
        Role::Assistant => "assistant",
        Role::System => "system",
        Role::User | Role::Tool => "user",
    };
    let content = match &msg.content {
        MessageContent::Text(t) => json!(t),
        MessageContent::Parts(parts) => {
            Value::Array(parts.iter().map(serialize_history_part).collect())
        }
    };
    let mut fields = serde_json::Map::new();
    fields.insert("role".to_owned(), json!(role));
    fields.insert("content".to_owned(), content);
    if let Some(hidden) = &msg.rsclaw_hidden {
        fields.insert(
            "rsclaw_hidden".to_owned(),
            serde_json::to_value(hidden).unwrap_or(Value::Null),
        );
    }
    Value::Object(fields)
}
/// Renders a single content part in the Anthropic-style block shape used by
/// replay history:
/// - text → `{"type":"text","text":…}`
/// - image URL → an `image` block with a `url` source
/// - tool use → `tool_use`; tool result → `tool_result` (with `is_error` only
///   when set)
/// - reasoning → `thinking`
fn serialize_history_part(p: &ContentPart) -> Value {
    match p {
        ContentPart::Text { text } => json!({"type": "text", "text": text}),
        ContentPart::Image { url } => json!({
            "type": "image",
            "source": {"type": "url", "url": url},
        }),
        ContentPart::ToolUse { id, name, input } => json!({
            "type": "tool_use",
            "id": id,
            "name": name,
            "input": input,
        }),
        ContentPart::ToolResult {
            tool_use_id,
            content,
            is_error,
        } => match is_error {
            Some(flag) => json!({
                "type": "tool_result",
                "tool_use_id": tool_use_id,
                "content": content,
                "is_error": flag,
            }),
            None => json!({
                "type": "tool_result",
                "tool_use_id": tool_use_id,
                "content": content,
            }),
        },
        ContentPart::Reasoning { text } => json!({"type": "thinking", "text": text}),
    }
}
fn serialize_replay_history(messages: &[&Message]) -> Vec<Value> {
let mut out: Vec<Value> = Vec::with_capacity(messages.len());
let mut i = 0;
while i < messages.len() {
let m = messages[i];
if !matches!(m.role, Role::Tool) {
out.push(serialize_history_message(m));
i += 1;
continue;
}
let mut combined: Vec<Value> = Vec::new();
while i < messages.len() && matches!(messages[i].role, Role::Tool) {
match &messages[i].content {
MessageContent::Parts(parts) => {
for p in parts {
if matches!(p, ContentPart::ToolResult { .. }) {
combined.push(serialize_history_part(p));
}
}
}
MessageContent::Text(_) => {
tracing::warn!(
"rsclaw: dropping Role::Tool with text-only content during \
replay (no tool_use_id to pair with — runtime contract \
expects Parts(ToolResult{{..}}))",
);
debug_assert!(
false,
"Role::Tool must carry Parts(ToolResult{{..}}); got Text"
);
}
}
i += 1;
}
if !combined.is_empty() {
out.push(json!({ "role": "user", "content": combined }));
}
}
out
}
/// Per-stream block state for `parse_sse_chunk`.
///
/// Holds content blocks that have received a `block_start` frame but not yet
/// a `block_stop`, keyed by the frame's `index` field.
#[derive(Debug, Default)]
struct SseState {
    blocks: std::collections::HashMap<u64, BlockBuilder>,
}
/// An open content block being assembled from `block_delta` frames.
#[derive(Debug)]
struct BlockBuilder {
    /// Block type from `block_start`; decides how deltas are surfaced.
    kind: BlockKind,
    /// Accumulated delta text. Only filled for `ToolCall` blocks, whose JSON
    /// arguments are parsed once at `block_stop`. Text/thinking deltas are
    /// forwarded immediately and never buffered here.
    buf: String,
    /// Tool-call id from `block_start` (empty for non-tool blocks).
    tool_id: String,
    /// Tool name from `block_start` (empty for non-tool blocks).
    tool_name: String,
}
/// Block types recognized from a `block_start` frame's `block.type`
/// (`"text"`, `"thinking"`, `"tool_call"`); blocks of any other type are
/// ignored by the parser.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum BlockKind {
    /// Plain assistant text; deltas become `TextDelta`.
    Text,
    /// Model reasoning; deltas become `ReasoningDelta`.
    Thinking,
    /// Tool invocation; deltas are buffered JSON arguments.
    ToolCall,
}
async fn parse_sse_chunk(
chunk: Result<bytes::Bytes>,
line_buffer: &Arc<tokio::sync::Mutex<String>>,
utf8_remainder: &Arc<tokio::sync::Mutex<Vec<u8>>>,
block_state: &Arc<tokio::sync::Mutex<SseState>>,
) -> Vec<Result<StreamEvent>> {
let mut events: Vec<Result<StreamEvent>> = Vec::new();
let bytes = match chunk {
Ok(b) => b,
Err(e) => {
events.push(Err(e));
return events;
}
};
let mut remainder = utf8_remainder.lock().await;
let stitched: Vec<u8> = if remainder.is_empty() {
bytes.to_vec()
} else {
let mut combined = std::mem::take(&mut *remainder);
combined.extend_from_slice(&bytes);
combined
};
let decoded: String = match std::str::from_utf8(&stitched) {
Ok(s) => s.to_owned(),
Err(e) => {
let valid_up_to = e.valid_up_to();
let advance_past_invalid = e.error_len().unwrap_or(0);
*remainder = stitched[valid_up_to + advance_past_invalid..].to_vec();
if valid_up_to == 0 {
return events;
}
std::str::from_utf8(&stitched[..valid_up_to])
.expect("valid_up_to guarantees valid UTF-8")
.to_owned()
}
};
drop(remainder);
let mut buf = line_buffer.lock().await;
buf.push_str(&decoded);
while let Some(idx) = buf.find('\n') {
let line = buf[..idx].trim_end_matches('\r').to_string();
buf.drain(..=idx);
let Some(payload) = line
.strip_prefix("data:")
.map(|s| s.trim_start_matches(' '))
else {
continue;
};
if payload == "[DONE]" {
continue;
}
if payload.is_empty() {
continue;
}
let value: Value = match serde_json::from_str(payload) {
Ok(v) => v,
Err(e) => {
events.push(Err(anyhow::anyhow!(
"rsclaw SSE parse: {e}; line: {payload}"
)));
continue;
}
};
let kind = value.get("type").and_then(|v| v.as_str()).unwrap_or("");
let mut state = block_state.lock().await;
match kind {
"start" => {}
"ping" => {}
"block_start" => {
let Some(index) = value.get("index").and_then(Value::as_u64) else {
continue;
};
let block = value.get("block");
let block_type = block
.and_then(|b| b.get("type"))
.and_then(Value::as_str)
.unwrap_or("");
let kind = match block_type {
"text" => BlockKind::Text,
"thinking" => BlockKind::Thinking,
"tool_call" => BlockKind::ToolCall,
_ => continue,
};
let (tool_id, tool_name) = if kind == BlockKind::ToolCall {
(
block
.and_then(|b| b.get("id"))
.and_then(Value::as_str)
.unwrap_or("")
.to_string(),
block
.and_then(|b| b.get("name"))
.and_then(Value::as_str)
.unwrap_or("")
.to_string(),
)
} else {
(String::new(), String::new())
};
state.blocks.insert(
index,
BlockBuilder {
kind,
buf: String::new(),
tool_id,
tool_name,
},
);
}
"block_delta" => {
let Some(index) = value.get("index").and_then(Value::as_u64) else {
continue;
};
let Some(delta) = value.get("delta").and_then(Value::as_str) else {
continue;
};
let Some(b) = state.blocks.get_mut(&index) else {
continue;
};
match b.kind {
BlockKind::Text => {
if !delta.is_empty() {
events.push(Ok(StreamEvent::TextDelta(delta.to_string())));
}
}
BlockKind::Thinking => {
if !delta.is_empty() {
events.push(Ok(StreamEvent::ReasoningDelta(delta.to_string())));
}
}
BlockKind::ToolCall => {
b.buf.push_str(delta);
}
}
}
"block_stop" => {
let Some(index) = value.get("index").and_then(Value::as_u64) else {
continue;
};
if let Some(b) = state.blocks.remove(&index)
&& b.kind == BlockKind::ToolCall
{
let input: Value = if b.buf.is_empty() {
Value::Object(Default::default())
} else {
serde_json::from_str(&b.buf)
.unwrap_or_else(|_| Value::Object(Default::default()))
};
events.push(Ok(StreamEvent::ToolCall {
id: b.tool_id,
name: b.tool_name,
input,
}));
}
}
"done" => {
let usage = value
.get("usage")
.and_then(Value::as_object)
.map(|u| TokenUsage {
input: extract_usage_count(u, &["input_tokens", "prompt_tokens", "input"]),
output: extract_usage_count(
u,
&["output_tokens", "completion_tokens", "output"],
),
cache_creation: extract_usage_count(
u,
&["cache_creation_input_tokens", "cache_creation_tokens"],
),
cache_read: extract_usage_count(
u,
&[
"cache_read_input_tokens",
"cached_tokens",
"cache_read_tokens",
],
),
recall_tokens: extract_usage_count(u, &["recall_tokens"]),
recall_doc_ids: extract_usage_string_array(u, "recall_doc_ids"),
recall_hash: u
.get("recall_hash")
.and_then(Value::as_str)
.map(str::to_owned),
recall_truncated: u
.get("recall_truncated")
.and_then(Value::as_bool)
.unwrap_or(false),
});
events.push(Ok(StreamEvent::Done { usage }));
}
"error" => {
let err = value.get("error");
let code = err
.and_then(|e| e.get("code"))
.and_then(Value::as_str)
.unwrap_or("");
let detail = err
.and_then(|e| e.get("message"))
.and_then(Value::as_str)
.unwrap_or("");
let msg = match (code.is_empty(), detail.is_empty()) {
(false, false) => format!("rsclaw stream error [{code}]: {detail}"),
(false, true) => format!("rsclaw stream error [{code}]"),
(true, false) => format!("rsclaw stream error: {detail}"),
(true, true) => "rsclaw stream error".to_string(),
};
events.push(Ok(StreamEvent::Error(msg)));
}
_ => {}
}
drop(state);
}
events
}
/// Returns the first field among `names` (tried in order) that holds an
/// unsigned integer, or 0 when none does.
fn extract_usage_count(u: &serde_json::Map<String, Value>, names: &[&str]) -> u64 {
    names
        .iter()
        .find_map(|field| u.get(*field).and_then(Value::as_u64))
        .unwrap_or(0)
}
/// Reads field `name` as an array and returns its string elements. Non-string
/// elements are skipped; a missing or non-array field yields an empty vector.
fn extract_usage_string_array(u: &serde_json::Map<String, Value>, name: &str) -> Vec<String> {
    let Some(items) = u.get(name).and_then(Value::as_array) else {
        return Vec::new();
    };
    items
        .iter()
        .filter_map(Value::as_str)
        .map(str::to_owned)
        .collect()
}
/// Reports whether an HTTP error response means the server-side session is
/// gone. The body must be JSON with an `error.code` string, and only these
/// (status, code) pairs count:
/// - 404 + `session_not_found`
/// - 409 + `version_drift`
/// - 503 + `backend_unavailable`
///
/// Any other status, code, or a non-JSON body yields `false`.
fn is_session_evicted(status: StatusCode, body: &str) -> bool {
    let Ok(parsed) = serde_json::from_str::<Value>(body) else {
        return false;
    };
    let error_code = parsed
        .get("error")
        .and_then(|err| err.get("code"))
        .and_then(Value::as_str);
    matches!(
        (status, error_code),
        (StatusCode::NOT_FOUND, Some("session_not_found"))
            | (StatusCode::CONFLICT, Some("version_drift"))
            | (StatusCode::SERVICE_UNAVAILABLE, Some("backend_unavailable"))
    )
}
#[cfg(test)]
mod tests {
use super::*;
use crate::ToolDef;
fn feed_sse(parser: &mut SseTerminalParser, raw: &str) -> Option<(String, String)> {
for line in raw.split('\n') {
if let Some(t) = parser.push_line(line.trim_end_matches('\r')) {
return Some(t);
}
}
None
}
#[test]
fn sse_parser_skips_heartbeats_and_returns_result() {
let mut p = SseTerminalParser::default();
let raw = ": replay-keepalive\n\n: replay-keepalive\n\nevent: result\ndata: {\"session_id\":\"rs_x\"}\n\n";
let (ev, data) = feed_sse(&mut p, raw).expect("terminal event");
assert_eq!(ev, "result");
assert_eq!(data, "{\"session_id\":\"rs_x\"}");
}
#[test]
fn sse_parser_returns_error_event_with_status() {
let mut p = SseTerminalParser::default();
let raw =
": hb\n\nevent: error\ndata: {\"status\":503,\"body\":{\"error\":\"no capacity\"}}\n\n";
let (ev, data) = feed_sse(&mut p, raw).expect("terminal event");
assert_eq!(ev, "error");
let v: Value = serde_json::from_str(&data).unwrap();
assert_eq!(v["status"], 503);
}
#[test]
fn sse_parser_joins_multi_line_data() {
let mut p = SseTerminalParser::default();
let raw = "event: result\ndata: line1\ndata: line2\n\n";
let (ev, data) = feed_sse(&mut p, raw).expect("terminal event");
assert_eq!(ev, "result");
assert_eq!(data, "line1\nline2");
}
#[test]
fn sse_parser_ignores_unknown_events() {
let mut p = SseTerminalParser::default();
assert!(feed_sse(&mut p, "event: progress\ndata: 42\n\n").is_none());
let (ev, _) = feed_sse(&mut p, "event: result\ndata: {}\n\n").expect("terminal");
assert_eq!(ev, "result");
}
async fn parse_sse_test(
chunk: Result<bytes::Bytes>,
buf: &Arc<tokio::sync::Mutex<String>>,
rem: &Arc<tokio::sync::Mutex<Vec<u8>>>,
state: &Arc<tokio::sync::Mutex<SseState>>,
) -> Vec<Result<StreamEvent>> {
parse_sse_chunk(chunk, buf, rem, state).await
}
fn new_state() -> Arc<tokio::sync::Mutex<SseState>> {
Arc::new(tokio::sync::Mutex::new(SseState::default()))
}
#[tokio::test]
async fn parse_sse_chunk_recovers_split_utf8() {
let buf = Arc::new(tokio::sync::Mutex::new(String::new()));
let rem = Arc::new(tokio::sync::Mutex::new(Vec::<u8>::new()));
let state = new_state();
let _ = parse_sse_test(
Ok(bytes::Bytes::from_static(
b"data: {\"type\":\"block_start\",\"index\":0,\"block\":{\"type\":\"text\"}}\n",
)),
&buf,
&rem,
&state,
)
.await;
let line_full = b"data: {\"type\":\"block_delta\",\"index\":0,\"delta\":\"\xe4\xbd\xa0\xe5\xa5\xbd\"}\n";
let split = 52;
let (a, b) = line_full.split_at(split);
let (b, c) = b.split_at(2);
for piece in [a, b, c] {
let _ =
parse_sse_test(Ok(bytes::Bytes::copy_from_slice(piece)), &buf, &rem, &state).await;
}
let evs = parse_sse_test(Ok(bytes::Bytes::from_static(b"")), &buf, &rem, &state).await;
let texts: Vec<_> = evs
.into_iter()
.filter_map(|e| match e {
Ok(StreamEvent::TextDelta(t)) => Some(t),
_ => None,
})
.collect();
let all_text: String = texts.into_iter().collect();
assert!(
!all_text.contains('\u{FFFD}'),
"expected no replacement char, got {all_text:?}"
);
}
#[tokio::test]
async fn parse_sse_chunk_advances_past_invalid_utf8_byte() {
let buf = Arc::new(tokio::sync::Mutex::new(String::new()));
let rem = Arc::new(tokio::sync::Mutex::new(Vec::<u8>::new()));
let state = new_state();
let evs = parse_sse_test(Ok(bytes::Bytes::from_static(b"\xff")), &buf, &rem, &state).await;
assert!(
evs.iter().all(|e| e.is_ok()),
"stray 0xFF must not surface as Err — got {evs:?}"
);
{
let r = rem.lock().await;
assert!(
!r.contains(&0xff),
"0xFF must be advanced past, not pinned in remainder; got {:?}",
*r
);
}
let evs = parse_sse_test(
Ok(bytes::Bytes::from_static(
b"data: {\"type\":\"block_start\",\"index\":0,\"block\":{\"type\":\"text\"}}\ndata: {\"type\":\"block_delta\",\"index\":0,\"delta\":\"hi\"}\ndata: {\"type\":\"block_stop\",\"index\":0}\n",
)),
&buf,
&rem,
&state,
)
.await;
let texts: Vec<_> = evs
.into_iter()
.filter_map(|e| match e {
Ok(StreamEvent::TextDelta(t)) => Some(t),
_ => None,
})
.collect();
assert_eq!(
texts,
vec!["hi".to_string()],
"stream must recover and emit subsequent events after a bad byte"
);
}
#[tokio::test]
async fn parse_sse_chunk_invalid_byte_does_not_unbounded_grow_remainder() {
let buf = Arc::new(tokio::sync::Mutex::new(String::new()));
let rem = Arc::new(tokio::sync::Mutex::new(Vec::<u8>::new()));
let state = new_state();
for _ in 0..50 {
let _ =
parse_sse_test(Ok(bytes::Bytes::from_static(b"\xff")), &buf, &rem, &state).await;
}
let r = rem.lock().await;
assert!(
r.len() <= 3,
"remainder must not accumulate invalid bytes (cap 3 for trailing incomplete UTF-8); got {} bytes",
r.len()
);
}
#[tokio::test]
async fn parse_sse_chunk_skips_empty_data_payload() {
let buf = Arc::new(tokio::sync::Mutex::new(String::new()));
let rem = Arc::new(tokio::sync::Mutex::new(Vec::<u8>::new()));
let state = new_state();
let line = b"data:\ndata: {\"type\":\"block_start\",\"index\":0,\"block\":{\"type\":\"text\"}}\ndata: {\"type\":\"block_delta\",\"index\":0,\"delta\":\"hi\"}\ndata: {\"type\":\"block_stop\",\"index\":0}\n";
let evs = parse_sse_test(Ok(bytes::Bytes::copy_from_slice(line)), &buf, &rem, &state).await;
let mut texts: Vec<String> = Vec::new();
for e in evs {
match e {
Ok(StreamEvent::TextDelta(t)) => texts.push(t),
Err(err) => panic!("empty data: must not surface as Err — got {err}"),
_ => {}
}
}
assert_eq!(texts, vec!["hi".to_string()]);
}
#[tokio::test]
async fn parse_sse_chunk_skips_data_with_only_spaces() {
let buf = Arc::new(tokio::sync::Mutex::new(String::new()));
let rem = Arc::new(tokio::sync::Mutex::new(Vec::<u8>::new()));
let state = new_state();
let line = b"data: \n";
let evs = parse_sse_test(Ok(bytes::Bytes::copy_from_slice(line)), &buf, &rem, &state).await;
for e in evs {
if let Err(err) = e {
panic!("whitespace-only data: must not surface as Err — got {err}");
}
}
}
#[tokio::test]
async fn parse_sse_chunk_accepts_data_without_leading_space() {
let buf = Arc::new(tokio::sync::Mutex::new(String::new()));
let rem = Arc::new(tokio::sync::Mutex::new(Vec::<u8>::new()));
let state = new_state();
let line = b"data:{\"type\":\"block_start\",\"index\":0,\"block\":{\"type\":\"text\"}}\ndata:{\"type\":\"block_delta\",\"index\":0,\"delta\":\"hi\"}\ndata:{\"type\":\"block_stop\",\"index\":0}\n";
let evs = parse_sse_test(Ok(bytes::Bytes::copy_from_slice(line)), &buf, &rem, &state).await;
let texts: Vec<_> = evs
.into_iter()
.filter_map(|e| match e {
Ok(StreamEvent::TextDelta(t)) => Some(t),
_ => None,
})
.collect();
assert_eq!(texts, vec!["hi".to_string()]);
}
#[tokio::test]
async fn parse_sse_chunk_native_delta_emits_text() {
let buf = Arc::new(tokio::sync::Mutex::new(String::new()));
let rem = Arc::new(tokio::sync::Mutex::new(Vec::<u8>::new()));
let state = new_state();
let line = b"data: {\"type\":\"block_start\",\"index\":0,\"block\":{\"type\":\"text\"}}\ndata: {\"type\":\"block_delta\",\"index\":0,\"delta\":\"hello\"}\ndata: {\"type\":\"block_stop\",\"index\":0}\n";
let evs = parse_sse_test(Ok(bytes::Bytes::copy_from_slice(line)), &buf, &rem, &state).await;
let texts: Vec<String> = evs
.into_iter()
.filter_map(|e| match e {
Ok(StreamEvent::TextDelta(s)) => Some(s),
_ => None,
})
.collect();
assert_eq!(texts, vec!["hello".to_string()]);
}
#[tokio::test]
async fn parse_sse_chunk_native_thinking_emits_reasoning() {
let buf = Arc::new(tokio::sync::Mutex::new(String::new()));
let rem = Arc::new(tokio::sync::Mutex::new(Vec::<u8>::new()));
let state = new_state();
let line = b"data: {\"type\":\"block_start\",\"index\":0,\"block\":{\"type\":\"thinking\"}}\ndata: {\"type\":\"block_delta\",\"index\":0,\"delta\":\"step 1: parse\"}\ndata: {\"type\":\"block_stop\",\"index\":0}\n";
let evs = parse_sse_test(Ok(bytes::Bytes::copy_from_slice(line)), &buf, &rem, &state).await;
let reasonings: Vec<String> = evs
.into_iter()
.filter_map(|e| match e {
Ok(StreamEvent::ReasoningDelta(s)) => Some(s),
_ => None,
})
.collect();
assert_eq!(reasonings, vec!["step 1: parse".to_string()]);
}
#[tokio::test]
async fn parse_sse_chunk_native_tool_call_emits_whole_frame() {
let buf = Arc::new(tokio::sync::Mutex::new(String::new()));
let rem = Arc::new(tokio::sync::Mutex::new(Vec::<u8>::new()));
let state = new_state();
let line = br#"data: {"type":"block_start","index":0,"block":{"type":"tool_call","id":"call_42","name":"read_file"}}
data: {"type":"block_delta","index":0,"delta":"{\"path\":\"x.rs\"}"}
data: {"type":"block_stop","index":0}
"#;
let evs = parse_sse_test(Ok(bytes::Bytes::copy_from_slice(line)), &buf, &rem, &state).await;
let (id, name, input) = evs
.into_iter()
.find_map(|e| match e {
Ok(StreamEvent::ToolCall { id, name, input }) => Some((id, name, input)),
_ => None,
})
.expect("expected one ToolCall event");
assert_eq!(id, "call_42");
assert_eq!(name, "read_file");
assert_eq!(input.get("path").and_then(Value::as_str), Some("x.rs"));
}
#[tokio::test]
async fn parse_sse_chunk_native_tool_call_missing_input_defaults_empty_object() {
let buf = Arc::new(tokio::sync::Mutex::new(String::new()));
let rem = Arc::new(tokio::sync::Mutex::new(Vec::<u8>::new()));
let state = new_state();
let line = b"data: {\"type\":\"block_start\",\"index\":0,\"block\":{\"type\":\"tool_call\",\"id\":\"c\",\"name\":\"get_time\"}}\ndata: {\"type\":\"block_stop\",\"index\":0}\n";
let evs = parse_sse_test(Ok(bytes::Bytes::copy_from_slice(line)), &buf, &rem, &state).await;
let input = evs
.into_iter()
.find_map(|e| match e {
Ok(StreamEvent::ToolCall { input, .. }) => Some(input),
_ => None,
})
.expect("expected one ToolCall event");
assert!(
input.as_object().is_some_and(|m| m.is_empty()),
"missing input must default to empty object, got {input:?}"
);
}
#[tokio::test]
async fn parse_sse_chunk_native_done_emits_done_with_usage() {
let buf = Arc::new(tokio::sync::Mutex::new(String::new()));
let rem = Arc::new(tokio::sync::Mutex::new(Vec::<u8>::new()));
let state = new_state();
let line = b"data: {\"type\":\"done\",\"finish_reason\":\"end_turn\",\"usage\":{\"input_tokens\":11,\"output_tokens\":22}}\n";
let evs = parse_sse_test(Ok(bytes::Bytes::copy_from_slice(line)), &buf, &rem, &state).await;
let mut saw_done = false;
for e in evs {
if let Ok(StreamEvent::Done { usage }) = e {
let u = usage.expect("usage should be populated");
assert_eq!(u.input, 11);
assert_eq!(u.output, 22);
saw_done = true;
}
}
assert!(saw_done, "expected Done from native done frame");
}
#[tokio::test]
async fn parse_sse_chunk_native_done_without_usage() {
let buf = Arc::new(tokio::sync::Mutex::new(String::new()));
let rem = Arc::new(tokio::sync::Mutex::new(Vec::<u8>::new()));
let state = new_state();
let line = b"data: {\"type\":\"done\",\"finish_reason\":\"end_turn\"}\n";
let evs = parse_sse_test(Ok(bytes::Bytes::copy_from_slice(line)), &buf, &rem, &state).await;
let mut saw_done = false;
for e in evs {
if let Ok(StreamEvent::Done { usage }) = e {
assert!(usage.is_none());
saw_done = true;
}
}
assert!(saw_done, "expected Done event even without usage");
}
#[tokio::test]
async fn parse_sse_chunk_native_done_usage_field_name_fallback() {
let buf = Arc::new(tokio::sync::Mutex::new(String::new()));
let rem = Arc::new(tokio::sync::Mutex::new(Vec::<u8>::new()));
let state = new_state();
let line =
b"data: {\"type\":\"done\",\"usage\":{\"prompt_tokens\":7,\"completion_tokens\":13}}\n";
let evs = parse_sse_test(Ok(bytes::Bytes::copy_from_slice(line)), &buf, &rem, &state).await;
let u = evs
.into_iter()
.find_map(|e| match e {
Ok(StreamEvent::Done { usage }) => usage,
_ => None,
})
.expect("expected Done with usage");
assert_eq!(u.input, 7);
assert_eq!(u.output, 13);
}
#[tokio::test]
async fn parse_sse_chunk_native_done_usage_includes_recall_accounting() {
let buf = Arc::new(tokio::sync::Mutex::new(String::new()));
let rem = Arc::new(tokio::sync::Mutex::new(Vec::<u8>::new()));
let state = new_state();
let line = b"data: {\"type\":\"done\",\"usage\":{\"input_tokens\":17,\"output_tokens\":5,\"recall_tokens\":3,\"recall_doc_ids\":[\"doc-1\",\"doc-2\"],\"recall_hash\":\"sha256:abc\",\"recall_truncated\":true}}\n";
let evs = parse_sse_test(Ok(bytes::Bytes::copy_from_slice(line)), &buf, &rem, &state).await;
let u = evs
.into_iter()
.find_map(|e| match e {
Ok(StreamEvent::Done { usage }) => usage,
_ => None,
})
.expect("expected Done with usage");
assert_eq!(u.input, 17);
assert_eq!(u.recall_tokens, 3);
assert_eq!(u.recall_doc_ids, vec!["doc-1", "doc-2"]);
assert_eq!(u.recall_hash.as_deref(), Some("sha256:abc"));
assert!(u.recall_truncated);
}
#[test]
fn serialize_history_message_preserves_rsclaw_hidden_recall() {
let msg = Message {
role: Role::User,
content: MessageContent::Text("我的手机号是什么?".into()),
rsclaw_hidden: Some(crate::RsclawHidden {
recall_context: "- 用户手机号: 13900001234".into(),
recall_format: "xml".into(),
recall_mode: "committed".into(),
recall_doc_ids: vec!["mem-1".into()],
recall_hash: "sha256:abc".into(),
recall_truncated: false,
recall_input_tokens: Some(12),
recall_trace_id: Some("recall_1".into()),
}),
};
let out = serialize_history_message(&msg);
assert_eq!(out["role"], "user");
assert_eq!(out["content"], "我的手机号是什么?");
assert_eq!(
out["rsclaw_hidden"]["recall_context"],
"- 用户手机号: 13900001234"
);
assert_eq!(out["rsclaw_hidden"]["recall_doc_ids"][0], "mem-1");
assert_eq!(out["rsclaw_hidden"]["recall_trace_id"], "recall_1");
}
#[tokio::test]
async fn parse_sse_chunk_native_done_partial_usage_keeps_present_side() {
let buf = Arc::new(tokio::sync::Mutex::new(String::new()));
let rem = Arc::new(tokio::sync::Mutex::new(Vec::<u8>::new()));
let state = new_state();
let line = b"data: {\"type\":\"done\",\"usage\":{\"input_tokens\":17}}\n";
let evs = parse_sse_test(Ok(bytes::Bytes::copy_from_slice(line)), &buf, &rem, &state).await;
let usage = evs
.into_iter()
.find_map(|e| match e {
Ok(StreamEvent::Done { usage }) => Some(usage),
_ => None,
})
.expect("expected one Done event")
.expect("usage should survive partial fields");
assert_eq!(usage.input, 17);
assert_eq!(usage.output, 0);
}
#[tokio::test]
async fn parse_sse_chunk_native_done_null_usage_is_none() {
let buf = Arc::new(tokio::sync::Mutex::new(String::new()));
let rem = Arc::new(tokio::sync::Mutex::new(Vec::<u8>::new()));
let state = new_state();
let line = b"data: {\"type\":\"done\",\"usage\":null}\n";
let evs = parse_sse_test(Ok(bytes::Bytes::copy_from_slice(line)), &buf, &rem, &state).await;
let mut saw_done = false;
for e in evs {
if let Ok(StreamEvent::Done { usage }) = e {
assert!(
usage.is_none(),
"null usage must collapse to None, got {usage:?}"
);
saw_done = true;
}
}
assert!(saw_done, "expected Done event with usage=None");
}
/// A `done` frame whose `usage` is not a JSON object (here an array) must be
/// treated like a missing usage: the `Done` event carries `None`.
#[tokio::test]
async fn parse_sse_chunk_native_done_non_object_usage_is_none() {
    let text_buf = Arc::new(tokio::sync::Mutex::new(String::new()));
    let remainder = Arc::new(tokio::sync::Mutex::new(Vec::<u8>::new()));
    let state = new_state();
    let frame = b"data: {\"type\":\"done\",\"usage\":[1,2,3]}\n";
    let events = parse_sse_test(
        Ok(bytes::Bytes::copy_from_slice(frame)),
        &text_buf,
        &remainder,
        &state,
    )
    .await;
    let done_usages: Vec<_> = events
        .into_iter()
        .filter_map(|event| match event {
            Ok(StreamEvent::Done { usage }) => Some(usage),
            _ => None,
        })
        .collect();
    assert!(
        done_usages.iter().all(Option::is_none),
        "non-object usage must collapse to None"
    );
    assert!(!done_usages.is_empty(), "expected Done event");
}
/// A native `error` frame must surface both its `code` and its `message` in the
/// text of the resulting `StreamEvent::Error`.
#[tokio::test]
async fn parse_sse_chunk_native_error_preserves_code_and_message() {
    let text_buf = Arc::new(tokio::sync::Mutex::new(String::new()));
    let remainder = Arc::new(tokio::sync::Mutex::new(Vec::<u8>::new()));
    let state = new_state();
    let frame = br#"data: {"type":"error","error":{"code":"slot_evicted","message":"slot was reclaimed mid-decode"}}
"#;
    let events = parse_sse_test(
        Ok(bytes::Bytes::copy_from_slice(frame)),
        &text_buf,
        &remainder,
        &state,
    )
    .await;
    let mut error_texts = events.into_iter().filter_map(|event| match event {
        Ok(StreamEvent::Error(m)) => Some(m),
        _ => None,
    });
    let msg = error_texts.next().expect("expected one Error event");
    assert!(msg.contains("slot_evicted"), "missing code: {msg}");
    assert!(
        msg.contains("slot was reclaimed mid-decode"),
        "missing message: {msg}"
    );
}
/// An `error` frame without a `code` must still carry its `message`, and must
/// not render an empty `[]` code marker.
#[tokio::test]
async fn parse_sse_chunk_native_error_code_missing_keeps_message() {
    let text_buf = Arc::new(tokio::sync::Mutex::new(String::new()));
    let remainder = Arc::new(tokio::sync::Mutex::new(Vec::<u8>::new()));
    let state = new_state();
    let frame = br#"data: {"type":"error","error":{"message":"upstream hung up"}}
"#;
    let events = parse_sse_test(
        Ok(bytes::Bytes::copy_from_slice(frame)),
        &text_buf,
        &remainder,
        &state,
    )
    .await;
    let mut error_texts = events.into_iter().filter_map(|event| match event {
        Ok(StreamEvent::Error(m)) => Some(m),
        _ => None,
    });
    let msg = error_texts.next().expect("expected one Error event");
    assert!(msg.contains("upstream hung up"), "missing message: {msg}");
    assert!(!msg.contains("[]"), "empty-code marker leaked: {msg}");
}
/// An `error` frame with a `code` but no `message` must keep the code, and must
/// not end with a dangling `": "` separator.
#[tokio::test]
async fn parse_sse_chunk_native_error_message_missing_keeps_code() {
    let text_buf = Arc::new(tokio::sync::Mutex::new(String::new()));
    let remainder = Arc::new(tokio::sync::Mutex::new(Vec::<u8>::new()));
    let state = new_state();
    let frame = br#"data: {"type":"error","error":{"code":"version_drift"}}
"#;
    let events = parse_sse_test(
        Ok(bytes::Bytes::copy_from_slice(frame)),
        &text_buf,
        &remainder,
        &state,
    )
    .await;
    let mut error_texts = events.into_iter().filter_map(|event| match event {
        Ok(StreamEvent::Error(m)) => Some(m),
        _ => None,
    });
    let msg = error_texts.next().expect("expected one Error event");
    assert!(msg.contains("version_drift"), "missing code: {msg}");
    assert!(!msg.ends_with(": "), "trailing empty-message leaked: {msg}");
}
/// An `error` frame with an empty error object falls back to the fixed text
/// `"rsclaw stream error"`.
#[tokio::test]
async fn parse_sse_chunk_native_error_uses_default_when_both_missing() {
    let text_buf = Arc::new(tokio::sync::Mutex::new(String::new()));
    let remainder = Arc::new(tokio::sync::Mutex::new(Vec::<u8>::new()));
    let state = new_state();
    let frame = b"data: {\"type\":\"error\",\"error\":{}}\n";
    let events = parse_sse_test(
        Ok(bytes::Bytes::copy_from_slice(frame)),
        &text_buf,
        &remainder,
        &state,
    )
    .await;
    let msg = events
        .into_iter()
        .filter_map(|event| match event {
            Ok(StreamEvent::Error(m)) => Some(m),
            _ => None,
        })
        .next()
        .expect("expected one Error event");
    assert_eq!(msg, "rsclaw stream error");
}
/// Frames with an unrecognised `type` (here `cache_hit_summary`) are skipped, and
/// the text block that follows still streams normally.
#[tokio::test]
async fn parse_sse_chunk_unknown_type_ignored_for_forward_compat() {
    let text_buf = Arc::new(tokio::sync::Mutex::new(String::new()));
    let remainder = Arc::new(tokio::sync::Mutex::new(Vec::<u8>::new()));
    let state = new_state();
    let frames = br#"data: {"type":"cache_hit_summary","hits":42}
data: {"type":"block_start","index":0,"block":{"type":"text"}}
data: {"type":"block_delta","index":0,"delta":"hi"}
data: {"type":"block_stop","index":0}
"#;
    let events = parse_sse_test(
        Ok(bytes::Bytes::copy_from_slice(frames)),
        &text_buf,
        &remainder,
        &state,
    )
    .await;
    let mut texts: Vec<String> = Vec::new();
    for event in events {
        if let Ok(StreamEvent::TextDelta(s)) = event {
            texts.push(s);
        }
    }
    assert_eq!(texts, vec!["hi".to_string()]);
}
/// Each `block_delta` of a v1 text block streams out as its own `TextDelta`, and
/// the `done` frame yields `Done` with its usage. `start`, `block_start`,
/// `block_stop` and the `[DONE]` sentinel must produce no other events.
#[tokio::test]
async fn parse_v1_text_emits_incremental_text_deltas() {
    let text_buf = Arc::new(tokio::sync::Mutex::new(String::new()));
    let remainder = Arc::new(tokio::sync::Mutex::new(Vec::<u8>::new()));
    let state = new_state();
    let frames = br#"data: {"type":"start"}
data: {"type":"block_start","index":0,"block":{"type":"text"}}
data: {"type":"block_delta","index":0,"delta":"Hel"}
data: {"type":"block_delta","index":0,"delta":"lo"}
data: {"type":"block_stop","index":0}
data: {"type":"done","finish_reason":"stop","usage":{"input_tokens":1,"output_tokens":2}}
data: [DONE]
"#;
    let events = parse_sse_test(
        Ok(bytes::Bytes::copy_from_slice(frames)),
        &text_buf,
        &remainder,
        &state,
    )
    .await;
    let mut texts: Vec<String> = Vec::new();
    let mut done_seen = false;
    for event in events.into_iter().map(Result::unwrap) {
        match event {
            StreamEvent::TextDelta(chunk) => texts.push(chunk),
            StreamEvent::Done { usage } => {
                let totals = usage.expect("usage present");
                assert_eq!(totals.input, 1);
                assert_eq!(totals.output, 2);
                done_seen = true;
            }
            other => panic!("unexpected event {other:?}"),
        }
    }
    assert_eq!(texts, ["Hel", "lo"]);
    assert!(done_seen, "Done event missing");
}
/// Argument deltas of a tool-call block are buffered, never emitted as text.
/// On `block_stop` the parser emits exactly one `ToolCall` whose `input` is the
/// concatenated deltas parsed as JSON.
#[tokio::test]
async fn parse_v1_tool_call_streams_args_then_emits_one_toolcall() {
    let text_buf = Arc::new(tokio::sync::Mutex::new(String::new()));
    let remainder = Arc::new(tokio::sync::Mutex::new(Vec::<u8>::new()));
    let state = new_state();
    let frames = br#"data: {"type":"block_start","index":0,"block":{"type":"tool_call","id":"c1","name":"write_file"}}
data: {"type":"block_delta","index":0,"delta":"{\"path\":\"a.txt\","}
data: {"type":"block_delta","index":0,"delta":"\"content\":\"hello\"}"}
data: {"type":"block_stop","index":0}
data: {"type":"done"}
data: [DONE]
"#;
    let events = parse_sse_test(
        Ok(bytes::Bytes::copy_from_slice(frames)),
        &text_buf,
        &remainder,
        &state,
    )
    .await;
    // Consume events by value so the tool-call fields can be moved out
    // rather than cloned.
    let mut calls = Vec::new();
    for event in events {
        match event {
            Ok(StreamEvent::TextDelta(_)) => panic!("tool_call deltas must NOT emit TextDelta"),
            Ok(StreamEvent::ToolCall { id, name, input }) => calls.push((id, name, input)),
            _ => {}
        }
    }
    assert_eq!(calls.len(), 1, "expected exactly one ToolCall");
    let (id, name, input) = calls.pop().unwrap();
    assert_eq!(id, "c1");
    assert_eq!(name, "write_file");
    assert_eq!(input.get("path").and_then(Value::as_str), Some("a.txt"));
    assert_eq!(input.get("content").and_then(Value::as_str), Some("hello"));
}
/// Interleaved blocks are demultiplexed by `index`. Block 0's text deltas stream
/// straight through, while block 1's tool arguments accumulate and come out as a
/// single `ToolCall` when that block stops.
#[tokio::test]
async fn parse_v1_parallel_blocks_by_index() {
    let text_buf = Arc::new(tokio::sync::Mutex::new(String::new()));
    let remainder = Arc::new(tokio::sync::Mutex::new(Vec::<u8>::new()));
    let state = new_state();
    let frames = br#"data: {"type":"block_start","index":0,"block":{"type":"text"}}
data: {"type":"block_start","index":1,"block":{"type":"tool_call","id":"t1","name":"shell"}}
data: {"type":"block_delta","index":0,"delta":"Running... "}
data: {"type":"block_delta","index":1,"delta":"{\"cmd\":\""}
data: {"type":"block_delta","index":0,"delta":"please wait"}
data: {"type":"block_delta","index":1,"delta":"ls\"}"}
data: {"type":"block_stop","index":1}
data: {"type":"block_stop","index":0}
"#;
    let events = parse_sse_test(
        Ok(bytes::Bytes::copy_from_slice(frames)),
        &text_buf,
        &remainder,
        &state,
    )
    .await;
    let mut texts = Vec::new();
    let mut calls = Vec::new();
    for event in events.into_iter().map(Result::unwrap) {
        match event {
            StreamEvent::TextDelta(chunk) => texts.push(chunk),
            StreamEvent::ToolCall { id, name, input } => calls.push((id, name, input)),
            _ => {}
        }
    }
    assert_eq!(texts, ["Running... ", "please wait"]);
    assert_eq!(calls.len(), 1);
    let (id, name, input) = calls.pop().unwrap();
    assert_eq!(id, "t1");
    assert_eq!(name, "shell");
    assert_eq!(input.get("cmd").and_then(Value::as_str), Some("ls"));
}
/// `start` and `ping` frames must be swallowed by the parser: no event and no
/// error may come out of a chunk made only of them.
#[tokio::test]
async fn parse_v1_start_and_ping_emit_nothing() {
    let buf = Arc::new(tokio::sync::Mutex::new(String::new()));
    let rem = Arc::new(tokio::sync::Mutex::new(Vec::<u8>::new()));
    let state = new_state();
    let frames = br#"data: {"type":"start","model":"foo","session_id":"s1"}
data: {"type":"ping"}
data: {"type":"ping"}
"#;
    let evs = parse_sse_test(
        Ok(bytes::Bytes::copy_from_slice(frames)),
        &buf,
        &rem,
        &state,
    )
    .await;
    // Any item at all is a failure. Inspect only the first one so the panic
    // names its variant; an `if let` states this directly instead of a `for`
    // loop that can never iterate twice (and so needed a lint suppression).
    if let Some(first) = evs.into_iter().next() {
        match first {
            Ok(ev) => panic!("start/ping must not emit events; got {ev:?}"),
            Err(err) => panic!("start/ping must not surface as Err; got {err}"),
        }
    }
}
/// A `done` frame followed by the `[DONE]` sentinel yields exactly one `Done`.
/// The sentinel itself must be consumed without surfacing as an error.
#[tokio::test]
async fn parse_v1_done_sentinel_is_silently_consumed() {
    let text_buf = Arc::new(tokio::sync::Mutex::new(String::new()));
    let remainder = Arc::new(tokio::sync::Mutex::new(Vec::<u8>::new()));
    let state = new_state();
    let frames = br#"data: {"type":"done"}
data: [DONE]
"#;
    let events = parse_sse_test(
        Ok(bytes::Bytes::copy_from_slice(frames)),
        &text_buf,
        &remainder,
        &state,
    )
    .await;
    let done_count = events
        .iter()
        .filter(|event| matches!(event, Ok(StreamEvent::Done { .. })))
        .count();
    assert_eq!(done_count, 1, "expected exactly one Done event");
    if let Some(err) = events.iter().find_map(|event| event.as_ref().err()) {
        panic!("[DONE] sentinel must not surface as Err; got {err}");
    }
}
/// A `block_delta` or `block_stop` for an index that never saw a `block_start`
/// must be dropped, with no event and no error.
#[tokio::test]
async fn parse_v1_block_delta_for_unopened_index_dropped_silently() {
    let buf = Arc::new(tokio::sync::Mutex::new(String::new()));
    let rem = Arc::new(tokio::sync::Mutex::new(Vec::<u8>::new()));
    let state = new_state();
    let frames = br#"data: {"type":"block_delta","index":99,"delta":"orphan"}
data: {"type":"block_stop","index":99}
"#;
    let evs = parse_sse_test(
        Ok(bytes::Bytes::copy_from_slice(frames)),
        &buf,
        &rem,
        &state,
    )
    .await;
    // Any item at all is a failure; report the first with a variant-specific
    // message instead of using a never-repeating loop plus a lint allow.
    if let Some(first) = evs.into_iter().next() {
        match first {
            Ok(ev) => panic!("orphan delta must not emit; got {ev:?}"),
            Err(err) => panic!("orphan delta must not surface as Err; got {err}"),
        }
    }
}
/// The v1 `done` usage object maps `cache_creation_input_tokens` and
/// `cache_read_input_tokens` onto `TokenUsage::cache_creation` / `cache_read`.
#[tokio::test]
async fn parse_v1_done_extracts_cache_stats() {
    let text_buf = Arc::new(tokio::sync::Mutex::new(String::new()));
    let remainder = Arc::new(tokio::sync::Mutex::new(Vec::<u8>::new()));
    let state = new_state();
    let frame = b"data: {\"type\":\"done\",\"usage\":{\"input_tokens\":120,\"output_tokens\":40,\"cache_creation_input_tokens\":50,\"cache_read_input_tokens\":70}}\n";
    let events = parse_sse_test(
        Ok(bytes::Bytes::copy_from_slice(frame)),
        &text_buf,
        &remainder,
        &state,
    )
    .await;
    let usage = events
        .into_iter()
        .filter_map(|event| match event {
            Ok(StreamEvent::Done { usage }) => usage,
            _ => None,
        })
        .next()
        .expect("Done with usage");
    assert_eq!(
        (usage.input, usage.output, usage.cache_creation, usage.cache_read),
        (120, 40, 50, 70)
    );
}
/// `cached_tokens` is accepted as an alias that fills `cache_read`;
/// `cache_creation` stays at zero when it is not reported.
#[tokio::test]
async fn parse_v1_done_cached_tokens_alias_maps_to_cache_read() {
    let text_buf = Arc::new(tokio::sync::Mutex::new(String::new()));
    let remainder = Arc::new(tokio::sync::Mutex::new(Vec::<u8>::new()));
    let state = new_state();
    let frame = b"data: {\"type\":\"done\",\"usage\":{\"input_tokens\":80,\"output_tokens\":20,\"cached_tokens\":60}}\n";
    let events = parse_sse_test(
        Ok(bytes::Bytes::copy_from_slice(frame)),
        &text_buf,
        &remainder,
        &state,
    )
    .await;
    let usage = events
        .into_iter()
        .filter_map(|event| match event {
            Ok(StreamEvent::Done { usage }) => usage,
            _ => None,
        })
        .next()
        .expect("Done with usage");
    assert_eq!((usage.cache_creation, usage.cache_read), (0, 60));
}
/// Usage without any cache fields still parses: input and output come through,
/// and both cache counters default to zero.
#[tokio::test]
async fn parse_v1_done_without_cache_fields_defaults_to_zero() {
    let text_buf = Arc::new(tokio::sync::Mutex::new(String::new()));
    let remainder = Arc::new(tokio::sync::Mutex::new(Vec::<u8>::new()));
    let state = new_state();
    let frame = b"data: {\"type\":\"done\",\"usage\":{\"input_tokens\":10,\"output_tokens\":5}}\n";
    let events = parse_sse_test(
        Ok(bytes::Bytes::copy_from_slice(frame)),
        &text_buf,
        &remainder,
        &state,
    )
    .await;
    let usage = events
        .into_iter()
        .filter_map(|event| match event {
            Ok(StreamEvent::Done { usage }) => usage,
            _ => None,
        })
        .next()
        .expect("Done with usage");
    assert_eq!(
        (usage.input, usage.output, usage.cache_creation, usage.cache_read),
        (10, 5, 0, 0)
    );
}
/// Tool-call arguments that do not parse as JSON fall back to an empty JSON
/// object rather than failing the stream.
#[tokio::test]
async fn parse_v1_tool_call_malformed_json_falls_back_to_empty_object() {
    let text_buf = Arc::new(tokio::sync::Mutex::new(String::new()));
    let remainder = Arc::new(tokio::sync::Mutex::new(Vec::<u8>::new()));
    let state = new_state();
    let frames = br#"data: {"type":"block_start","index":0,"block":{"type":"tool_call","id":"c","name":"foo"}}
data: {"type":"block_delta","index":0,"delta":"{not valid json"}
data: {"type":"block_stop","index":0}
"#;
    let events = parse_sse_test(
        Ok(bytes::Bytes::copy_from_slice(frames)),
        &text_buf,
        &remainder,
        &state,
    )
    .await;
    let input = events
        .into_iter()
        .filter_map(|event| match event {
            Ok(StreamEvent::ToolCall { input, .. }) => Some(input),
            _ => None,
        })
        .next()
        .expect("expected one ToolCall event");
    assert!(input.is_object());
    let fields = input.as_object().unwrap();
    assert!(fields.is_empty());
}
/// Keeps `TURN_HEADERS_TIMEOUT` inside a 30–120 second window (inclusive).
#[test]
fn turn_headers_timeout_is_bounded_and_finite() {
    let allowed = 30..=120;
    let s = TURN_HEADERS_TIMEOUT.as_secs();
    assert!(allowed.contains(&s), "TURN_HEADERS_TIMEOUT={s}s out of range");
}
/// `RsclawProvider::new` strips surrounding whitespace from both the base URL and
/// the bearer token, and also drops the base URL's trailing `/`.
#[test]
fn ctor_trims_whitespace_from_base_url_and_bearer() {
    let provider =
        RsclawProvider::new(" http://x:8090/v1/agent/ ", Some(" sk-abc\n ".into()));
    assert_eq!(provider.base_url, "http://x:8090/v1/agent");
    assert_eq!(provider.bearer.as_deref(), Some("sk-abc"));
}
/// A missing, empty, or whitespace-only bearer is normalised to `None`.
#[test]
fn ctor_blank_or_empty_bearer_becomes_none() {
    let candidates = [None, Some(String::new()), Some(" \n\t".to_string())];
    for bearer in candidates {
        assert!(RsclawProvider::new("http://x", bearer).bearer.is_none());
    }
}
/// A 404 whose error code is `session_not_found` counts as an eviction.
#[test]
fn is_session_evicted_recognizes_session_not_found() {
    let status = StatusCode::NOT_FOUND;
    let body = r#"{"error":{"code":"session_not_found","detail":"slot evicted"}}"#;
    assert!(is_session_evicted(status, body));
}
/// A 404 by itself is not an eviction: a different error code, an empty body,
/// and a non-JSON body must all be rejected.
#[test]
fn is_session_evicted_rejects_404_with_other_code() {
    let bodies = [
        r#"{"error":{"code":"unknown_version","detail":"v not registered"}}"#,
        "",
        "<html>not found</html>",
    ];
    for body in bodies {
        assert!(!is_session_evicted(StatusCode::NOT_FOUND, body));
    }
}
/// A 409 whose error code is `version_drift` counts as an eviction.
#[test]
fn is_session_evicted_recognizes_version_drift() {
    let status = StatusCode::CONFLICT;
    let body = r#"{"error":{"code":"version_drift","detail":"node has been upgraded"}}"#;
    assert!(is_session_evicted(status, body));
}
/// A 503 whose error code is `backend_unavailable` counts as an eviction.
#[test]
fn is_session_evicted_recognizes_backend_unavailable() {
    let status = StatusCode::SERVICE_UNAVAILABLE;
    let body = r#"{"error":{"code":"backend_unavailable","detail":"heartbeat timeout"}}"#;
    assert!(is_session_evicted(status, body));
}
/// A 503 with `no_backend_available` must NOT count as an eviction, even though
/// it shares the status code with `backend_unavailable`.
#[test]
fn is_session_evicted_excludes_no_backend_available() {
    let status = StatusCode::SERVICE_UNAVAILABLE;
    let body = r#"{"error":{"code":"no_backend_available","detail":"all GPUs saturated"}}"#;
    assert!(!is_session_evicted(status, body));
}
/// An eviction code only counts when paired with its expected status:
/// `version_drift` on 503 and `backend_unavailable` on 409 are both rejected.
#[test]
fn is_session_evicted_rejects_status_code_mismatch() {
    let mismatches = [
        (
            StatusCode::SERVICE_UNAVAILABLE,
            r#"{"error":{"code":"version_drift","detail":"x"}}"#,
        ),
        (
            StatusCode::CONFLICT,
            r#"{"error":{"code":"backend_unavailable","detail":"x"}}"#,
        ),
    ];
    for (status, body) in mismatches {
        assert!(!is_session_evicted(status, body));
    }
}
/// Bodies that are empty, not JSON, or JSON without the `error` wrapper object
/// are rejected, even with an otherwise matching status.
#[test]
fn is_session_evicted_rejects_malformed_body() {
    let bodies = ["", "not json", r#"{"code":"version_drift"}"#];
    for body in bodies {
        assert!(!is_session_evicted(StatusCode::CONFLICT, body));
    }
}
/// Builds a minimal `LlmRequest` for tests.
///
/// The model is fixed to `"2026.5.15"`, the system prompt to
/// `"you are an agent"`, and the fallback model list is empty. The caller
/// supplies the messages, the KV-cache mode, and an optional session key (copied
/// into an owned `String`). All other fields come from `Default`.
fn req_with(messages: Vec<Message>, mode: u8, key: Option<&str>) -> LlmRequest {
    let session_key = key.map(ToOwned::to_owned);
    LlmRequest {
        model: "2026.5.15".into(),
        system: Some("you are an agent".into()),
        messages,
        kv_cache_mode: mode,
        session_key,
        fallback_models: Vec::new(),
        ..Default::default()
    }
}
/// Canonicalisation orders the keys of a flat object alphabetically.
#[test]
fn canonical_value_sorts_object_keys_alphabetically() {
    let scrambled = json!({ "z_last": 1, "a_first": 2, "m_mid": 3 });
    let rendered = serde_json::to_string(&to_canonical_value(scrambled)).unwrap();
    assert_eq!(rendered, r#"{"a_first":2,"m_mid":3,"z_last":1}"#);
}
/// Key ordering is applied at every nesting depth, not just at the top level.
#[test]
fn canonical_value_recurses_into_nested_objects() {
    let nested = json!({
        "outer_b": {"y": 1, "x": 2},
        "outer_a": {"inner": {"z": 0, "a": 1}},
    });
    let rendered = serde_json::to_string(&to_canonical_value(nested)).unwrap();
    let expected = r#"{"outer_a":{"inner":{"a":1,"z":0}},"outer_b":{"x":2,"y":1}}"#;
    assert_eq!(rendered, expected);
}
/// Arrays keep their element order; only objects inside them get their keys
/// sorted.
#[test]
fn canonical_value_preserves_array_order() {
    let mixed = json!([3, 1, 2, {"b": 1, "a": 2}]);
    let rendered = serde_json::to_string(&to_canonical_value(mixed)).unwrap();
    assert_eq!(rendered, r#"[3,1,2,{"a":2,"b":1}]"#);
}
/// Canonicalising an already-canonical value changes nothing: one pass and two
/// passes serialise to the same bytes.
#[test]
fn canonical_value_is_idempotent() {
    let tool_doc = json!({
        "tools": [{
            "name": "search",
            "input_schema": {
                "type": "object",
                "properties": {"q": {"type": "string"}, "k": {"type": "integer"}},
                "required": ["q"]
            }
        }]
    });
    let first_pass = to_canonical_value(tool_doc);
    let second_pass = to_canonical_value(first_pass.clone());
    let first_text = serde_json::to_string(&first_pass).unwrap();
    let second_text = serde_json::to_string(&second_pass).unwrap();
    assert_eq!(first_text, second_text);
}
/// Two values that differ only in key insertion order canonicalise to the same
/// serialised bytes.
#[test]
fn canonical_value_byte_stable_across_input_orderings() {
    let left = json!({
        "z": [{"b": 1, "a": 2}],
        "a": {"inner_z": 1, "inner_a": 2},
    });
    let right = json!({
        "a": {"inner_a": 2, "inner_z": 1},
        "z": [{"a": 2, "b": 1}],
    });
    let render = |v| serde_json::to_string(&to_canonical_value(v)).unwrap();
    let left_text = render(left);
    let right_text = render(right);
    assert_eq!(left_text, right_text);
}
/// A non-builtin tool (here `search`) is routed to `dynamic_user_tools`, not the
/// base `dynamic_tools` array. It is serialised in canonical form: keys sorted at
/// every level, with the tool's `parameters` emitted under `input_schema`.
#[test]
fn split_request_dynamic_tools_are_canonical() {
    let mut req = req_with(vec![], 2, Some("k"));
    req.tools = vec![ToolDef {
        name: "search".into(),
        description: "look stuff up".into(),
        parameters: json!({
            "type": "object",
            "required": ["q"],
            "properties": {
                "q": {"type": "string"},
                "k": {"type": "integer"},
            }
        }),
    }];
    let split = split_request(&req, RSCLAW_DEFAULT_PREFIX_ID, false).unwrap();
    assert_eq!(
        split.dynamic_tools.len(),
        0,
        "non-builtin 'search' must NOT land in the base tools array"
    );
    assert_eq!(split.dynamic_user_tools.len(), 1);
    let serialized = serde_json::to_string(&split.dynamic_user_tools[0]).unwrap();
    assert_eq!(
        serialized,
        concat!(
            r#"{"description":"look stuff up","input_schema":"#,
            r#"{"properties":{"k":{"type":"integer"},"q":{"type":"string"}},"#,
            r#""required":["q"],"type":"object"},"name":"search"}"#
        )
    );
}
/// The prefix id passed to `split_request` is used as-is, whether the model
/// name is bare or has a namespace prefix.
#[test]
fn split_request_uses_provided_prefix_id_verbatim() {
    let mut req = req_with(vec![], 2, Some("k"));
    for model in ["qwen3-235b", "myorg/qwen3-235b"] {
        req.model = model.into();
        let split = split_request(&req, RSCLAW_DEFAULT_PREFIX_ID, false).unwrap();
        assert_eq!(split.prefix_id, RSCLAW_DEFAULT_PREFIX_ID);
    }
}
/// A non-default prefix id given to `split_request` wins over the default.
#[test]
fn split_request_honours_custom_prefix_id_override() {
    let custom = "myorg/2026.5.15";
    let mut req = req_with(vec![], 2, Some("k"));
    req.model = "qwen3-235b".into();
    let split = split_request(&req, custom, false).unwrap();
    assert_eq!(split.prefix_id, custom);
}
/// A fresh provider uses the default prefix id. `with_prefix_id` replaces it,
/// unless the supplied value is blank, in which case the default stays.
#[test]
fn with_prefix_id_overrides_default_and_ignores_blank() {
    let fresh = || RsclawProvider::new("http://x", None);
    assert_eq!(fresh().prefix_id, RSCLAW_DEFAULT_PREFIX_ID);
    assert_eq!(
        fresh().with_prefix_id("tenant/2026.6.1").prefix_id,
        "tenant/2026.6.1"
    );
    assert_eq!(
        fresh().with_prefix_id(" \n ").prefix_id,
        RSCLAW_DEFAULT_PREFIX_ID
    );
}
/// `with_prefix_id` only accepts ids with exactly one `/`, counted after
/// trimming. Rejected ids leave the default in place.
#[test]
fn with_prefix_id_rejects_invalid_slash_count() {
    let default = RSCLAW_DEFAULT_PREFIX_ID;
    let cases = [
        ("rsclaw-2026.5.15", default, "no slash → reject"),
        ("foo/bar/baz", default, "two slashes → reject"),
        (" tenant/v1\n", "tenant/v1", "trim before count"),
    ];
    for (candidate, expected, why) in cases {
        let p = RsclawProvider::new("http://x", None).with_prefix_id(candidate);
        assert_eq!(p.prefix_id, expected, "{why}");
    }
}
/// When both `system_shared` and `user_system` are set, builtin tools (`memory`)
/// go to `dynamic_tools` and per-client tools (`search`) go to
/// `dynamic_user_tools`. The two system strings map to `dynamic_system` and
/// `dynamic_user_system` respectively.
#[test]
fn split_request_separates_builtin_and_user_tools_when_split_present() {
    let mut req = req_with(vec![], 2, Some("k"));
    req.system_shared = Some("<shared system>".into());
    req.user_system = Some("<user suffix>".into());
    let empty_schema = || json!({"type":"object","properties":{}});
    req.tools.extend([
        ToolDef {
            name: "search".into(),
            description: "search the web".into(),
            parameters: empty_schema(),
        },
        ToolDef {
            name: "memory".into(),
            description: "memory tool".into(),
            parameters: empty_schema(),
        },
    ]);
    let split = split_request(&req, RSCLAW_DEFAULT_PREFIX_ID, false).unwrap();
    assert_eq!(split.dynamic_tools.len(), 1, "only 'memory' is builtin");
    assert_eq!(split.dynamic_tools[0]["name"], "memory");
    assert_eq!(split.dynamic_user_tools.len(), 1, "'search' is per-client");
    assert_eq!(split.dynamic_user_tools[0]["name"], "search");
    assert_eq!(split.dynamic_system, "<shared system>");
    assert_eq!(split.dynamic_user_system, "<user suffix>");
}
/// Without `system_shared` / `user_system`, tools are still classified into
/// builtin and per-client sets. The plain `system` prompt becomes
/// `dynamic_system` and `dynamic_user_system` is empty.
#[test]
fn split_request_classifies_tools_in_degraded_mode_too() {
    let mut req = req_with(vec![], 2, Some("k"));
    let object_schema = || json!({"type":"object"});
    req.tools.extend([
        ToolDef {
            name: "search".into(),
            description: "search".into(),
            parameters: object_schema(),
        },
        ToolDef {
            name: "memory".into(),
            description: "memory".into(),
            parameters: object_schema(),
        },
    ]);
    let split = split_request(&req, RSCLAW_DEFAULT_PREFIX_ID, false).unwrap();
    assert_eq!(split.dynamic_tools.len(), 1);
    assert_eq!(split.dynamic_tools[0]["name"], "memory");
    assert_eq!(split.dynamic_user_tools.len(), 1);
    assert_eq!(split.dynamic_user_tools[0]["name"], "search");
    assert_eq!(split.dynamic_system, "you are an agent");
    assert_eq!(split.dynamic_user_system, "");
}
/// Registry path: with a non-empty `prefix_id`, the serialized `CreateSessionReq`
/// carries `prefix_id` and omits `dynamic_prefix`. Only the builtin `memory`
/// tool is registered, so the top-level `user_tools` is omitted as well. The
/// remaining assertions check that legacy / pre-rename field names never appear
/// at the top level of the body.
#[test]
fn create_session_req_with_prefix_id_omits_dynamic_prefix() {
let mut req = req_with(vec![], 2, Some("k"));
req.system_shared = Some("<sys>".into());
req.user_system = Some("<suf>".into());
req.tools.push(ToolDef {
name: "memory".into(),
description: "memory tool".into(),
parameters: json!({"type":"object"}),
});
let split = split_request(&req, RSCLAW_DEFAULT_PREFIX_ID, false).unwrap();
// `prefix_fields` yields the (prefix_id, dynamic_prefix, top-level user_tools)
// triple copied into the body below. NOTE(review): presumably mirrors the real
// create-session call site; confirm it has not drifted.
let (prefix_id, dynamic_prefix, top_level_user_tools) = prefix_fields(
&split.prefix_id,
DynamicPrefixWire {
system: split.dynamic_system,
tools: &split.dynamic_tools,
user_tools: &split.dynamic_user_tools,
user_system: split.dynamic_user_system,
},
);
let body = CreateSessionReq {
prefix_id,
model: &split.model,
dynamic_prefix,
user_tools: top_level_user_tools,
options: Some(split.options.clone()),
};
let v = serde_json::to_value(&body).unwrap();
assert_eq!(v["prefix_id"], RSCLAW_DEFAULT_PREFIX_ID);
assert!(
v.get("dynamic_prefix").is_none(),
"non-empty prefix_id must OMIT dynamic_prefix (mutually exclusive)"
);
assert!(
v.get("user_tools").is_none(),
"no per-session private tools → top-level user_tools omitted"
);
// Field-name regression guards: none of these names may appear at the top level.
assert!(
v.get("rsclaw_version").is_none(),
"rsclaw_version is the pre-rename name; never send"
);
assert!(
v.get("user_suffix").is_none(),
"user_suffix is the legacy name; never send (top-level or otherwise)"
);
assert!(
v.get("user_system").is_none(),
"user_system lives inside dynamic_prefix, never at top-level"
);
assert!(
v.get("plugins_system").is_none(),
"pre-rename field; folded into dynamic_prefix.system"
);
assert!(
v.get("skills_system").is_none(),
"pre-rename field; folded into dynamic_prefix.system"
);
}
/// Registry path with a per-client tool: `prefix_id` is sent and
/// `dynamic_prefix` omitted. The non-builtin `douyin__publish` tool (and only
/// that one) appears in the top-level `user_tools` array; the builtin `memory`
/// tool does not.
#[test]
fn create_session_req_registry_path_sends_top_level_user_tools() {
let mut req = req_with(vec![], 2, Some("k"));
req.system_shared = Some("<sys>".into());
req.tools.push(ToolDef {
name: "memory".into(),
description: "memory".into(),
parameters: json!({"type":"object"}),
});
req.tools.push(ToolDef {
name: "douyin__publish".into(), description: "publish to douyin".into(),
parameters: json!({"type":"object"}),
});
let split = split_request(&req, RSCLAW_DEFAULT_PREFIX_ID, false).unwrap();
// Same wire assembly as the other create_session tests; with a non-empty
// prefix_id the user tools are expected at the top level.
let (prefix_id, dynamic_prefix, top_level_user_tools) = prefix_fields(
&split.prefix_id,
DynamicPrefixWire {
system: split.dynamic_system,
tools: &split.dynamic_tools,
user_tools: &split.dynamic_user_tools,
user_system: split.dynamic_user_system,
},
);
let body = CreateSessionReq {
prefix_id,
model: &split.model,
dynamic_prefix,
user_tools: top_level_user_tools,
options: Some(split.options.clone()),
};
let v = serde_json::to_value(&body).unwrap();
assert_eq!(v["prefix_id"], RSCLAW_DEFAULT_PREFIX_ID);
assert!(
v.get("dynamic_prefix").is_none(),
"registry path must omit dynamic_prefix"
);
let tools = v["user_tools"]
.as_array()
.expect("top-level user_tools must be present");
assert_eq!(tools.len(), 1);
assert_eq!(tools[0]["name"], "douyin__publish");
}
/// Dynamic path (empty `prefix_id`): no top-level `prefix_id` or `user_tools`.
/// The per-client `douyin__publish` tool is nested under
/// `dynamic_prefix.user_tools`, and the builtin `memory` tool stays in
/// `dynamic_prefix.tools`.
#[test]
fn create_session_req_dynamic_path_keeps_user_tools_inside_dynamic_prefix() {
let mut req = req_with(vec![], 2, Some("k"));
req.system_shared = Some("<sys>".into());
req.tools.push(ToolDef {
name: "memory".into(),
description: "memory".into(),
parameters: json!({"type":"object"}),
});
req.tools.push(ToolDef {
name: "douyin__publish".into(),
description: "publish to douyin".into(),
parameters: json!({"type":"object"}),
});
// Empty prefix id selects the dynamic path.
let split = split_request(&req, "", false).unwrap();
let (prefix_id, dynamic_prefix, top_level_user_tools) = prefix_fields(
&split.prefix_id,
DynamicPrefixWire {
system: split.dynamic_system,
tools: &split.dynamic_tools,
user_tools: &split.dynamic_user_tools,
user_system: split.dynamic_user_system,
},
);
let body = CreateSessionReq {
prefix_id,
model: &split.model,
dynamic_prefix,
user_tools: top_level_user_tools,
options: Some(split.options.clone()),
};
let v = serde_json::to_value(&body).unwrap();
assert!(
v.get("prefix_id").is_none(),
"dynamic path omits top-level prefix_id"
);
assert!(
v.get("user_tools").is_none(),
"dynamic path keeps user_tools inside dynamic_prefix"
);
let dyn_user_tools = v["dynamic_prefix"]["user_tools"]
.as_array()
.expect("dynamic_prefix.user_tools must be present");
assert_eq!(dyn_user_tools.len(), 1);
assert_eq!(dyn_user_tools[0]["name"], "douyin__publish");
let dyn_tools = v["dynamic_prefix"]["tools"]
.as_array()
.expect("dynamic_prefix.tools must be present");
assert_eq!(dyn_tools.len(), 1);
assert_eq!(
dyn_tools[0]["name"], "memory",
"builtins stay in dynamic_prefix.tools (base hash)"
);
}
/// With an empty `prefix_id`, the body omits `prefix_id` entirely. Instead it
/// carries a `dynamic_prefix` holding the shared system text, the user system
/// suffix, and the builtin tool list.
#[test]
fn create_session_req_empty_prefix_id_sends_dynamic_prefix() {
let mut req = req_with(vec![], 2, Some("k"));
req.system_shared = Some("<sys>".into());
req.user_system = Some("<suf>".into());
req.tools.push(ToolDef {
name: "memory".into(),
description: "memory tool".into(),
parameters: json!({"type":"object"}),
});
let split = split_request(&req, "", false).unwrap();
let (prefix_id, dynamic_prefix, top_level_user_tools) = prefix_fields(
&split.prefix_id,
DynamicPrefixWire {
system: split.dynamic_system,
tools: &split.dynamic_tools,
user_tools: &split.dynamic_user_tools,
user_system: split.dynamic_user_system,
},
);
let body = CreateSessionReq {
prefix_id,
model: &split.model,
dynamic_prefix,
user_tools: top_level_user_tools,
options: Some(split.options.clone()),
};
let v = serde_json::to_value(&body).unwrap();
assert!(
v.get("prefix_id").is_none(),
"empty prefix_id must be OMITTED (mutually exclusive with dynamic_prefix)"
);
// system_shared -> dynamic_prefix.system; user_system -> dynamic_prefix.user_system.
assert_eq!(v["dynamic_prefix"]["system"], "<sys>");
assert_eq!(v["dynamic_prefix"]["user_system"], "<suf>");
assert_eq!(v["dynamic_prefix"]["tools"][0]["name"], "memory");
}
/// The trailing user message (the turn being sent now) is excluded from the
/// replay slice; everything before it is kept in order.
#[test]
fn history_for_replay_drops_trailing_delta() {
    let say = |role: Role, text: &str| Message {
        role,
        content: MessageContent::Text(text.into()),
        rsclaw_hidden: None,
    };
    let msgs = vec![
        say(Role::User, "hi"),
        say(Role::Assistant, "yo"),
        say(Role::User, "again"),
    ];
    let replay = history_for_replay(&msgs);
    assert_eq!(replay.len(), 2);
    assert!(matches!(replay[0].role, Role::User));
    assert!(matches!(replay[1].role, Role::Assistant));
}
/// An empty history and a one-message history both produce an empty replay
/// slice.
#[test]
fn history_for_replay_handles_empty_and_singleton() {
    let no_messages: Vec<Message> = Vec::new();
    assert!(history_for_replay(&no_messages).is_empty());

    let lone_user = Message {
        role: Role::User,
        content: MessageContent::Text("solo".into()),
        rsclaw_hidden: None,
    };
    let only_one = vec![lone_user];
    assert!(history_for_replay(&only_one).is_empty());
}
/// A history ending in a run of tool results loses the whole trailing run, not
/// just its last element; only the user/assistant prefix remains.
#[test]
fn history_for_replay_drops_all_consecutive_trailing_tools() {
    let say = |role: Role, text: &str| Message {
        role,
        content: MessageContent::Text(text.into()),
        rsclaw_hidden: None,
    };
    let tool_ok = |id: &str| Message {
        role: Role::Tool,
        content: MessageContent::Parts(vec![ContentPart::ToolResult {
            tool_use_id: id.into(),
            content: "ok".into(),
            is_error: None,
        }]),
        rsclaw_hidden: None,
    };
    let mut msgs = vec![
        say(Role::User, "do all three"),
        say(Role::Assistant, "calling tools"),
    ];
    for id in ["toolu_1", "toolu_2", "toolu_3"] {
        msgs.push(tool_ok(id));
    }
    let replay = history_for_replay(&msgs);
    assert_eq!(replay.len(), 2);
    assert!(matches!(replay[0].role, Role::User));
    assert!(matches!(replay[1].role, Role::Assistant));
}
/// Only the trailing tool run is dropped. Tool results in the middle of the
/// history (`a`, answered by `step2`) stay in the replay slice.
#[test]
fn history_for_replay_keeps_earlier_tool_messages() {
    let say = |role: Role, text: &str| Message {
        role,
        content: MessageContent::Text(text.into()),
        rsclaw_hidden: None,
    };
    let tool_ok = |id: &str| Message {
        role: Role::Tool,
        content: MessageContent::Parts(vec![ContentPart::ToolResult {
            tool_use_id: id.into(),
            content: "ok".into(),
            is_error: None,
        }]),
        rsclaw_hidden: None,
    };
    let msgs = vec![
        say(Role::User, "go"),
        say(Role::Assistant, "step1"),
        tool_ok("a"),
        say(Role::Assistant, "step2"),
        tool_ok("b"),
    ];
    let replay = history_for_replay(&msgs);
    assert_eq!(replay.len(), 4);
    assert!(matches!(replay[2].role, Role::Tool));
    assert!(matches!(replay[3].role, Role::Assistant));
}
/// Consecutive `Role::Tool` messages are folded into a single `user`-role entry
/// whose content array lists every `tool_result` in original order.
#[test]
fn serialize_replay_history_coalesces_parallel_tools() {
    let tool_msg = |id: &str, body: &str| Message {
        role: Role::Tool,
        content: MessageContent::Parts(vec![ContentPart::ToolResult {
            tool_use_id: id.into(),
            content: body.into(),
            is_error: None,
        }]),
        rsclaw_hidden: None,
    };
    let user = Message {
        role: Role::User,
        content: MessageContent::Text("go".into()),
        rsclaw_hidden: None,
    };
    let asst = Message {
        role: Role::Assistant,
        content: MessageContent::Text("calling tools".into()),
        rsclaw_hidden: None,
    };
    let results: Vec<Message> = [("a", "ra"), ("b", "rb"), ("c", "rc")]
        .into_iter()
        .map(|(id, body)| tool_msg(id, body))
        .collect();
    let mut msgs = vec![&user, &asst];
    msgs.extend(results.iter());
    let out = serialize_replay_history(&msgs);
    assert_eq!(
        out.len(),
        3,
        "user + assistant + 1 coalesced tool entry: {out:?}"
    );
    assert_eq!(out[0]["role"], "user");
    assert_eq!(out[1]["role"], "assistant");
    assert_eq!(out[2]["role"], "user");
    let parts = out[2]["content"].as_array().expect("content array");
    assert_eq!(parts.len(), 3);
    for (part, want_id) in parts.iter().zip(["a", "b", "c"]) {
        assert_eq!(part["tool_use_id"], want_id);
    }
    for part in parts {
        assert_eq!(part["type"], "tool_result");
    }
}
/// Tool runs separated by an assistant message are not merged: each run becomes
/// its own `user`-role entry on either side of the assistant turn.
#[test]
fn serialize_replay_history_keeps_separated_tool_runs_separate() {
    let tool_ok = |id: &str| Message {
        role: Role::Tool,
        content: MessageContent::Parts(vec![ContentPart::ToolResult {
            tool_use_id: id.into(),
            content: "ok".into(),
            is_error: None,
        }]),
        rsclaw_hidden: None,
    };
    let asst = Message {
        role: Role::Assistant,
        content: MessageContent::Text("step".into()),
        rsclaw_hidden: None,
    };
    let (first_run, second_run) = (tool_ok("a"), tool_ok("b"));
    let msgs = vec![&first_run, &asst, &second_run];
    let out = serialize_replay_history(&msgs);
    assert_eq!(out.len(), 3);
    assert_eq!(out[0]["role"], "user");
    assert_eq!(out[0]["content"][0]["tool_use_id"], "a");
    assert_eq!(out[1]["role"], "assistant");
    assert_eq!(out[2]["role"], "user");
    assert_eq!(out[2]["content"][0]["tool_use_id"], "b");
}
/// A tool-role message with no `ToolResult` parts (only plain text) is dropped
/// from the serialised history instead of producing an empty entry.
#[test]
fn serialize_replay_history_drops_tool_run_with_no_tool_result_parts() {
    let user = Message {
        role: Role::User,
        content: MessageContent::Text("hi".into()),
        rsclaw_hidden: None,
    };
    let text_only_tool = Message {
        role: Role::Tool,
        content: MessageContent::Parts(vec![ContentPart::Text {
            text: "noise".into(),
        }]),
        rsclaw_hidden: None,
    };
    let out = serialize_replay_history(&vec![&user, &text_only_tool]);
    assert_eq!(out.len(), 1, "only the User survives: {out:?}");
    assert_eq!(out[0]["role"], "user");
}
/// A single plain-text user message serialises as the delta's `user_message`.
#[test]
fn turn_delta_user_text() {
    let greeting = Message {
        role: Role::User,
        content: MessageContent::Text("hello".into()),
        rsclaw_hidden: None,
    };
    let req = req_with(vec![greeting], 2, Some("k"));
    let delta = TurnDelta::from_request(&req).unwrap();
    let wire = serde_json::to_value(&delta).unwrap();
    assert_eq!(wire["user_message"], "hello");
}
/// Recall data is sent as two top-level fields of the turn body: the raw
/// `recall_context` string and a `recall` metadata object. The `user_message`
/// is left untouched; it must not contain a `<recall>` wrapper.
///
/// NOTE(review): this test assembles `TurnReq` by hand rather than through the
/// provider's request path. It pins the serialisation shape, not the wiring;
/// confirm the real call site builds `TurnReq` the same way.
#[test]
fn turn_request_serializes_recall_as_independent_top_level_fields() {
let mut req = req_with(
vec![Message {
role: Role::User,
content: MessageContent::Text("hello".into()),
rsclaw_hidden: None,
}],
2,
Some("k"),
);
req.recall = Some(crate::RecallBundle {
context: "用户手机号: 13900001234".into(),
metadata: crate::RecallMetadata {
mode: "committed".into(),
format: "xml".into(),
source: "server".into(),
trace_id: Some("recall_test".into()),
max_tokens: Some(1200),
doc_ids: vec!["doc-1".into()],
hash: "sha256:abc".into(),
truncated: false,
},
});
let delta = TurnDelta::from_request(&req).unwrap();
// The delta's fields appear at the top level of the body alongside the recall
// fields (e.g. `user_message` is asserted on `body` directly below).
let body = serde_json::to_value(&TurnReq {
delta: &delta,
recall_context: req.recall.as_ref().map(|r| r.context.as_str()),
recall: req.recall.as_ref().map(|r| &r.metadata),
options: Some(TurnOptions::from_request(&req, false)),
stream: true,
})
.unwrap();
assert_eq!(body["user_message"], "hello");
assert_eq!(body["recall_context"], "用户手机号: 13900001234");
assert_eq!(body["recall"]["mode"], "committed");
assert_eq!(body["recall"]["format"], "xml");
assert_eq!(body["recall"]["doc_ids"][0], "doc-1");
assert!(
!body["user_message"].as_str().unwrap().contains("<recall>"),
"worker owns canonical recall wrapper"
);
}
/// A whitespace-only recall context is filtered to `None`. Both recall
/// fields must then be omitted from the wire body, not emitted as `null`.
///
/// NOTE(review): the blank-context filter is applied here in the test, not by
/// the production request builder. This therefore pins only the serde
/// omission of `None`. Confirm that the real builder applies the same filter.
#[test]
fn turn_request_omits_empty_recall_fields() {
    let mut req = req_with(
        vec![Message {
            role: Role::User,
            content: MessageContent::Text("hello".into()),
            rsclaw_hidden: None,
        }],
        2,
        Some("k"),
    );
    req.recall = Some(crate::RecallBundle {
        context: " ".into(),
        metadata: crate::RecallMetadata::default(),
    });
    let delta = TurnDelta::from_request(&req).unwrap();
    let usable = match req.recall.as_ref() {
        Some(bundle) if !bundle.context.trim().is_empty() => Some(bundle),
        _ => None,
    };
    let turn = TurnReq {
        delta: &delta,
        recall_context: usable.map(|r| r.context.as_str()),
        recall: usable.map(|r| &r.metadata),
        options: Some(TurnOptions::from_request(&req, false)),
        stream: true,
    };
    let body = serde_json::to_value(&turn).unwrap();
    for absent in ["recall_context", "recall"] {
        assert!(body.get(absent).is_none());
    }
}
/// An empty-text User message leaves nothing to send.
/// `TurnDelta::from_request` must report it as an error.
#[test]
fn turn_delta_user_text_empty_bails() {
    let empty = Message {
        role: Role::User,
        content: MessageContent::Text(String::new()),
        rsclaw_hidden: None,
    };
    let req = req_with(vec![empty], 2, Some("k"));
    let outcome = TurnDelta::from_request(&req);
    let err = outcome.unwrap_err().to_string();
    assert!(err.contains("no usable content"), "got: {err}");
}
/// User content made only of empty `Text` parts is no more usable than empty
/// plain text, so `TurnDelta::from_request` must still error.
#[test]
fn turn_delta_user_parts_with_only_empty_text_bails() {
    let blanks: Vec<ContentPart> = std::iter::repeat_with(|| ContentPart::Text {
        text: String::new(),
    })
    .take(2)
    .collect();
    let req = req_with(
        vec![Message {
            role: Role::User,
            content: MessageContent::Parts(blanks),
            rsclaw_hidden: None,
        }],
        2,
        Some("k"),
    );
    let outcome = TurnDelta::from_request(&req);
    let err = outcome.unwrap_err().to_string();
    assert!(err.contains("no usable content"), "got: {err}");
}
/// Multiple `Text` parts in a User message are joined with no separator into
/// a single `user_message`.
#[test]
fn turn_delta_user_parts_concatenates_text_fragments() {
    let fragments = ["hello ", "world"]
        .into_iter()
        .map(|piece| ContentPart::Text {
            text: piece.to_owned(),
        })
        .collect::<Vec<_>>();
    let user = Message {
        role: Role::User,
        content: MessageContent::Parts(fragments),
        rsclaw_hidden: None,
    };
    let req = req_with(vec![user], 2, Some("k"));
    let delta = TurnDelta::from_request(&req).unwrap();
    let wire = serde_json::to_value(&delta).unwrap();
    assert_eq!(wire["user_message"], "hello world");
}
/// A trailing Tool message is forwarded as a `tool_results` entry keyed by
/// its `tool_use_id`.
#[test]
fn turn_delta_tool_results() {
    let result = ContentPart::ToolResult {
        tool_use_id: "toolu_1".into(),
        content: "ok".into(),
        is_error: None,
    };
    let tool = Message {
        role: Role::Tool,
        content: MessageContent::Parts(vec![result]),
        rsclaw_hidden: None,
    };
    let req = req_with(vec![tool], 2, Some("k"));
    let delta = TurnDelta::from_request(&req).unwrap();
    let wire = serde_json::to_value(&delta).unwrap();
    let first = &wire["tool_results"][0];
    assert_eq!(first["tool_use_id"], "toolu_1");
}
/// `lookup_and_bump` returns the cached session while the caller's history
/// length does not shrink and the prefix id matches. A shrinking history
/// evicts the entry.
///
/// The prefix-mismatch case is checked against a freshly re-seeded entry.
/// Otherwise it would run against a key the shrink already evicted and pass
/// for the wrong reason.
#[test]
fn lookup_and_bump_evicts_on_history_shrink() {
    const PREFIX: &str = "rsclaw/2026.5.28";
    let entry = || SessionEntry {
        session_id: "rs_w7_abc".into(),
        prefix_id: PREFIX.into(),
        last_seen_msgs_len: 12,
    };
    let provider = RsclawProvider::new("http://x", None);
    provider.store("k", entry());

    // Same length and a grown history both hit.
    assert!(provider.lookup_and_bump("k", PREFIX, 12).is_some());
    assert!(provider.lookup_and_bump("k", PREFIX, 14).is_some());

    // A shorter history misses and must actually remove the entry.
    assert!(provider.lookup_and_bump("k", PREFIX, 8).is_none());
    assert!(
        provider.lock_sessions().get("k").is_none(),
        "history shrink must evict the cached session"
    );

    // Re-seed so the prefix check runs against a live entry.
    provider.store("k", entry());
    assert!(
        provider.lookup_and_bump("k", "rsclaw/2026.5.6", 14).is_none(),
        "prefix mismatch must not return the cached session"
    );

    assert!(provider.lookup_and_bump("missing", PREFIX, 14).is_none());
}
/// Once the map exceeds `MAX_SESSIONS`, a single `evict_if_oversized` call
/// culls it to exactly half the cap.
#[test]
fn evict_if_oversized_culls_to_half_cap_when_over() {
    let total = MAX_SESSIONS + 100;
    let mut map: HashMap<String, SessionEntry> = (0..total)
        .map(|i| {
            (
                format!("k{i}"),
                SessionEntry {
                    session_id: format!("rs_w_{i}"),
                    prefix_id: "rsclaw/test".into(),
                    last_seen_msgs_len: 1,
                },
            )
        })
        .collect();
    evict_if_oversized(&mut map);
    assert_eq!(map.len(), MAX_SESSIONS / 2);
}
/// Below the cap, `evict_if_oversized` leaves every entry in place.
#[test]
fn evict_if_oversized_no_op_when_under_cap() {
    let mut map: HashMap<String, SessionEntry> = (0..100)
        .map(|i| {
            let entry = SessionEntry {
                session_id: format!("rs_{i}"),
                prefix_id: "rsclaw/test".into(),
                last_seen_msgs_len: 1,
            };
            (format!("k{i}"), entry)
        })
        .collect();
    evict_if_oversized(&mut map);
    assert_eq!(map.len(), 100);
}
/// `invalidate_on_error` forwards the inner stream's items (both the delta
/// and the error). It also drops the cached session once an `Err` item is
/// seen.
#[tokio::test]
async fn invalidate_on_error_evicts_session_on_first_err() {
    let key = "session-key";
    let provider = RsclawProvider::new("http://x", None);
    provider.store(
        key,
        SessionEntry {
            session_id: "rs_w7_xyz".into(),
            prefix_id: "rsclaw/test".into(),
            last_seen_msgs_len: 5,
        },
    );
    let events = vec![
        Ok(StreamEvent::TextDelta("hi".into())),
        Err(anyhow::anyhow!("boom")),
    ];
    let inner: LlmStream = Box::pin(futures::stream::iter(events));
    let sessions = Arc::clone(&provider.sessions);
    let collected: Vec<_> = invalidate_on_error(inner, sessions, key.to_owned())
        .collect()
        .await;
    assert_eq!(collected.len(), 2);
    assert!(matches!(collected[0], Ok(StreamEvent::TextDelta(_))));
    assert!(collected[1].is_err());
    assert!(provider.lock_sessions().get(key).is_none());
}
#[tokio::test]
async fn invalidate_on_error_evicts_on_stream_event_error() {
let provider = RsclawProvider::new("http://x", None);
provider.store(
"k",
SessionEntry {
session_id: "rs_w7_abc".into(),
prefix_id: "rsclaw/test".into(),
last_seen_msgs_len: 5,
},
);
let inner: LlmStream = Box::pin(futures::stream::iter(vec![Ok(StreamEvent::Error(
"model_overloaded".into(),
))]));
let wrapped = invalidate_on_error(inner, Arc::clone(&provider.sessions), "k".into());
let _: Vec<_> = wrapped.collect().await;
assert!(provider.lock_sessions().get("k").is_none());
}
/// A stream that finishes with `Done` and no errors leaves the cached
/// session intact.
#[tokio::test]
async fn invalidate_on_error_keeps_session_on_clean_stream() {
    let provider = RsclawProvider::new("http://x", None);
    let entry = SessionEntry {
        session_id: "rs_w7_abc".into(),
        prefix_id: "rsclaw/test".into(),
        last_seen_msgs_len: 5,
    };
    provider.store("k", entry);
    let clean = vec![
        Ok(StreamEvent::TextDelta("hello".into())),
        Ok(StreamEvent::Done { usage: None }),
    ];
    let inner: LlmStream = Box::pin(futures::stream::iter(clean));
    let sessions = Arc::clone(&provider.sessions);
    let _ = invalidate_on_error(inner, sessions, "k".into())
        .collect::<Vec<_>>()
        .await;
    assert!(provider.lock_sessions().get("k").is_some());
}
/// Several consecutive Tool messages after the User turn (parallel tool
/// calls) are all gathered into `tool_results`, in message order.
#[test]
fn turn_delta_collects_parallel_tool_results() {
    let tool_msg = |id: &str, body: &str| Message {
        role: Role::Tool,
        content: MessageContent::Parts(vec![ContentPart::ToolResult {
            tool_use_id: id.into(),
            content: body.into(),
            is_error: None,
        }]),
        rsclaw_hidden: None,
    };
    let mut history = vec![Message {
        role: Role::User,
        content: MessageContent::Text("do three things".into()),
        rsclaw_hidden: None,
    }];
    history.extend(
        [
            ("toolu_a", "result a"),
            ("toolu_b", "result b"),
            ("toolu_c", "result c"),
        ]
        .into_iter()
        .map(|(id, body)| tool_msg(id, body)),
    );
    let req = req_with(history, 2, Some("k"));
    let wire = serde_json::to_value(&TurnDelta::from_request(&req).unwrap()).unwrap();
    let results = wire["tool_results"].as_array().unwrap();
    assert_eq!(results.len(), 3);
    for (result, want) in results.iter().zip(["toolu_a", "toolu_b", "toolu_c"]) {
        assert_eq!(result["tool_use_id"], want);
    }
}
/// A tool result that sits before the intervening Assistant message belongs
/// to an earlier turn and must not be resent. Only `toolu_new`, which comes
/// after it, appears in the delta.
#[test]
fn turn_delta_does_not_cross_user_boundary() {
    let tool_result = |id: &str, text: &str| Message {
        role: Role::Tool,
        content: MessageContent::Parts(vec![ContentPart::ToolResult {
            tool_use_id: id.into(),
            content: text.into(),
            is_error: None,
        }]),
        rsclaw_hidden: None,
    };
    let ack = Message {
        role: Role::Assistant,
        content: MessageContent::Text("ack".into()),
        rsclaw_hidden: None,
    };
    let history = vec![
        tool_result("toolu_old", "stale"),
        ack,
        tool_result("toolu_new", "fresh"),
    ];
    let req = req_with(history, 2, Some("k"));
    let wire = serde_json::to_value(&TurnDelta::from_request(&req).unwrap()).unwrap();
    let results = wire["tool_results"].as_array().unwrap();
    assert_eq!(results.len(), 1);
    assert_eq!(results[0]["tool_use_id"], "toolu_new");
}
/// Every System message is lifted out of the history, wherever it appears.
/// The lifted texts are joined in order, separated by blank lines, into the
/// suffix.
#[test]
fn split_system_messages_lifts_system_to_suffix() {
    let msg = |role: Role, text: &str| Message {
        role,
        content: MessageContent::Text(text.into()),
        rsclaw_hidden: None,
    };
    let history: Vec<Message> = [
        (Role::System, "PLUGINS"),
        (Role::System, "SKILLS"),
        (Role::User, "hi"),
        (Role::Assistant, "yo"),
        (Role::System, "## New Skill Installed\nfoo"),
        (Role::User, "again"),
    ]
    .into_iter()
    .map(|(role, text)| msg(role, text))
    .collect();
    let (filtered, suffix) = split_system_messages(&history);
    assert_eq!(filtered.len(), 3);
    assert!(filtered.iter().all(|kept| !matches!(kept.role, Role::System)));
    assert_eq!(suffix, "PLUGINS\n\nSKILLS\n\n## New Skill Installed\nfoo");
}
/// The `Text` parts of a System message are concatenated with no separator.
/// An all-System input leaves the filtered history empty.
#[test]
fn split_system_messages_handles_text_parts() {
    let parts = ["hello ", "world"]
        .into_iter()
        .map(|piece| ContentPart::Text {
            text: piece.to_owned(),
        })
        .collect::<Vec<_>>();
    let msgs = vec![Message {
        role: Role::System,
        content: MessageContent::Parts(parts),
        rsclaw_hidden: None,
    }];
    let (filtered, suffix) = split_system_messages(&msgs);
    assert!(filtered.is_empty());
    assert_eq!(suffix, "hello world");
}
/// With no System messages, the history passes through unchanged and the
/// suffix is empty.
#[test]
fn split_system_messages_empty_when_no_system() {
    let greeting = Message {
        role: Role::User,
        content: MessageContent::Text("hi".into()),
        rsclaw_hidden: None,
    };
    let msgs = vec![greeting];
    let (filtered, suffix) = split_system_messages(&msgs);
    assert!(suffix.is_empty());
    assert_eq!(filtered.len(), 1);
}
/// An empty-text System message contributes nothing. In particular, it must
/// not start the suffix with a stray blank-line separator.
#[test]
fn split_system_messages_drops_empty_text_system() {
    let msg = |role: Role, text: String| Message {
        role,
        content: MessageContent::Text(text),
        rsclaw_hidden: None,
    };
    let msgs = vec![
        msg(Role::User, "hi".into()),
        msg(Role::System, String::new()),
        msg(Role::System, "real ctx".into()),
    ];
    let (filtered, suffix) = split_system_messages(&msgs);
    assert_eq!(filtered.len(), 1);
    assert_eq!(
        suffix, "real ctx",
        "leading empty System must not produce a blank-line prefix; got {suffix:?}"
    );
}
/// A System message whose `Parts` hold only an empty `Text` (plus a non-text
/// `Image`) contributes nothing to the suffix. It must also be removed from
/// the filtered history.
#[test]
fn split_system_messages_drops_parts_with_only_empty_text() {
    let msgs = vec![Message {
        role: Role::System,
        content: MessageContent::Parts(vec![
            ContentPart::Text {
                text: String::new(),
            },
            ContentPart::Image {
                url: "https://x/i".into(),
            },
        ]),
        rsclaw_hidden: None,
    }];
    let (filtered, suffix) = split_system_messages(&msgs);
    assert!(
        suffix.is_empty(),
        "Parts whose only Text was empty must not leak; got {suffix:?}"
    );
    // Also check the filtered side. An all-System input must leave it empty,
    // as in `split_system_messages_handles_text_parts`; otherwise this System
    // message could silently leak back into the replayed history.
    assert!(
        filtered.is_empty(),
        "System message must not remain in the filtered history"
    );
}
/// A trailing System message right after a plain-text User turn is appended
/// to that User text after a blank line, and the System message is removed.
#[test]
fn normalize_trailing_system_folds_into_user_text() {
    let text_msg = |role: Role, body: &str| Message {
        role,
        content: MessageContent::Text(body.to_owned()),
        rsclaw_hidden: None,
    };
    let mut msgs = vec![
        text_msg(Role::User, "fix the bug"),
        text_msg(Role::System, "## Dynamic /ctx\nworking on handler.py"),
    ];
    normalize_trailing_system(&mut msgs);
    assert_eq!(msgs.len(), 1);
    let merged = &msgs[0];
    assert!(matches!(merged.role, Role::User));
    match &merged.content {
        MessageContent::Text(t) => {
            assert_eq!(t, "fix the bug\n\n## Dynamic /ctx\nworking on handler.py")
        }
        _ => panic!("expected Text content"),
    }
}
/// Several trailing System messages are folded in their original order, each
/// preceded by a blank line.
#[test]
fn normalize_trailing_system_concatenates_multiple_in_order() {
    let text_msg = |role: Role, body: &str| Message {
        role,
        content: MessageContent::Text(body.to_owned()),
        rsclaw_hidden: None,
    };
    let mut msgs: Vec<Message> = [
        (Role::User, "go"),
        (Role::System, "FIRST"),
        (Role::System, "SECOND"),
    ]
    .into_iter()
    .map(|(role, body)| text_msg(role, body))
    .collect();
    normalize_trailing_system(&mut msgs);
    assert_eq!(msgs.len(), 1);
    match &msgs[0].content {
        MessageContent::Text(t) => assert_eq!(t, "go\n\nFIRST\n\nSECOND"),
        _ => panic!("expected Text content"),
    }
}
/// Without a trailing System message, normalization leaves the history
/// untouched.
#[test]
fn normalize_trailing_system_noop_without_trailing_system() {
    let text_msg = |role: Role, body: &str| Message {
        role,
        content: MessageContent::Text(body.to_owned()),
        rsclaw_hidden: None,
    };
    let mut msgs = vec![text_msg(Role::User, "hi"), text_msg(Role::Assistant, "yo")];
    normalize_trailing_system(&mut msgs);
    assert_eq!(msgs.len(), 2);
    match &msgs[1].content {
        MessageContent::Text(last) => assert_eq!(last, "yo"),
        _ => panic!(),
    }
}
/// When the User turn is multi-part, the trailing System text is appended as
/// a new `Text` part after the existing parts.
#[test]
fn normalize_trailing_system_folds_into_user_parts() {
    let user_parts = vec![
        ContentPart::Text {
            text: "look at this".into(),
        },
        ContentPart::Image {
            url: "https://x/y.png".into(),
        },
    ];
    let user = Message {
        role: Role::User,
        content: MessageContent::Parts(user_parts),
        rsclaw_hidden: None,
    };
    let ctx = Message {
        role: Role::System,
        content: MessageContent::Text("CTX".into()),
        rsclaw_hidden: None,
    };
    let mut msgs = vec![user, ctx];
    normalize_trailing_system(&mut msgs);
    assert_eq!(msgs.len(), 1);
    let parts = match &msgs[0].content {
        MessageContent::Parts(parts) => parts,
        _ => panic!("expected Parts content"),
    };
    assert_eq!(parts.len(), 3);
    let Some(ContentPart::Text { text }) = parts.last() else {
        panic!("expected appended Text part")
    };
    assert_eq!(text, "CTX");
}
/// When the message before the trailing System is a Tool result (not a User
/// turn), the System message is dropped rather than folded in.
#[test]
fn normalize_trailing_system_drops_when_preceded_by_tool() {
    let tool = Message {
        role: Role::Tool,
        content: MessageContent::Parts(vec![ContentPart::ToolResult {
            tool_use_id: "toolu_1".into(),
            content: "result".into(),
            is_error: None,
        }]),
        rsclaw_hidden: None,
    };
    let dynamic = Message {
        role: Role::System,
        content: MessageContent::Text("dynamic ctx".into()),
        rsclaw_hidden: None,
    };
    let mut msgs = vec![tool, dynamic];
    normalize_trailing_system(&mut msgs);
    assert_eq!(msgs.len(), 1);
    assert!(matches!(msgs[0].role, Role::Tool));
}
/// Empty trailing System messages are skipped, so they add no extra
/// blank-line separators to the folded User text.
#[test]
fn normalize_trailing_system_skips_empty_system_blocks() {
    let text_msg = |role: Role, body: &str| Message {
        role,
        content: MessageContent::Text(body.to_owned()),
        rsclaw_hidden: None,
    };
    let mut msgs: Vec<Message> = [
        (Role::User, "hi"),
        (Role::System, ""),
        (Role::System, "non-empty"),
    ]
    .into_iter()
    .map(|(role, body)| text_msg(role, body))
    .collect();
    normalize_trailing_system(&mut msgs);
    assert_eq!(msgs.len(), 1);
    match &msgs[0].content {
        MessageContent::Text(t) => assert_eq!(t, "hi\n\nnon-empty"),
        _ => panic!(),
    }
}
/// `temperature` and `top_p` serialize in their short decimal form (`0.6`,
/// `0.95`) and must not leak f32→f64 widening noise such as
/// `0.6000000238418579`.
#[test]
fn turn_options_temperature_clamps_to_two_decimals() {
    let opts = TurnOptions {
        temperature: Some(0.6),
        top_p: Some(0.95),
        max_tokens: None,
        enable_thinking: None,
        stop: None,
        idle_ttl_secs: None,
        constrain_tool_calls: None,
    };
    let s = serde_json::to_string(&opts).unwrap();
    let body = serde_json::to_value(&opts).unwrap();
    assert!(
        s.contains("\"temperature\":0.6"),
        "expected temperature:0.6, got {s}"
    );
    assert!(
        !s.contains("0.6000000238418579"),
        "leaked f32→f64 noise: {s}"
    );
    assert!(s.contains("\"top_p\":0.95"), "expected top_p:0.95, got {s}");
    assert!(body.is_object());
}
/// Unset sampling options are omitted from the wire body, not sent as
/// `null`.
#[test]
fn turn_options_temperature_none_omits_field() {
    let opts = TurnOptions {
        temperature: None,
        top_p: None,
        max_tokens: None,
        enable_thinking: None,
        stop: None,
        idle_ttl_secs: None,
        constrain_tool_calls: None,
    };
    let body = serde_json::to_value(&opts).unwrap();
    for absent in ["temperature", "top_p"] {
        assert!(body.get(absent).is_none());
    }
}
/// `constrain_tool_calls` is emitted as `true` only when the flag is on and
/// the request carries tools. In every other case it is omitted.
#[test]
fn turn_options_constrain_tool_calls_wire_shape() {
    let wire = |r: &LlmRequest, constrain: bool| {
        serde_json::to_value(TurnOptions::from_request(r, constrain)).unwrap()
    };
    let mut req = req_with(
        vec![Message {
            role: Role::User,
            content: MessageContent::Text("hi".into()),
            rsclaw_hidden: None,
        }],
        2,
        Some("k"),
    );
    req.tools = vec![crate::ToolDef {
        name: "read_file".into(),
        description: "read".into(),
        parameters: serde_json::json!({"type": "object", "properties": {}}),
    }];
    assert!(wire(&req, false).get("constrain_tool_calls").is_none());
    assert_eq!(wire(&req, true)["constrain_tool_calls"], true);
    req.tools.clear();
    assert!(wire(&req, true).get("constrain_tool_calls").is_none());
}
/// The replay endpoint's response carries token counts but no `prefix_id`.
/// It must still parse, leaving `prefix_id` as `None`.
#[test]
fn create_session_resp_parses_replay_shape_without_prefix_id() {
    let replay_json = r#"{
"session_id": "rs_w7_8a3c1f2b",
"n_prefix_tokens": 27981,
"n_user_tokens": 612,
"n_history_tokens": 8420,
"n_tokens": 37013,
"instance_id": "llama-worker-7",
"replay_ms": 2340
}"#;
    let parsed = serde_json::from_str::<CreateSessionResp>(replay_json)
        .expect("replay shape parses");
    assert!(parsed.prefix_id.is_none());
    assert_eq!(parsed.session_id, "rs_w7_8a3c1f2b");
}
/// The create endpoint's response includes `prefix_id`, which is parsed into
/// `Some`.
#[test]
fn create_session_resp_parses_create_shape_with_prefix_id() {
    let create_json = r#"{
"session_id": "rs_w7_8a3c1f2b",
"prefix_id": "rsclaw/2026.5.28"
}"#;
    let parsed = serde_json::from_str::<CreateSessionResp>(create_json)
        .expect("create shape parses");
    let prefix = parsed.prefix_id.as_deref();
    assert_eq!(prefix, Some("rsclaw/2026.5.28"));
}
/// A response that mixes the post-rename `prefix_id` with unknown or legacy
/// fields (`prefix_source`, `rsclaw_version`) must parse. The unknown fields
/// are ignored.
#[test]
fn create_session_resp_ignores_unknown_legacy_rsclaw_version() {
    let mixed_json = r#"{
"session_id":"rs_w7_8cebc736",
"prefix_id":"dynamic/9e8598684ad34ff0a615899fefb811de",
"prefix_source":"dynamic_miss",
"rsclaw_version":""
}"#;
    let parsed = serde_json::from_str::<CreateSessionResp>(mixed_json)
        .expect("mixed post-rename + legacy fields must parse");
    let prefix = parsed.prefix_id.as_deref();
    assert_eq!(parsed.session_id, "rs_w7_8cebc736");
    assert_eq!(prefix, Some("dynamic/9e8598684ad34ff0a615899fefb811de"),);
}
/// An explicit JSON `null` for `prefix_id` deserializes to `None`.
#[test]
fn create_session_resp_parses_explicit_null_prefix_id() {
    let parsed = serde_json::from_str::<CreateSessionResp>(
        r#"{"session_id":"rs_a_b","prefix_id":null}"#,
    )
    .expect("null prefix_id must parse");
    assert!(parsed.prefix_id.is_none());
    assert_eq!(parsed.session_id, "rs_a_b");
}
/// An absent `prefix_id` key deserializes to `None`.
#[test]
fn create_session_resp_parses_missing_prefix_id() {
    let parsed = serde_json::from_str::<CreateSessionResp>(r#"{"session_id":"rs_a_b"}"#)
        .expect("missing field must parse");
    assert!(parsed.prefix_id.is_none());
}
/// A string `prefix_id` deserializes to `Some` with the same value.
#[test]
fn create_session_resp_parses_populated_prefix_id() {
    let json_text = r#"{"session_id":"rs_a_b","prefix_id":"rsclaw/2026.5.28"}"#;
    let parsed = serde_json::from_str::<CreateSessionResp>(json_text)
        .expect("string field must parse");
    let prefix = parsed.prefix_id.as_deref();
    assert_eq!(prefix, Some("rsclaw/2026.5.28"));
}
/// Builds an `LlmRequest` that sets only the fields the dispatch tests vary:
/// model, endpoint hint, session key and kv-cache mode. Every other field
/// comes from `Default`, except that `fallback_models` is set explicitly to
/// empty.
fn dispatch_req(
    model: &str,
    endpoint: AgentEndpoint,
    session_key: Option<&str>,
    kv_cache_mode: u8,
) -> LlmRequest {
    let session_key = session_key.map(ToOwned::to_owned);
    LlmRequest {
        model: model.into(),
        endpoint,
        session_key,
        kv_cache_mode,
        fallback_models: vec![],
        ..Default::default()
    }
}
/// Rule 1: the canonical flash model routes to `/fastshot` under both the
/// Primary and the Vision endpoint hints.
#[test]
fn dispatch_rule_1_flash_model_routes_fastshot() {
    for endpoint in [AgentEndpoint::Primary, AgentEndpoint::Vision] {
        let req = dispatch_req("rsclaw/rsclaw-flash-v1", endpoint, None, 0);
        let route = dispatch_decision(&req).unwrap();
        assert_eq!(route, DispatchRoute::OneShot("/fastshot"));
    }
}
/// Rule 2: the canonical vision model routes to `/vision`, even with a
/// Primary endpoint hint.
#[test]
fn dispatch_rule_2_vision_model_routes_vision() {
    let expected = DispatchRoute::OneShot("/vision");
    let req = dispatch_req("rsclaw/rsclaw-vision-v1", AgentEndpoint::Primary, None, 0);
    assert_eq!(dispatch_decision(&req).unwrap(), expected);
}
/// Rule 3: the canonical agent model with no session key routes to
/// `/oneshot`.
#[test]
fn dispatch_rule_3_agent_model_no_session_routes_oneshot() {
    let expected = DispatchRoute::OneShot("/oneshot");
    let req = dispatch_req("rsclaw/rsclaw-agent-v1", AgentEndpoint::Primary, None, 0);
    assert_eq!(dispatch_decision(&req).unwrap(), expected);
}
/// Rule 4: the canonical agent model with a session key and kv_cache_mode=2
/// uses the sessions API.
#[test]
fn dispatch_rule_4_agent_model_with_session_routes_sessions() {
    let req = dispatch_req(
        "rsclaw/rsclaw-agent-v1",
        AgentEndpoint::Primary,
        Some("sess-x"),
        2,
    );
    assert_eq!(dispatch_decision(&req).unwrap(), DispatchRoute::Sessions);
}
/// Rule 5: a non-canonical model with the Flash endpoint hint routes to
/// `/fastshot`.
#[test]
fn dispatch_rule_5_non_canonical_flash_endpoint_routes_fastshot() {
    let expected = DispatchRoute::OneShot("/fastshot");
    let req = dispatch_req("anthropic/claude-3-5-haiku", AgentEndpoint::Flash, None, 0);
    assert_eq!(dispatch_decision(&req).unwrap(), expected);
}
/// Rule 6: a non-canonical model with the Vision endpoint hint routes to
/// `/vision`.
#[test]
fn dispatch_rule_6_non_canonical_vision_endpoint_routes_vision() {
    let expected = DispatchRoute::OneShot("/vision");
    let req = dispatch_req("anthropic/claude-3-5-sonnet", AgentEndpoint::Vision, None, 0);
    assert_eq!(dispatch_decision(&req).unwrap(), expected);
}
/// Rule 7: a non-canonical model on the Primary endpoint with a session key
/// and kv_cache_mode=2 uses the sessions API.
#[test]
fn dispatch_rule_7_primary_with_session_routes_sessions() {
    let req = dispatch_req(
        "anthropic/claude-3-5-sonnet",
        AgentEndpoint::Primary,
        Some("sess-y"),
        2,
    );
    assert_eq!(dispatch_decision(&req).unwrap(), DispatchRoute::Sessions);
}
/// Rule 8: a non-canonical model on the Primary endpoint with no session
/// routes to `/oneshot`.
#[test]
fn dispatch_rule_8_primary_stateless_routes_oneshot() {
    let expected = DispatchRoute::OneShot("/oneshot");
    let req = dispatch_req("anthropic/claude-3-5-sonnet", AgentEndpoint::Primary, None, 0);
    assert_eq!(dispatch_decision(&req).unwrap(), expected);
}
/// kv_cache_mode=2 without a session key is rejected. The error names both
/// `session_key` and `kv_cache_mode=2`.
#[test]
fn dispatch_bail_kv2_without_session_key() {
    let req = dispatch_req("anthropic/claude-3-5-sonnet", AgentEndpoint::Primary, None, 2);
    let outcome = dispatch_decision(&req);
    let err = outcome.unwrap_err().to_string();
    for needle in ["session_key", "kv_cache_mode=2"] {
        assert!(err.contains(needle), "got: {err}");
    }
}
/// A session key with any kv_cache_mode other than 2 is rejected, and the
/// error names the required mode.
#[test]
fn dispatch_bail_session_without_kv2() {
    let req = dispatch_req(
        "anthropic/claude-3-5-sonnet",
        AgentEndpoint::Primary,
        Some("sess-z"),
        1,
    );
    let outcome = dispatch_decision(&req);
    let err = outcome.unwrap_err().to_string();
    assert!(err.contains("kv_cache_mode=2"), "got: {err}");
}
/// A canonical model id takes precedence over the session setup. The flash
/// model still routes to `/fastshot` despite a session key and
/// kv_cache_mode=2.
#[test]
fn dispatch_canonical_model_overrides_endpoint_hint() {
    let expected = DispatchRoute::OneShot("/fastshot");
    let req = dispatch_req(
        "rsclaw/rsclaw-flash-v1",
        AgentEndpoint::Primary,
        Some("sess-q"),
        2,
    );
    assert_eq!(dispatch_decision(&req).unwrap(), expected);
}
/// Rule 3 beats rule 5: the canonical agent model stays on `/oneshot` even
/// with a Flash endpoint hint.
#[test]
fn dispatch_rule_3_overrides_rule_5_for_agent_model() {
    let expected = DispatchRoute::OneShot("/oneshot");
    let req = dispatch_req("rsclaw/rsclaw-agent-v1", AgentEndpoint::Flash, None, 0);
    assert_eq!(dispatch_decision(&req).unwrap(), expected);
}
#[test]
fn rejects_non_kv2_mode() {
let provider = RsclawProvider::new("http://x", None);
let req = req_with(vec![], 1, Some("k"));
let err = match futures::executor::block_on(provider.stream(req)) {
Ok(_) => panic!("expected error for kv_cache_mode=1"),
Err(e) => e,
};
assert!(err.to_string().contains("kv_cache_mode=2"));
}
/// `stream` rejects a session key paired with kv_cache_mode=0. The error
/// names the required `kv_cache_mode=2`.
#[tokio::test]
async fn rejects_session_mode_without_kv_cache_mode_2() {
    let provider = RsclawProvider::new("http://x", None);
    let req = req_with(vec![], 0, Some("session-xyz"));
    let Err(err) = provider.stream(req).await else {
        panic!("expected error for session_key + kv_cache_mode!=2");
    };
    let text = err.to_string();
    assert!(
        text.contains("kv_cache_mode=2"),
        "unexpected error text: {err}"
    );
}
/// `CompactSpliceReq` serializes each field under its own name.
/// `expected_msgs_count` is omitted entirely when `None`.
#[test]
fn compact_splice_req_serialises_post_2_4_shape() {
    let splice = |expected| CompactSpliceReq {
        keep_head_messages: 2,
        summary: "<sum>",
        keep_tail_messages: 10,
        expected_msgs_count: expected,
    };

    let with_expect = serde_json::to_value(&splice(Some(80))).unwrap();
    assert_eq!(with_expect["keep_head_messages"], 2);
    assert_eq!(with_expect["summary"], "<sum>");
    assert_eq!(with_expect["keep_tail_messages"], 10);
    assert_eq!(with_expect["expected_msgs_count"], 80);

    let without_expect = serde_json::to_value(&splice(None)).unwrap();
    assert!(
        without_expect.get("expected_msgs_count").is_none(),
        "None must be omitted from the wire body, not emitted as null"
    );
}
/// A successful compact response parses into the session id plus the
/// post-splice message and token counts.
#[test]
fn compact_splice_resp_parses_happy_shape() {
    let raw = r#"{"session_id":"rs_w7_abc","msgs_count":13,"tokens_count":8421}"#;
    let resp = serde_json::from_str::<CompactSpliceResp>(raw)
        .expect("happy compact response must parse");
    assert_eq!(resp.msgs_count, 13);
    assert_eq!(resp.tokens_count, 8421);
    assert_eq!(resp.session_id, "rs_w7_abc");
}
/// The `LlmProvider::compact_splice` default implementation returns an error
/// that says the operation is not supported and names the provider.
#[test]
fn compact_splice_trait_default_returns_err_for_non_rsclaw() {
    use crate::LlmProvider;

    // Implements only `name` and `stream`, so `compact_splice` falls back to
    // the trait's default.
    struct StubProvider;

    impl LlmProvider for StubProvider {
        fn name(&self) -> &str {
            "stub"
        }

        fn stream(
            &self,
            _req: crate::LlmRequest,
        ) -> futures::future::BoxFuture<'_, anyhow::Result<crate::LlmStream>> {
            Box::pin(async { anyhow::bail!("stub provider has no streaming") })
        }
    }

    let stub = StubProvider;
    let outcome = futures::executor::block_on(stub.compact_splice("k", 2, "x", 10, None));
    let msg = outcome.expect_err("default impl must Err").to_string();
    let names_provider = msg.contains("stub");
    assert!(
        msg.contains("not supported") && names_provider,
        "default impl Err should name the provider: {msg}"
    );
}
/// Without a cached `SessionEntry` for the key, `compact_splice` returns an
/// error that mentions the missing session. The `.invalid` host would not
/// resolve if contacted.
#[tokio::test]
async fn compact_splice_errs_when_no_cached_session() {
    use crate::LlmProvider;
    let provider = RsclawProvider::new("http://nonexistent-host.invalid", None);
    let outcome = provider
        .compact_splice("missing-key", 2, "summary", 10, None)
        .await;
    let msg = outcome
        .expect_err("should Err when no cached SessionEntry exists")
        .to_string();
    assert!(
        msg.contains("no cached session"),
        "Err message should mention missing cached session, got: {msg}"
    );
}
/// A successful splice returns the server's `msgs_count` and stores it as
/// `last_seen_msgs_len`. The cached session id and prefix id stay unchanged.
#[tokio::test]
async fn compact_splice_updates_last_seen_msgs_len_on_success() {
    use wiremock::{
        Mock, MockServer, ResponseTemplate,
        matchers::{method, path},
    };
    use crate::LlmProvider;
    let session_id = "rs_w7_abc";
    let spliced = serde_json::json!({
        "session_id": session_id,
        "msgs_count": 13,
        "tokens_count": 8421,
    });
    let mock_server = MockServer::start().await;
    Mock::given(method("POST"))
        .and(path(format!("/sessions/{session_id}/compact")))
        .respond_with(ResponseTemplate::new(200).set_body_json(spliced))
        .expect(1)
        .mount(&mock_server)
        .await;
    let provider = RsclawProvider::new(mock_server.uri(), None);
    // Seed the cache directly; the guard is released at the end of this
    // statement.
    provider.lock_sessions().insert(
        "test-key".to_owned(),
        SessionEntry {
            session_id: session_id.to_owned(),
            prefix_id: RSCLAW_DEFAULT_PREFIX_ID.to_owned(),
            last_seen_msgs_len: 50,
        },
    );
    let result = provider
        .compact_splice("test-key", 2, "<summary>", 10, Some(50))
        .await
        .expect("happy-path splice should succeed");
    assert_eq!(result, 13, "trait method returns server's msgs_count");
    let sessions = provider.lock_sessions();
    let entry = sessions
        .get("test-key")
        .expect("SessionEntry must still exist after splice — id is preserved");
    assert_eq!(
        entry.last_seen_msgs_len, 13,
        "last_seen_msgs_len must be updated to head(2) + summary(1) + tail(10)"
    );
    assert_eq!(
        entry.session_id, session_id,
        "session_id MUST be unchanged across splice (§2.4 invariant)"
    );
    assert_eq!(
        entry.prefix_id, RSCLAW_DEFAULT_PREFIX_ID,
        "prefix_id must be unchanged"
    );
}
/// A 409 `msg_count_mismatch` body exposes the server's current message
/// count at `error.current`.
#[test]
fn compact_splice_409_body_parses() {
    let raw = r#"{"error":{"code":"msg_count_mismatch","detail":"expected 50, got 52","current":52}}"#;
    let parsed = serde_json::from_str::<CompactSplice409>(raw).expect("409 body must parse");
    let current = parsed.error.current;
    assert_eq!(current, 52);
}
/// A single 409 `msg_count_mismatch` from the compact endpoint is retried.
/// The retried 200's `msgs_count` is returned and cached as
/// `last_seen_msgs_len`, and the session entry itself is preserved.
#[tokio::test]
async fn compact_splice_retries_on_409_then_succeeds() {
    use wiremock::{
        Mock, MockServer, ResponseTemplate,
        matchers::{method, path},
    };
    use crate::LlmProvider;
    let session_id = "rs_w7_retry";
    let compact_path = format!("/sessions/{session_id}/compact");
    let mismatch = serde_json::json!({
        "error": {
            "code": "msg_count_mismatch",
            "detail": "expected 50, got 52",
            "current": 52
        }
    });
    let spliced = serde_json::json!({
        "session_id": session_id,
        "msgs_count": 15,
        "tokens_count": 9000,
    });
    let mock_server = MockServer::start().await;
    // First POST: the server reports 52 messages where the client expected 50.
    Mock::given(method("POST"))
        .and(path(compact_path.clone()))
        .respond_with(ResponseTemplate::new(409).set_body_json(mismatch))
        .up_to_n_times(1)
        .expect(1)
        .mount(&mock_server)
        .await;
    // Second POST: the retry succeeds.
    Mock::given(method("POST"))
        .and(path(compact_path))
        .respond_with(ResponseTemplate::new(200).set_body_json(spliced))
        .expect(1)
        .mount(&mock_server)
        .await;
    let provider = RsclawProvider::new(mock_server.uri(), None);
    provider.lock_sessions().insert(
        "retry-key".to_owned(),
        SessionEntry {
            session_id: session_id.to_owned(),
            prefix_id: RSCLAW_DEFAULT_PREFIX_ID.to_owned(),
            last_seen_msgs_len: 50,
        },
    );
    let result = provider
        .compact_splice("retry-key", 2, "<summary>", 10, Some(50))
        .await
        .expect("splice should succeed after one 409 retry");
    assert_eq!(
        result, 15,
        "returns server msgs_count from the retried call"
    );
    let sessions = provider.lock_sessions();
    let entry = sessions.get("retry-key").expect("entry preserved");
    assert_eq!(
        entry.last_seen_msgs_len, 15,
        "last_seen recomputed from the GROWN tail: head(2)+summary(1)+tail(12)"
    );
}
}