use std::io::{BufRead, Write};
use std::sync::Arc;
use serde::{Deserialize, Serialize};
use serde_json::{json, Value};
use relayburn_sdk::{
Ledger, LedgerHandle, LedgerOpenOptions, SessionCostOptions, SessionCostResult,
};
use crate::cli::{GlobalArgs, McpServerArgs};
use crate::render::error::report_error;
/// MCP protocol revision reported by `initialize` when the client does not
/// send a string `protocolVersion` of its own.
const PROTOCOL_VERSION: &str = "2025-03-26";
/// Server name reported in the `serverInfo` object of the `initialize` result.
const SERVER_NAME: &str = "relayburn-mcp";
/// Server version reported in `serverInfo`; taken from the crate's Cargo
/// package version at compile time.
const SERVER_VERSION: &str = env!("CARGO_PKG_VERSION");
/// Runs the MCP server over stdio until stdin is closed.
///
/// The ledger is opened first, then a current-thread Tokio runtime is built,
/// and finally the server loop is driven to completion on that runtime.
///
/// # Returns
///
/// `0` once the server loop finishes. If opening the ledger or building the
/// runtime fails, the error is handed to `report_error` and whatever that
/// returns is returned instead (presumably the process exit status — confirm
/// against the caller).
pub fn run(globals: &GlobalArgs, args: McpServerArgs) -> i32 {
    let ledger = match open_handle(globals) {
        Err(err) => return report_error(&err, globals),
        Ok(ledger) => ledger,
    };

    let built = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build();
    let runtime = match built {
        Err(err) => return report_error(&err, globals),
        Ok(runtime) => runtime,
    };

    // `args` is owned, so the session id can be moved into the server as-is.
    let server = Server {
        handle: Arc::new(tokio::sync::Mutex::new(ledger)),
        debug: args.debug,
        default_session_id: args.session_id,
    };

    runtime.block_on(server.run());
    0
}
/// Opens the ledger used to answer tool calls.
///
/// When `globals.ledger_path` is set it is used as the ledger home;
/// otherwise the default open options are used.
///
/// # Errors
///
/// Returns whatever error `Ledger::open` reports.
fn open_handle(globals: &GlobalArgs) -> anyhow::Result<LedgerHandle> {
    let options = if let Some(home) = globals.ledger_path.as_deref() {
        LedgerOpenOptions::with_home(home)
    } else {
        LedgerOpenOptions::default()
    };
    Ledger::open(options)
}
/// Incoming JSON-RPC request as deserialized by `Server::handle_frame`.
///
/// Only the fields the server dispatches on are declared here.
#[derive(Debug, Deserialize)]
struct JsonRpcRequest {
/// Request id; `None` when the field is absent or JSON `null`.
#[serde(default)]
id: Option<Value>,
/// Name of the method to dispatch (e.g. `"initialize"`, `"tools/call"`).
method: String,
/// Method parameters; defaults to `Value::Null` when the field is absent.
#[serde(default)]
params: Value,
}
/// Outgoing JSON-RPC success response; serialized by `write_success`.
#[derive(Debug, Serialize)]
struct JsonRpcSuccess<'a> {
/// Protocol marker; `write_success` always sets this to `"2.0"`.
jsonrpc: &'static str,
/// Id of the request being answered, borrowed from the caller.
id: &'a Value,
/// Method-specific result payload.
result: Value,
}
/// Outgoing JSON-RPC error response; serialized by `error_envelope`.
#[derive(Debug, Serialize)]
struct JsonRpcError<'a> {
/// Protocol marker; `error_envelope` always sets this to `"2.0"`.
jsonrpc: &'static str,
/// Id of the request being answered, borrowed from the caller.
id: &'a Value,
/// The error object describing what went wrong.
error: JsonRpcErrorBody,
}
/// The `error` member of a JSON-RPC error response.
#[derive(Debug, Serialize)]
struct JsonRpcErrorBody {
/// Numeric error code (this file uses -32700, -32600, -32601 and -32602).
code: i32,
/// Short human-readable description of the error.
message: String,
/// Optional extra detail; left out of the serialized output when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
data: Option<Value>,
}
/// State of the stdio MCP server.
struct Server {
/// Ledger handle queried by tool calls, guarded by an async mutex.
handle: Arc<tokio::sync::Mutex<LedgerHandle>>,
/// Session id used by `burn__sessionCost` when the call passes no
/// `sessionId` argument.
default_session_id: Option<String>,
/// When true, parse and request-shape failures are also logged to stderr.
debug: bool,
}
/// Request loop and JSON-RPC method handlers for the stdio MCP server.
impl Server {
    /// Serves newline-delimited JSON-RPC frames from stdin until it closes.
    ///
    /// Stdin is read on a dedicated OS thread because line reads block; each
    /// non-blank line is forwarded through a bounded channel and handled one
    /// at a time, so responses are written in the order requests arrived.
    async fn run(self) {
        let (tx, mut rx) = tokio::sync::mpsc::channel::<String>(64);
        // Copied out so the reader thread can log without borrowing `self`.
        let debug = self.debug;
        let stdin_thread = std::thread::spawn(move || {
            let stdin = std::io::stdin();
            for line in stdin.lock().lines() {
                let line = match line {
                    Ok(line) => line,
                    Err(err) => {
                        // A read failure (for example invalid UTF-8) ends the
                        // session; surface the reason when debugging instead
                        // of shutting down silently.
                        if debug {
                            eprintln!("[burn mcp] stdin read error: {err}");
                        }
                        break;
                    }
                };
                if line.trim().is_empty() {
                    continue;
                }
                // The receiver is gone once the server loop has exited.
                if tx.blocking_send(line).is_err() {
                    break;
                }
            }
        });
        while let Some(frame) = rx.recv().await {
            self.handle_frame(&frame).await;
        }
        let _ = stdin_thread.join();
    }

