use std::sync::atomic::{AtomicU64, Ordering};
use anyhow::{Context, Result};
use serde::{Deserialize, Serialize};
use serde_json::{Value, json};
static REQUEST_COUNTER: AtomicU64 = AtomicU64::new(0);
fn next_request_id() -> String {
let n = REQUEST_COUNTER.fetch_add(1, Ordering::Relaxed);
format!("tui-{}-{:04x}", chrono::Utc::now().timestamp_millis(), n)
}
fn next_idempotency_key(method: &str) -> String {
let n = REQUEST_COUNTER.fetch_add(1, Ordering::Relaxed);
format!(
"idem-tui-{}-{}-{:04x}",
method.replace('.', "-"),
chrono::Utc::now().timestamp_millis(),
n
)
}
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
struct RpcRequest {
api_version: String,
id: String,
method: String,
params: Value,
#[serde(skip_serializing_if = "Option::is_none")]
meta: Option<RequestMeta>,
}
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
struct RequestMeta {
idempotency_key: String,
request_ts: String,
}
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
struct RpcResponse {
#[allow(dead_code)]
api_version: String,
#[allow(dead_code)]
id: String,
result: Option<Value>,
error: Option<RpcErrorBody>,
}
#[derive(Debug, Clone, Deserialize)]
pub struct RpcErrorBody {
pub code: String,
pub message: String,
pub retryable: bool,
}
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct StreamEvent {
#[allow(dead_code)]
pub api_version: String,
#[allow(dead_code)]
pub stream: String,
pub topic: String,
pub cursor: String,
pub sequence: u64,
#[allow(dead_code)]
pub ts: String,
pub resource: StreamResource,
pub replay: StreamReplay,
pub payload: Value,
}
#[derive(Debug, Clone, Deserialize)]
pub struct StreamResource {
#[serde(rename = "type")]
pub kind: String,
pub id: String,
}
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct StreamReplay {
pub mode: String,
#[allow(dead_code)]
pub requested_cursor: Option<String>,
#[allow(dead_code)]
pub batch: Option<u64>,
}
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct SubscribeResult {
pub subscription_id: String,
pub accepted_topics: Vec<String>,
pub cursor: String,
}
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct TaskRecord {
pub id: String,
pub title: String,
pub status: String,
}
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct LoopRecord {
pub id: String,
pub status: String,
#[serde(default)]
pub prompt: Option<String>,
}
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ConfigResult {
#[serde(default)]
pub config: Value,
}
#[derive(Clone)]
pub struct RpcClient {
http: reqwest::Client,
base_url: url::Url,
}
impl RpcClient {
pub fn new(base_url: &str) -> Result<Self> {
let base_url = url::Url::parse(base_url)
.with_context(|| format!("invalid ralph-api base URL: {base_url}"))?;
let http = reqwest::Client::builder()
.timeout(std::time::Duration::from_secs(30))
.build()
.context("failed to build HTTP client")?;
Ok(Self { http, base_url })
}
pub async fn call(&self, method: &str, params: Value) -> Result<Value> {
let is_mutating = is_mutating(method);
let request = RpcRequest {
api_version: "v1".to_string(),
id: next_request_id(),
method: method.to_string(),
params,
meta: if is_mutating {
Some(RequestMeta {
idempotency_key: next_idempotency_key(method),
request_ts: chrono::Utc::now().to_rfc3339(),
})
} else {
None
},
};
let url = self
.base_url
.join("/rpc/v1")
.context("failed to build RPC endpoint URL")?;
let response = self
.http
.post(url)
.json(&request)
.send()
.await
.context("RPC HTTP request failed")?;
let status = response.status();
let body: RpcResponse = response
.json()
.await
.context("failed to parse RPC response JSON")?;
if let Some(err) = body.error {
anyhow::bail!(
"RPC error ({status}): [{code}] {msg}",
code = err.code,
msg = err.message
);
}
body.result
.ok_or_else(|| anyhow::anyhow!("RPC response missing result"))
}
pub async fn task_list(&self) -> Result<Vec<TaskRecord>> {
let result = self.call("task.list", json!({})).await?;
let tasks: Vec<TaskRecord> =
serde_json::from_value(result.get("tasks").cloned().unwrap_or(Value::Array(vec![])))
.context("failed to parse task list")?;
Ok(tasks)
}
pub async fn loop_list(&self) -> Result<Vec<LoopRecord>> {
let result = self
.call("loop.list", json!({ "includeTerminal": true }))
.await?;
let loops: Vec<LoopRecord> =
serde_json::from_value(result.get("loops").cloned().unwrap_or(Value::Array(vec![])))
.context("failed to parse loop list")?;
Ok(loops)
}
pub async fn config_get(&self) -> Result<Value> {
self.call("config.get", json!({})).await
}
pub async fn stream_subscribe(
&self,
topics: &[&str],
cursor: Option<&str>,
) -> Result<SubscribeResult> {
let mut params = json!({
"topics": topics,
});
if let Some(c) = cursor {
params["cursor"] = Value::String(c.to_string());
}
let result = self.call("stream.subscribe", params).await?;
serde_json::from_value(result).context("failed to parse subscribe result")
}
pub fn stream_ws_url(&self, subscription_id: &str) -> Result<String> {
let mut ws_url = self.base_url.clone();
let scheme = match ws_url.scheme() {
"https" => "wss",
_ => "ws",
};
ws_url
.set_scheme(scheme)
.map_err(|()| anyhow::anyhow!("failed to set WebSocket scheme"))?;
ws_url.set_path("/rpc/v1/stream");
ws_url
.query_pairs_mut()
.append_pair("subscriptionId", subscription_id);
Ok(ws_url.to_string())
}
pub async fn stream_ack(&self, subscription_id: &str, cursor: &str) -> Result<()> {
self.call(
"stream.ack",
json!({
"subscriptionId": subscription_id,
"cursor": cursor,
}),
)
.await?;
Ok(())
}
}
fn is_mutating(method: &str) -> bool {
matches!(
method,
"task.create"
| "task.update"
| "task.close"
| "task.archive"
| "task.unarchive"
| "task.delete"
| "task.clear"
| "task.run"
| "task.run_all"
| "task.retry"
| "task.cancel"
| "loop.process"
| "loop.prune"
| "loop.retry"
| "loop.discard"
| "loop.stop"
| "loop.merge"
| "loop.trigger_merge_task"
| "planning.start"
| "planning.respond"
| "planning.resume"
| "planning.delete"
| "config.update"
| "collection.create"
| "collection.update"
| "collection.delete"
| "collection.import"
)
}