1use std::sync::atomic::{AtomicU64, Ordering};
8
9use anyhow::{Context, Result};
10use serde::{Deserialize, Serialize};
11use serde_json::{Value, json};
12
/// Process-wide counter shared by `next_request_id` and `next_idempotency_key`.
///
/// Both callers use `Ordering::Relaxed`: the counter only has to hand out distinct values,
/// and `fetch_add` is atomic regardless of the memory ordering chosen.
static REQUEST_COUNTER: AtomicU64 = AtomicU64::new(0);
18
19fn next_request_id() -> String {
20 let n = REQUEST_COUNTER.fetch_add(1, Ordering::Relaxed);
21 format!("tui-{}-{:04x}", chrono::Utc::now().timestamp_millis(), n)
22}
23
24fn next_idempotency_key(method: &str) -> String {
25 let n = REQUEST_COUNTER.fetch_add(1, Ordering::Relaxed);
26 format!(
27 "idem-tui-{}-{}-{:04x}",
28 method.replace('.', "-"),
29 chrono::Utc::now().timestamp_millis(),
30 n
31 )
32}
33
/// Request envelope serialized as the JSON body that `RpcClient::call` POSTs.
///
/// Field names are emitted in camelCase (`apiVersion`, `id`, `method`, `params`, `meta`).
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
struct RpcRequest {
    /// API version string; `RpcClient::call` always sends `"v1"`.
    api_version: String,
    /// Client-generated request id (see `next_request_id`).
    id: String,
    /// Name of the RPC method, e.g. `"task.list"`.
    method: String,
    /// Method-specific parameters, passed through untouched.
    params: Value,
    /// Idempotency metadata; left out of the JSON entirely when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    meta: Option<RequestMeta>,
}
44
/// Metadata that `RpcClient::call` attaches to requests for which `is_mutating` returns true.
///
/// Serialized with camelCase keys (`idempotencyKey`, `requestTs`).
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
struct RequestMeta {
    /// Client-generated key (see `next_idempotency_key`).
    idempotency_key: String,
    /// Time the request was built, as an RFC 3339 UTC timestamp.
    request_ts: String,
}
51
/// Response envelope deserialized from the body returned by the RPC endpoint.
///
/// `RpcClient::call` fails when `error` is present and otherwise requires `result`.
/// Keys are read in camelCase.
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
struct RpcResponse {
    // Not read by the code in this file, but required: a body without `apiVersion` fails to
    // deserialize.
    #[allow(dead_code)]
    api_version: String,
    // Not read by the code in this file, but required in the response body.
    #[allow(dead_code)]
    id: String,
    /// Successful payload; absent (or JSON `null`) deserializes to `None`.
    result: Option<Value>,
    /// Error payload; when present it takes precedence over `result` in `RpcClient::call`.
    error: Option<RpcErrorBody>,
}
62
/// Error object carried in the `error` field of an RPC response.
///
/// All three fields are required; a body that omits any of them fails to deserialize.
#[derive(Debug, Clone, Deserialize)]
pub struct RpcErrorBody {
    /// Error code string; printed in brackets in the error raised by `RpcClient::call`.
    pub code: String,
    /// Human-readable error message.
    pub message: String,
    /// Presumably whether the caller may retry the request — not consulted anywhere in this
    /// file; confirm against the server contract.
    pub retryable: bool,
}
69
/// A single stream event, deserialized from JSON with camelCase keys.
///
/// NOTE(review): presumably these arrive over the WebSocket built by
/// `RpcClient::stream_ws_url`; the consumer is not visible in this file.
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct StreamEvent {
    // Required in the payload but not read by the code in this file.
    #[allow(dead_code)]
    pub api_version: String,
    // Required in the payload but not read by the code in this file.
    #[allow(dead_code)]
    pub stream: String,
    /// Topic the event was published on.
    pub topic: String,
    /// Cursor associated with this event (presumably the value to pass to
    /// `RpcClient::stream_ack` — confirm against the consumer).
    pub cursor: String,
    /// Sequence number of the event.
    pub sequence: u64,
    // Event timestamp string; required in the payload but not read by the code in this file.
    #[allow(dead_code)]
    pub ts: String,
    /// Resource the event refers to.
    pub resource: StreamResource,
    /// Replay information attached to the event.
    pub replay: StreamReplay,
    /// Event-specific payload, left as raw JSON.
    pub payload: Value,
}
91
/// Identifies the resource a `StreamEvent` refers to.
#[derive(Debug, Clone, Deserialize)]
pub struct StreamResource {
    /// Resource type; read from the JSON key `type`, which is a reserved word in Rust.
    #[serde(rename = "type")]
    pub kind: String,
    /// Identifier of the resource.
    pub id: String,
}
98
/// Replay information attached to a `StreamEvent`, read with camelCase keys.
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct StreamReplay {
    /// Replay mode string; the set of possible values is not visible in this file.
    pub mode: String,
    // Optional (`requestedCursor`); not read by the code in this file.
    #[allow(dead_code)]
    pub requested_cursor: Option<String>,
    // Optional; not read by the code in this file.
    #[allow(dead_code)]
    pub batch: Option<u64>,
}
108
/// Result of the `stream.subscribe` call, as returned by `RpcClient::stream_subscribe`.
///
/// Read with camelCase keys (`subscriptionId`, `acceptedTopics`, `cursor`).
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct SubscribeResult {
    /// Identifier of the subscription (presumably the value expected by
    /// `RpcClient::stream_ws_url` and `RpcClient::stream_ack`).
    pub subscription_id: String,
    /// Topics the server accepted for this subscription.
    pub accepted_topics: Vec<String>,
    /// Cursor reported by the server for the subscription.
    pub cursor: String,
}
120
/// One entry of the `tasks` array returned by `task.list` (see `RpcClient::task_list`).
///
/// Only the fields listed here are captured; all three are required.
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct TaskRecord {
    /// Task identifier.
    pub id: String,
    /// Task title.
    pub title: String,
    /// Task status string; the set of possible values is not visible in this file.
    pub status: String,
}
132
/// One entry of the `loops` array returned by `loop.list` (see `RpcClient::loop_list`).
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct LoopRecord {
    /// Loop identifier.
    pub id: String,
    /// Loop status string; the set of possible values is not visible in this file.
    pub status: String,
    /// Prompt associated with the loop; `None` when the key is missing or `null`.
    #[serde(default)]
    pub prompt: Option<String>,
}
141
/// Typed wrapper for a result object carrying a `config` key.
///
/// NOTE(review): `RpcClient::config_get` in this file returns the raw `Value` rather than
/// this type; confirm where (or whether) it is used.
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ConfigResult {
    /// Raw configuration JSON; falls back to `Value::Null` when the key is missing.
    #[serde(default)]
    pub config: Value,
}
148
/// HTTP client for the RPC API rooted at a base URL.
///
/// Construct it with `RpcClient::new`; cloning copies both the HTTP client handle and the
/// parsed base URL.
#[derive(Clone)]
pub struct RpcClient {
    /// Underlying HTTP client, configured in `RpcClient::new`.
    http: reqwest::Client,
    /// Parsed base URL that the RPC and WebSocket endpoints are derived from.
    base_url: url::Url,
}
160
161impl RpcClient {
162 pub fn new(base_url: &str) -> Result<Self> {
164 let base_url = url::Url::parse(base_url)
165 .with_context(|| format!("invalid ralph-api base URL: {base_url}"))?;
166 let http = reqwest::Client::builder()
167 .timeout(std::time::Duration::from_secs(30))
168 .build()
169 .context("failed to build HTTP client")?;
170 Ok(Self { http, base_url })
171 }
172
173 pub async fn call(&self, method: &str, params: Value) -> Result<Value> {
175 let is_mutating = is_mutating(method);
176 let request = RpcRequest {
177 api_version: "v1".to_string(),
178 id: next_request_id(),
179 method: method.to_string(),
180 params,
181 meta: if is_mutating {
182 Some(RequestMeta {
183 idempotency_key: next_idempotency_key(method),
184 request_ts: chrono::Utc::now().to_rfc3339(),
185 })
186 } else {
187 None
188 },
189 };
190
191 let url = self
192 .base_url
193 .join("/rpc/v1")
194 .context("failed to build RPC endpoint URL")?;
195
196 let response = self
197 .http
198 .post(url)
199 .json(&request)
200 .send()
201 .await
202 .context("RPC HTTP request failed")?;
203
204 let status = response.status();
205 let body: RpcResponse = response
206 .json()
207 .await
208 .context("failed to parse RPC response JSON")?;
209
210 if let Some(err) = body.error {
211 anyhow::bail!(
212 "RPC error ({status}): [{code}] {msg}",
213 code = err.code,
214 msg = err.message
215 );
216 }
217
218 body.result
219 .ok_or_else(|| anyhow::anyhow!("RPC response missing result"))
220 }
221
222 pub async fn task_list(&self) -> Result<Vec<TaskRecord>> {
226 let result = self.call("task.list", json!({})).await?;
227 let tasks: Vec<TaskRecord> =
228 serde_json::from_value(result.get("tasks").cloned().unwrap_or(Value::Array(vec![])))
229 .context("failed to parse task list")?;
230 Ok(tasks)
231 }
232
233 pub async fn loop_list(&self) -> Result<Vec<LoopRecord>> {
235 let result = self
236 .call("loop.list", json!({ "includeTerminal": true }))
237 .await?;
238 let loops: Vec<LoopRecord> =
239 serde_json::from_value(result.get("loops").cloned().unwrap_or(Value::Array(vec![])))
240 .context("failed to parse loop list")?;
241 Ok(loops)
242 }
243
244 pub async fn config_get(&self) -> Result<Value> {
246 self.call("config.get", json!({})).await
247 }
248
249 pub async fn stream_subscribe(
251 &self,
252 topics: &[&str],
253 cursor: Option<&str>,
254 ) -> Result<SubscribeResult> {
255 let mut params = json!({
256 "topics": topics,
257 });
258 if let Some(c) = cursor {
259 params["cursor"] = Value::String(c.to_string());
260 }
261 let result = self.call("stream.subscribe", params).await?;
262 serde_json::from_value(result).context("failed to parse subscribe result")
263 }
264
265 pub fn stream_ws_url(&self, subscription_id: &str) -> Result<String> {
267 let mut ws_url = self.base_url.clone();
268 let scheme = match ws_url.scheme() {
269 "https" => "wss",
270 _ => "ws",
271 };
272 ws_url
273 .set_scheme(scheme)
274 .map_err(|()| anyhow::anyhow!("failed to set WebSocket scheme"))?;
275 ws_url.set_path("/rpc/v1/stream");
276 ws_url
277 .query_pairs_mut()
278 .append_pair("subscriptionId", subscription_id);
279 Ok(ws_url.to_string())
280 }
281
282 pub async fn stream_ack(&self, subscription_id: &str, cursor: &str) -> Result<()> {
284 self.call(
285 "stream.ack",
286 json!({
287 "subscriptionId": subscription_id,
288 "cursor": cursor,
289 }),
290 )
291 .await?;
292 Ok(())
293 }
294}
295
/// Reports whether `method` is one of the RPC methods this client treats as mutating.
///
/// `RpcClient::call` attaches idempotency metadata to exactly these methods. The comparison
/// is an exact, case-sensitive string match; anything not listed returns `false`.
fn is_mutating(method: &str) -> bool {
    const MUTATING_METHODS: &[&str] = &[
        // Tasks
        "task.create",
        "task.update",
        "task.close",
        "task.archive",
        "task.unarchive",
        "task.delete",
        "task.clear",
        "task.run",
        "task.run_all",
        "task.retry",
        "task.cancel",
        // Loops
        "loop.process",
        "loop.prune",
        "loop.retry",
        "loop.discard",
        "loop.stop",
        "loop.merge",
        "loop.trigger_merge_task",
        // Planning
        "planning.start",
        "planning.respond",
        "planning.resume",
        "planning.delete",
        // Config
        "config.update",
        // Collections
        "collection.create",
        "collection.update",
        "collection.delete",
        "collection.import",
    ];

    MUTATING_METHODS.contains(&method)
}