    /// Parses one frame, validates its shape, and dispatches it by method.
    ///
    /// * Unparseable JSON is answered with `-32700` and a `null` id.
    /// * A non-object value is answered with `-32600` and a `null` id.
    /// * An object without an `id` key is treated as a notification and
    ///   dropped without a response.
    /// * An object that does not deserialize into `JsonRpcRequest` is answered
    ///   with `-32600`, echoing the id it carried.
    /// * An unrecognised method is answered with `-32601`.
    async fn handle_frame(&self, frame: &str) {
        let value: Value = match serde_json::from_str(frame) {
            Ok(value) => value,
            Err(err) => {
                if self.debug {
                    eprintln!("[burn mcp] parse error: {err}");
                }
                write_response(&error_envelope(&Value::Null, -32700, "parse error", None));
                return;
            }
        };
        if !value.is_object() {
            write_response(&error_envelope(&Value::Null, -32600, "invalid request", None));
            return;
        }
        // No `id` key means a notification: nothing is ever sent back.
        // Only the id is cloned so the request itself can be consumed below.
        let Some(raw_id) = value.get("id").cloned() else {
            return;
        };
        let req: JsonRpcRequest = match serde_json::from_value(value) {
            Ok(req) => req,
            Err(err) => {
                if self.debug {
                    eprintln!("[burn mcp] bad request shape: {err}");
                }
                write_response(&error_envelope(&raw_id, -32600, "invalid request", None));
                return;
            }
        };
        let id = req.id.unwrap_or(Value::Null);
        match req.method.as_str() {
            "initialize" => self.handle_initialize(&id, &req.params),
            "ping" => write_success(&id, json!({})),
            "tools/list" => self.handle_tools_list(&id),
            "tools/call" => self.handle_tools_call(&id, &req.params).await,
            other => {
                write_response(&error_envelope(
                    &id,
                    -32601,
                    &format!("method not found: {other}"),
                    None,
                ));
            }
        }
    }

    /// Answers `initialize` with the protocol version, capabilities and
    /// server info.
    ///
    /// The client's `protocolVersion` is echoed back when it is a string;
    /// otherwise `PROTOCOL_VERSION` is reported.
    fn handle_initialize(&self, id: &Value, params: &Value) {
        // NOTE(review): the version is echoed without checking that this
        // server actually implements it — confirm this leniency is intended.
        let protocol_version = params
            .get("protocolVersion")
            .and_then(Value::as_str)
            .unwrap_or(PROTOCOL_VERSION);
        let result = json!({
            "protocolVersion": protocol_version,
            "capabilities": { "tools": { "listChanged": false } },
            "serverInfo": { "name": SERVER_NAME, "version": SERVER_VERSION },
        });
        write_success(id, result);
    }

    /// Answers `tools/list` with the single tool this server exposes.
    fn handle_tools_list(&self, id: &Value) {
        let tools = json!([
            {
                "name": "burn__sessionCost",
                "description":
                    "Return the total cost (USD), token count, and turn count for a session. \
                     Defaults to the server's registered sessionId (the running agent's own \
                     session). Read-only.",
                "inputSchema": {
                    "type": "object",
                    "properties": {
                        "sessionId": {
                            "type": "string",
                            "description":
                                "Override the registered session id. Omit to query the running \
                                 agent's own session.",
                        },
                    },
                    "required": [],
                    "additionalProperties": false,
                },
            }
        ]);
        write_success(id, json!({ "tools": tools }));
    }

