use std::collections::HashMap;
use std::process::Stdio;
use claude_code_agent_sdk::{ContentBlock, Message, ToolResultBlock, ToolResultContent};
use serde_json::{json, Value};
use tokio::io::{AsyncBufReadExt, BufReader};
use tokio::process::Command;
use tokio::sync::mpsc;
use tracing::{trace, warn};
/// Looks up the `claude` executable on `PATH`.
///
/// Returns the resolved path, or `None` when the lookup fails for any reason.
pub fn locate_binary() -> Option<std::path::PathBuf> {
    match which::which("claude") {
        Ok(found) => Some(found),
        Err(_) => None,
    }
}
/// Returns the built-in list of model identifiers offered to callers.
///
/// The last entry, `"default"`, is the sentinel that `run_stream` treats as
/// "do not pass `--model`".
pub fn default_models() -> Vec<String> {
    const MODEL_IDS: [&str; 4] = [
        "claude-opus-4-8",
        "claude-sonnet-4-6",
        "claude-haiku-4-5",
        "default",
    ];
    MODEL_IDS.iter().copied().map(String::from).collect()
}
/// Events delivered to the consumer of `run_stream`, in the order they are
/// derived from the CLI's stdout frames.
///
/// `block_id` values are strings of the form `b<N>`, allocated from a counter
/// that is local to one `run_stream` call, so they stay unique within a stream
/// even when the CLI reuses a content-block index.
#[derive(Debug, Clone)]
pub enum ClaudeEvent {
/// Sent once, for the first system frame that carries a session id.
Init { session_id: String },
/// A text content block was opened.
TextStart { block_id: String },
/// A chunk of text for an open text block.
TextDelta { block_id: String, text: String },
/// The text block was closed (`content_block_stop`).
TextEnd { block_id: String },
/// A thinking content block was opened.
ThinkingStart { block_id: String },
/// A non-empty chunk of thinking text for an open thinking block.
ThinkingDelta { block_id: String, text: String },
/// The thinking block was closed (`content_block_stop`).
ThinkingEnd { block_id: String },
/// A tool invocation found in an assistant message.
ToolUse { id: String, name: String, input: Value },
/// A tool result found in a user message; `id` is the originating tool-use id.
ToolResult { id: String, output: Value, is_error: bool },
/// End-of-run summary. Either copied from the CLI's result frame, or
/// synthesized (no cost, zero duration/turns, `is_error: false`) when the
/// stream ends without one and the process was not observed to fail.
Result {
session_id: String,
total_cost_usd: Option<f64>,
duration_ms: u64,
num_turns: u32,
is_error: bool,
},
/// A failure reported by the wrapper itself: a stdout read error, or a
/// non-successful exit without a result frame.
Error { message: String },
}
/// Connection settings for the `kyma` MCP server handed to the CLI.
#[derive(Clone, Debug)]
pub struct McpConfig {
/// Written as the `url` of the HTTP server entry in the generated config file.
pub url: String,
/// When present, written verbatim as the `Authorization` header value.
pub auth_header: Option<String>,
}
/// Extra system-prompt text passed via `--append-system-prompt` whenever the
/// MCP config file was written successfully. It tells the model to use the
/// `kyma` MCP tools rather than guessing.
const MCP_SYSTEM_HINT: &str = "You are connected to the user's Kyma data warehouse through the `kyma` MCP server. \
To answer questions about their data, use its tools — list_databases, describe_table, explore_schema, \
sample_rows, run_sql, run_kql, find_references_to, and graph_traverse — rather than guessing. \
Call `memory_search` FIRST when a question may depend on prior context, decisions, or how entities \
relate — it does graph-aware hybrid recall and returns connected resources you can follow with \
graph_traverse. Query the real data, then answer concisely based on what you find.";
/// Which family of start/delta/end events an open content block maps to.
#[derive(Clone, Copy, PartialEq)]
enum BlockKind {
// Emits TextStart / TextDelta / TextEnd.
Text,
// Emits ThinkingStart / ThinkingDelta / ThinkingEnd.
Thinking,
}
/// Bookkeeping for one currently open content block, keyed by the stream
/// event's `index` in the caller's map.
struct BlockState {
// Stream-unique id (`b<N>`) reported in the emitted events.
id: String,
// Decides which End event is sent when the block is closed.
kind: BlockKind,
}
pub fn run_stream(
question: &str,
model: Option<&str>,
resume_session_id: Option<&str>,
cwd: Option<&std::path::Path>,
mcp: Option<&McpConfig>,
) -> anyhow::Result<mpsc::UnboundedReceiver<ClaudeEvent>> {
let binary = locate_binary()
.ok_or_else(|| anyhow::anyhow!("`claude` not found on PATH — install Claude Code"))?;
let mut cmd = Command::new(&binary);
cmd.arg("--print")
.arg("--output-format")
.arg("stream-json")
.arg("--include-partial-messages")
.arg("--verbose");
if let Some(m) = model {
if m != "default" {
cmd.arg("--model").arg(m);
}
}
if let Some(sid) = resume_session_id {
cmd.arg("--resume").arg(sid);
}
let mcp_config_path = match mcp {
Some(cfg) => match write_mcp_config(cfg) {
Ok(path) => {
cmd.arg("--mcp-config").arg(&path);
cmd.arg("--allowedTools").arg("mcp__kyma");
cmd.arg("--append-system-prompt").arg(MCP_SYSTEM_HINT);
Some(path)
}
Err(e) => {
warn!(error = %e, "claude_cli: failed to write MCP config; running without Kyma tools");
None
}
},
None => None,
};
cmd.arg(question);
if let Some(dir) = cwd {
cmd.current_dir(dir);
}
cmd.stdin(Stdio::null());
cmd.stdout(Stdio::piped());
cmd.stderr(Stdio::piped());
let mut child = cmd.spawn().map_err(|e| anyhow::anyhow!("spawn `claude`: {e}"))?;
let stdout = child.stdout.take().ok_or_else(|| anyhow::anyhow!("no stdout"))?;
let stderr = child.stderr.take().ok_or_else(|| anyhow::anyhow!("no stderr"))?;
let (tx, rx) = mpsc::unbounded_channel::<ClaudeEvent>();
tokio::spawn(async move {
let mut lines = BufReader::new(stderr).lines();
while let Ok(Some(line)) = lines.next_line().await {
if !line.trim().is_empty() {
warn!(line = %line, "claude_cli stderr");
}
}
});
tokio::spawn(async move {
let mut next_block = 0u64;
let mut blocks: HashMap<u64, BlockState> = HashMap::new();
let mut session_id = String::new();
let mut saw_result = false;
let mut lines = BufReader::new(stdout).lines();
loop {
let line = match lines.next_line().await {
Ok(Some(line)) => line,
Ok(None) => break, Err(e) => {
let _ = tx.send(ClaudeEvent::Error {
message: format!("claude stdout read error: {e}"),
});
saw_result = true;
break;
}
};
let trimmed = line.trim();
if trimmed.is_empty() {
continue;
}
let value: Value = match serde_json::from_str(trimmed) {
Ok(v) => v,
Err(e) => {
trace!(error = %e, "claude_cli: non-JSON line skipped");
continue;
}
};
let msg: Message = match serde_json::from_value(value) {
Ok(m) => m,
Err(e) => {
trace!(error = %e, "claude_cli: unmodeled frame skipped");
continue;
}
};
match msg {
Message::System(sys) => {
if let Some(sid) = sys.session_id {
if session_id.is_empty() {
session_id = sid.clone();
let _ = tx.send(ClaudeEvent::Init { session_id: sid });
}
}
}
Message::StreamEvent(se) => {
if session_id.is_empty() && !se.session_id.is_empty() {
session_id = se.session_id.clone();
}
handle_stream_event(&se.event, &tx, &mut next_block, &mut blocks);
}
Message::Assistant(a) => {
for block in a.message.content {
if let ContentBlock::ToolUse(tu) = block {
let _ = tx.send(ClaudeEvent::ToolUse {
id: tu.id,
name: tu.name,
input: tu.input,
});
}
}
}
Message::User(u) => {
if let Some(content) = u.content {
for block in content {
if let ContentBlock::ToolResult(tr) = block {
let (output, is_err) = tool_result_payload(&tr);
let _ = tx.send(ClaudeEvent::ToolResult {
id: tr.tool_use_id,
output,
is_error: is_err,
});
}
}
}
}
Message::Result(r) => {
if session_id.is_empty() {
session_id = r.session_id.clone();
}
let _ = tx.send(ClaudeEvent::Result {
session_id: r.session_id,
total_cost_usd: r.total_cost_usd,
duration_ms: r.duration_ms,
num_turns: r.num_turns,
is_error: r.is_error,
});
saw_result = true;
}
Message::ControlCancelRequest(_) => {}
}
}
let status = child.wait().await;
if let Some(path) = &mcp_config_path {
let _ = std::fs::remove_file(path);
}
if !saw_result {
match status {
Ok(s) if !s.success() => {
let _ = tx.send(ClaudeEvent::Error {
message: format!(
"claude exited with {}",
s.code().map(|c| c.to_string()).unwrap_or_else(|| "signal".into())
),
});
}
_ => {
let _ = tx.send(ClaudeEvent::Result {
session_id,
total_cost_usd: None,
duration_ms: 0,
num_turns: 0,
is_error: false,
});
}
}
}
});
Ok(rx)
}
fn handle_stream_event(
event: &Value,
tx: &mpsc::UnboundedSender<ClaudeEvent>,
next_block: &mut u64,
blocks: &mut HashMap<u64, BlockState>,
) {
let Some(kind) = event.get("type").and_then(Value::as_str) else {
return;
};
match kind {
"content_block_start" => {
let idx = event.get("index").and_then(Value::as_u64).unwrap_or(0);
let block_type = event
.get("content_block")
.and_then(|b| b.get("type"))
.and_then(Value::as_str)
.unwrap_or("");
match block_type {
"text" => {
let id = register_block(idx, BlockKind::Text, next_block, blocks);
let _ = tx.send(ClaudeEvent::TextStart { block_id: id });
}
"thinking" => {
let id = register_block(idx, BlockKind::Thinking, next_block, blocks);
let _ = tx.send(ClaudeEvent::ThinkingStart { block_id: id });
}
_ => {}
}
}
"content_block_delta" => {
let idx = event.get("index").and_then(Value::as_u64).unwrap_or(0);
let delta = event.get("delta");
let delta_type = delta
.and_then(|d| d.get("type"))
.and_then(Value::as_str)
.unwrap_or("");
match delta_type {
"text_delta" => {
if let Some(text) = delta.and_then(|d| d.get("text")).and_then(Value::as_str) {
let id = ensure_block(idx, BlockKind::Text, next_block, blocks, tx);
let _ = tx.send(ClaudeEvent::TextDelta {
block_id: id,
text: text.to_string(),
});
}
}
"thinking_delta" => {
if let Some(text) =
delta.and_then(|d| d.get("thinking")).and_then(Value::as_str)
{
if text.is_empty() {
return;
}
let id = ensure_block(idx, BlockKind::Thinking, next_block, blocks, tx);
let _ = tx.send(ClaudeEvent::ThinkingDelta {
block_id: id,
text: text.to_string(),
});
}
}
_ => {}
}
}
"content_block_stop" => {
let idx = event.get("index").and_then(Value::as_u64).unwrap_or(0);
if let Some(state) = blocks.remove(&idx) {
match state.kind {
BlockKind::Text => {
let _ = tx.send(ClaudeEvent::TextEnd { block_id: state.id });
}
BlockKind::Thinking => {
let _ = tx.send(ClaudeEvent::ThinkingEnd { block_id: state.id });
}
}
}
}
_ => {}
}
}
/// Allocates the next block id (`b<N>`), advances the counter, and records the
/// block under `idx`, replacing any entry already stored for that index.
///
/// Returns the newly allocated id.
fn register_block(
    idx: u64,
    kind: BlockKind,
    next_block: &mut u64,
    blocks: &mut HashMap<u64, BlockState>,
) -> String {
    let serial = *next_block;
    *next_block = serial + 1;

    let block_id = format!("b{serial}");
    let state = BlockState {
        kind,
        id: block_id.clone(),
    };
    blocks.insert(idx, state);
    block_id
}
/// Returns the id of the block open at `idx`, opening one first if needed.
///
/// When no block is registered for `idx`, a new one of the given `kind` is
/// registered and the matching Start event is sent before the id is returned.
/// An existing entry is reused as-is; its recorded kind is not compared with
/// `kind`.
fn ensure_block(
    idx: u64,
    kind: BlockKind,
    next_block: &mut u64,
    blocks: &mut HashMap<u64, BlockState>,
    tx: &mpsc::UnboundedSender<ClaudeEvent>,
) -> String {
    let already_open = blocks.get(&idx).map(|state| state.id.clone());
    if let Some(existing_id) = already_open {
        return existing_id;
    }

    let new_id = register_block(idx, kind, next_block, blocks);
    let opened = match kind {
        BlockKind::Text => ClaudeEvent::TextStart {
            block_id: new_id.clone(),
        },
        BlockKind::Thinking => ClaudeEvent::ThinkingStart {
            block_id: new_id.clone(),
        },
    };
    let _ = tx.send(opened);
    new_id
}
/// Writes a one-server MCP config (`mcpServers.kyma`, HTTP transport) to a
/// uniquely named file in the system temp directory and returns its path.
///
/// The `Authorization` header is included only when `cfg.auth_header` is set.
/// Because that header is a secret, on Unix the file is created with mode
/// `0600` from the start, so there is no window in which another user could
/// open it with wider permissions. `create_new` also refuses to reuse a path
/// that already exists.
///
/// # Errors
/// Returns an error if serialization, file creation, the permission change or
/// the write fails. A file that was created but could not be fully written is
/// removed before the error is returned.
fn write_mcp_config(cfg: &McpConfig) -> anyhow::Result<std::path::PathBuf> {
    use std::io::Write;

    let mut server = json!({ "type": "http", "url": cfg.url });
    if let Some(auth) = &cfg.auth_header {
        server["headers"] = json!({ "Authorization": auth });
    }
    let config = json!({ "mcpServers": { "kyma": server } });
    // Serialize before touching the filesystem so a failure leaves nothing behind.
    let body = serde_json::to_string(&config)?;

    let path = std::env::temp_dir().join(format!("kyma-mcp-{}.json", uuid::Uuid::new_v4()));

    let mut options = std::fs::OpenOptions::new();
    options.write(true).create_new(true);
    #[cfg(unix)]
    {
        use std::os::unix::fs::OpenOptionsExt;
        options.mode(0o600);
    }
    let mut file = options.open(&path)?;

    let written = (|| -> std::io::Result<()> {
        #[cfg(unix)]
        {
            use std::os::unix::fs::PermissionsExt;
            // The creation mode is filtered by the umask; pin it to exactly 0600.
            file.set_permissions(std::fs::Permissions::from_mode(0o600))?;
        }
        file.write_all(body.as_bytes())
    })();

    if let Err(e) = written {
        // Do not leave a partially written secret-bearing file behind.
        drop(file);
        let _ = std::fs::remove_file(&path);
        return Err(e.into());
    }
    Ok(path)
}
/// Converts a tool result block into the `(output, is_error)` pair used by
/// [`ClaudeEvent::ToolResult`].
///
/// Text content becomes a JSON string, block content a JSON array, and missing
/// content JSON `null`. A missing `is_error` flag is treated as `false`.
fn tool_result_payload(tr: &ToolResultBlock) -> (Value, bool) {
    let output = tr
        .content
        .as_ref()
        .map_or(Value::Null, |content| match content {
            ToolResultContent::Text(text) => Value::String(text.clone()),
            ToolResultContent::Blocks(items) => Value::Array(items.clone()),
        });
    let failed = tr.is_error.unwrap_or(false);
    (output, failed)
}
// Unit tests for the stream-event translation, the MCP config writer, and the
// assumption that unknown CLI frames fail to deserialize into `Message`.
#[cfg(test)]
mod tests {
use super::*;
// Feeds `events` through `handle_stream_event` with fresh block state and
// returns every `ClaudeEvent` that was sent, in order.
fn run(events: &[Value]) -> Vec<ClaudeEvent> {
let (tx, mut rx) = mpsc::unbounded_channel::<ClaudeEvent>();
let mut next_block = 0u64;
let mut blocks: HashMap<u64, BlockState> = HashMap::new();
for ev in events {
handle_stream_event(ev, &tx, &mut next_block, &mut blocks);
}
drop(tx);
let mut out = Vec::new();
while let Ok(ev) = rx.try_recv() {
out.push(ev);
}
out
}
// A text block yields Start, one Delta per chunk, then End, all sharing one id.
#[test]
fn text_block_streams_start_delta_end_with_stable_id() {
let evs = run(&[
json!({"type":"content_block_start","index":0,"content_block":{"type":"text","text":""}}),
json!({"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":"Hel"}}),
json!({"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":"lo"}}),
json!({"type":"content_block_stop","index":0}),
]);
match evs.as_slice() {
[
ClaudeEvent::TextStart { block_id: a },
ClaudeEvent::TextDelta { block_id: b, text: t1 },
ClaudeEvent::TextDelta { block_id: c, text: t2 },
ClaudeEvent::TextEnd { block_id: d },
] => {
assert_eq!(a, b);
assert_eq!(a, c);
assert_eq!(a, d);
assert_eq!(t1, "Hel");
assert_eq!(t2, "lo");
}
other => panic!("unexpected events: {other:?}"),
}
}
// Empty thinking deltas and signature deltas produce no event; only the
// non-empty thinking chunk is forwarded.
#[test]
fn thinking_deltas_map_to_thinking_events_and_skip_empty_and_signature() {
let evs = run(&[
json!({"type":"content_block_start","index":0,"content_block":{"type":"thinking","thinking":""}}),
json!({"type":"content_block_delta","index":0,"delta":{"type":"thinking_delta","thinking":""}}),
json!({"type":"content_block_delta","index":0,"delta":{"type":"signature_delta","signature":"abc"}}),
json!({"type":"content_block_delta","index":0,"delta":{"type":"thinking_delta","thinking":"reasoning"}}),
json!({"type":"content_block_stop","index":0}),
]);
match evs.as_slice() {
[
ClaudeEvent::ThinkingStart { .. },
ClaudeEvent::ThinkingDelta { text, .. },
ClaudeEvent::ThinkingEnd { .. },
] => assert_eq!(text, "reasoning"),
other => panic!("unexpected events: {other:?}"),
}
}
// The same event index opened twice in a row must get two different block ids.
#[test]
fn reused_block_index_across_messages_gets_distinct_ids() {
let evs = run(&[
json!({"type":"content_block_start","index":0,"content_block":{"type":"text"}}),
json!({"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":"a"}}),
json!({"type":"content_block_stop","index":0}),
json!({"type":"content_block_start","index":0,"content_block":{"type":"text"}}),
json!({"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":"b"}}),
json!({"type":"content_block_stop","index":0}),
]);
let ids: Vec<&str> = evs
.iter()
.filter_map(|e| match e {
ClaudeEvent::TextStart { block_id } => Some(block_id.as_str()),
_ => None,
})
.collect();
assert_eq!(ids.len(), 2);
assert_ne!(ids[0], ids[1], "reused index must not collide");
}
// The written config has the expected JSON shape, carries the auth header,
// and (on Unix) is readable/writable by the owner only.
#[test]
fn mcp_config_file_has_http_server_with_auth_and_is_private() {
let cfg = McpConfig {
url: "http://127.0.0.1:8080/mcp/v1".into(),
auth_header: Some("Bearer secret-token".into()),
};
let path = write_mcp_config(&cfg).expect("write config");
let body = std::fs::read_to_string(&path).expect("read config");
let v: Value = serde_json::from_str(&body).unwrap();
let server = &v["mcpServers"]["kyma"];
assert_eq!(server["type"], "http");
assert_eq!(server["url"], "http://127.0.0.1:8080/mcp/v1");
assert_eq!(server["headers"]["Authorization"], "Bearer secret-token");
#[cfg(unix)]
{
use std::os::unix::fs::PermissionsExt;
let mode = std::fs::metadata(&path).unwrap().permissions().mode();
assert_eq!(mode & 0o777, 0o600, "config file must be 0600");
}
let _ = std::fs::remove_file(&path);
}
// Without an auth header the `headers` key is left out entirely.
#[test]
fn mcp_config_omits_auth_header_when_absent() {
let cfg = McpConfig {
url: "http://127.0.0.1:8080/mcp/v1".into(),
auth_header: None,
};
let path = write_mcp_config(&cfg).expect("write config");
let v: Value =
serde_json::from_str(&std::fs::read_to_string(&path).unwrap()).unwrap();
assert!(v["mcpServers"]["kyma"].get("headers").is_none());
let _ = std::fs::remove_file(&path);
}
// `run_stream` skips frames that fail to deserialize into `Message`; this pins
// that unknown frame types do fail, while a minimal assistant frame succeeds.
#[test]
fn unmodeled_frames_do_not_deserialize_into_message() {
for frame in [
json!({"type":"rate_limit_event","rate_limit_info":{}}),
json!({"type":"totally_unknown"}),
] {
assert!(
serde_json::from_value::<Message>(frame.clone()).is_err(),
"expected {frame} to be unmodeled"
);
}
let ok = json!({"type":"assistant","message":{"content":[]},"session_id":"s"});
assert!(serde_json::from_value::<Message>(ok).is_ok());
}
}