    /// Dispatches `tools/call` to the named tool.
    ///
    /// A missing (or non-string) `name` and an unknown tool name are both
    /// answered with `-32602` (invalid params): the `tools/call` method itself
    /// exists, it is the parameter that is wrong.
    async fn handle_tools_call(&self, id: &Value, params: &Value) {
        let Some(name) = params.get("name").and_then(Value::as_str) else {
            write_response(&error_envelope(
                id,
                -32602,
                "tools/call requires a name",
                None,
            ));
            return;
        };
        // Missing `arguments` is treated as an empty argument object.
        let args = params
            .get("arguments")
            .cloned()
            .unwrap_or_else(|| json!({}));
        match name {
            "burn__sessionCost" => self.tool_session_cost(id, &args).await,
            other => {
                write_response(&error_envelope(
                    id,
                    -32602,
                    &format!("unknown tool: {other}"),
                    None,
                ));
            }
        }
    }

    /// Implements the `burn__sessionCost` tool.
    ///
    /// The session queried is the `sessionId` argument when given, otherwise
    /// the server's `default_session_id`. A `sessionId` that is present but
    /// neither a string nor `null` is rejected with `-32602` rather than
    /// silently falling back to the default session.
    ///
    /// A ledger failure is reported as a tool-level error (`isError: true`)
    /// inside a successful JSON-RPC response. On success the result is sent
    /// both as JSON text and as `structuredContent`.
    async fn tool_session_cost(&self, id: &Value, args: &Value) {
        let override_id = match args.get("sessionId") {
            None | Some(Value::Null) => None,
            Some(Value::String(session_id)) => Some(session_id.clone()),
            Some(_) => {
                write_response(&error_envelope(
                    id,
                    -32602,
                    "sessionId must be a string",
                    None,
                ));
                return;
            }
        };
        let session = override_id.or_else(|| self.default_session_id.clone());
        // Remembered before `session` is moved into the options below.
        let no_session_requested = session.is_none();
        let opts = SessionCostOptions {
            session,
            ledger_home: None,
        };

        // Hold the ledger lock only for the duration of the query.
        let ledger = self.handle.lock().await;
        let result = ledger.session_cost(opts);
        drop(ledger);

        let mut payload: SessionCostResult = match result {
            Ok(payload) => payload,
            Err(err) => {
                let msg = err.to_string();
                write_success(
                    id,
                    json!({
                        "content": [{ "type": "text", "text": msg }],
                        "isError": true,
                    }),
                );
                return;
            }
        };
        // Explain an empty answer when there was no session to look up.
        if payload.session_id.is_none() && no_session_requested {
            payload.note = Some(
                "no session id provided and server was not registered with one".to_string(),
            );
        }
        let value = serde_json::to_value(&payload).unwrap_or(Value::Null);
        let text = serde_json::to_string(&value).unwrap_or_else(|_| "{}".to_string());
        write_success(
            id,
            json!({
                "content": [{ "type": "text", "text": text }],
                "structuredContent": value,
            }),
        );
    }
}
/// Writes a JSON-RPC 2.0 success response for `id` carrying `result`.
///
/// If the envelope cannot be converted to a `Value`, JSON `null` is written
/// in its place.
fn write_success(id: &Value, result: Value) {
    let envelope = JsonRpcSuccess {
        jsonrpc: "2.0",
        id,
        result,
    };
    let rendered = match serde_json::to_value(&envelope) {
        Ok(rendered) => rendered,
        Err(_) => Value::Null,
    };
    write_response(&rendered);
}
fn error_envelope(id: &Value, code: i32, message: &str, data: Option<Value>) -> Value {
let env = JsonRpcError {
jsonrpc: "2.0",
id,
error: JsonRpcErrorBody {
code,
message: message.to_string(),
data,
},
};
serde_json::to_value(&env).unwrap_or(Value::Null)
}
/// Writes `value` to stdout as one line of JSON and flushes.
///
/// Output is best-effort: nothing is written if serialization fails, and
/// write or flush errors are ignored.
fn write_response(value: &Value) {
    let stdout = std::io::stdout();
    let mut out = stdout.lock();
    let Ok(mut line) = serde_json::to_string(value) else {
        return;
    };
    // Append the newline first so the frame goes out in a single write.
    line.push('\n');
    let _ = out.write_all(line.as_bytes());
    let _ = out.flush();
}
#[cfg(test)]
mod tests {
    use super::*;

    /// The error envelope carries the protocol marker, the request id, and
    /// the error code and message it was given.
    #[test]
    fn error_envelope_carries_code_and_message() {
        let envelope = error_envelope(&json!(7), -32601, "method not found: foo", None);

        assert_eq!(envelope["jsonrpc"], json!("2.0"));
        assert_eq!(envelope["id"], json!(7));

        let error = &envelope["error"];
        assert!(error.is_object());
        assert_eq!(error["code"], json!(-32601));
        assert_eq!(error["message"], json!("method not found: foo"));
    }
}