Skip to main content

ralph_tui/
rpc_client.rs

1//! RPC v1 client for connecting the TUI to a remote ralph-api server.
2//!
3//! Provides HTTP request/response and WebSocket streaming for consuming
4//! the same RPC v1 API that the web dashboard uses. This enables the TUI
5//! to attach to a running orchestration loop from any terminal.
6
7use std::sync::atomic::{AtomicU64, Ordering};
8
9use anyhow::{Context, Result};
10use serde::{Deserialize, Serialize};
11use serde_json::{Value, json};
12
13// ---------------------------------------------------------------------------
14// Request / response types (mirrors ralph-api protocol)
15// ---------------------------------------------------------------------------
16
/// Process-wide sequence number shared by `next_request_id` and
/// `next_idempotency_key`; every call to either function consumes one value,
/// so the two kinds of identifier draw from a single interleaved sequence.
static REQUEST_COUNTER: AtomicU64 = AtomicU64::new(0);
18
19fn next_request_id() -> String {
20    let n = REQUEST_COUNTER.fetch_add(1, Ordering::Relaxed);
21    format!("tui-{}-{:04x}", chrono::Utc::now().timestamp_millis(), n)
22}
23
24fn next_idempotency_key(method: &str) -> String {
25    let n = REQUEST_COUNTER.fetch_add(1, Ordering::Relaxed);
26    format!(
27        "idem-tui-{}-{}-{:04x}",
28        method.replace('.', "-"),
29        chrono::Utc::now().timestamp_millis(),
30        n
31    )
32}
33
/// JSON envelope that `RpcClient::call` serializes as the body of every RPC
/// request.
///
/// Field names are emitted in camelCase (`api_version` becomes `apiVersion`).
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
struct RpcRequest {
    /// Protocol version; `RpcClient::call` always sends `"v1"`.
    api_version: String,
    /// Client-generated request identifier (see `next_request_id`).
    id: String,
    /// RPC method name, e.g. `task.list`.
    method: String,
    /// Method-specific parameters, forwarded exactly as the caller supplied them.
    params: Value,
    /// Idempotency metadata. `RpcClient::call` fills this only for methods
    /// that `is_mutating` reports as state-changing; when `None` the key is
    /// omitted from the JSON entirely.
    #[serde(skip_serializing_if = "Option::is_none")]
    meta: Option<RequestMeta>,
}
44
/// Metadata attached to mutating RPC requests; serialized with camelCase keys
/// (`idempotencyKey`, `requestTs`).
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
struct RequestMeta {
    /// Per-request key produced by `next_idempotency_key`.
    // NOTE(review): presumably lets the server de-duplicate retried requests —
    // confirm against the ralph-api protocol.
    idempotency_key: String,
    /// Client-side send time as an RFC 3339 UTC timestamp.
    request_ts: String,
}
51
/// JSON envelope parsed from the body of an RPC HTTP response.
///
/// Keys are expected in camelCase. `api_version` and `id` have no default, so
/// a response lacking either fails to deserialize even though this file never
/// reads them afterwards.
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
struct RpcResponse {
    /// Protocol version echoed by the server; parsed but not inspected here.
    #[allow(dead_code)]
    api_version: String,
    /// Request identifier echoed by the server; parsed but not inspected here.
    #[allow(dead_code)]
    id: String,
    /// Payload of a successful call; `RpcClient::call` treats its absence
    /// (when `error` is also absent) as a failure.
    result: Option<Value>,
    /// Failure description; when present, `RpcClient::call` reports it as an
    /// error and ignores `result`.
    error: Option<RpcErrorBody>,
}
62
/// Error object carried in the `error` field of an RPC response.
///
/// All three fields are required; a body missing any of them fails to
/// deserialize.
#[derive(Debug, Clone, Deserialize)]
pub struct RpcErrorBody {
    /// Error code string; shown in brackets in the message built by
    /// `RpcClient::call`.
    pub code: String,
    /// Human-readable description; appended to the same message.
    pub message: String,
    /// Not consulted anywhere in this file.
    // NOTE(review): presumably signals that the caller may safely retry the
    // request — confirm against the ralph-api protocol.
    pub retryable: bool,
}
69
70// ---------------------------------------------------------------------------
71// Stream event types
72// ---------------------------------------------------------------------------
73
/// A stream event received over WebSocket from ralph-api.
///
/// Deserialized from JSON with camelCase keys (`apiVersion`, ...). Every field
/// is required; none carries a serde default.
// NOTE(review): the per-field meanings below are inferred from the field names
// only — confirm against the ralph-api stream protocol.
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct StreamEvent {
    /// Protocol version string as sent by the server.
    #[allow(dead_code)]
    pub api_version: String,
    /// Stream identifier as sent by the server.
    #[allow(dead_code)]
    pub stream: String,
    /// Topic string of the event; presumably one of the subscribed topics.
    pub topic: String,
    /// Cursor string of the event; presumably the value to hand to
    /// `RpcClient::stream_ack` or to a later `stream_subscribe` — TODO confirm.
    pub cursor: String,
    /// Numeric sequence value as sent by the server.
    pub sequence: u64,
    /// Timestamp kept as the raw string the server sent (format not validated
    /// here).
    #[allow(dead_code)]
    pub ts: String,
    /// Type/ID pair carried with the event.
    pub resource: StreamResource,
    /// Replay information carried with the event.
    pub replay: StreamReplay,
    /// Event body, left as untyped JSON.
    pub payload: Value,
}
91
/// Type/ID pair carried in the `resource` field of a [`StreamEvent`].
#[derive(Debug, Clone, Deserialize)]
pub struct StreamResource {
    /// Read from the JSON key `type`; exposed as `kind` because `type` is a
    /// reserved word in Rust.
    #[serde(rename = "type")]
    pub kind: String,
    /// Read from the JSON key `id`.
    pub id: String,
}
98
/// Replay information carried in the `replay` field of a [`StreamEvent`].
///
/// Deserialized with camelCase keys (`requestedCursor`).
// NOTE(review): field meanings are inferred from names — confirm against the
// ralph-api stream protocol.
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct StreamReplay {
    /// Replay mode string as sent by the server; the set of valid values is
    /// not visible in this file.
    pub mode: String,
    /// Optional cursor string; `None` when the key is absent or `null`.
    #[allow(dead_code)]
    pub requested_cursor: Option<String>,
    /// Optional numeric value; `None` when the key is absent or `null`.
    #[allow(dead_code)]
    pub batch: Option<u64>,
}
108
109// ---------------------------------------------------------------------------
110// Subscribe result
111// ---------------------------------------------------------------------------
112
/// Result of the `stream.subscribe` RPC, as returned by
/// `RpcClient::stream_subscribe`.
///
/// Deserialized with camelCase keys (`subscriptionId`, `acceptedTopics`).
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct SubscribeResult {
    /// Subscription identifier; the value that `RpcClient::stream_ws_url` and
    /// `RpcClient::stream_ack` take as `subscription_id`.
    pub subscription_id: String,
    /// Topic strings returned by the server.
    // NOTE(review): presumably the subset of requested topics the server
    // agreed to deliver — confirm.
    pub accepted_topics: Vec<String>,
    /// Cursor string returned by the server.
    pub cursor: String,
}
120
121// ---------------------------------------------------------------------------
122// Domain types returned by RPC methods
123// ---------------------------------------------------------------------------
124
/// One element of the `tasks` array returned by the `task.list` RPC.
///
/// Only these three keys are read; all three are required.
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct TaskRecord {
    /// Task identifier.
    pub id: String,
    /// Task title.
    pub title: String,
    /// Status kept as the raw string sent by the server; the set of possible
    /// values is not visible in this file.
    pub status: String,
}
132
/// One element of the `loops` array returned by the `loop.list` RPC.
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct LoopRecord {
    /// Loop identifier.
    pub id: String,
    /// Status kept as the raw string sent by the server; the set of possible
    /// values is not visible in this file.
    pub status: String,
    /// Prompt text, if the server sent one; `None` when the key is absent or
    /// `null`.
    #[serde(default)]
    pub prompt: Option<String>,
}
141
/// Typed wrapper for a result object holding a `config` key.
///
/// Not referenced by the code visible in this file: `RpcClient::config_get`
/// returns the raw JSON value instead.
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ConfigResult {
    /// Configuration as untyped JSON; falls back to `Value::Null` when the key
    /// is absent.
    #[serde(default)]
    pub config: Value,
}
148
149// ---------------------------------------------------------------------------
150// RPC client
151// ---------------------------------------------------------------------------
152
/// An RPC v1 client targeting a single ralph-api server.
///
/// Holds an HTTP client and the parsed base URL; construct it with
/// `RpcClient::new`.
#[derive(Clone)]
pub struct RpcClient {
    /// HTTP client used for every RPC request (configured in `new`).
    http: reqwest::Client,
    /// Base URL, e.g. `http://127.0.0.1:3000`
    base_url: url::Url,
}
160
161impl RpcClient {
162    /// Create a new client pointed at the given base URL.
163    pub fn new(base_url: &str) -> Result<Self> {
164        let base_url = url::Url::parse(base_url)
165            .with_context(|| format!("invalid ralph-api base URL: {base_url}"))?;
166        let http = reqwest::Client::builder()
167            .timeout(std::time::Duration::from_secs(30))
168            .build()
169            .context("failed to build HTTP client")?;
170        Ok(Self { http, base_url })
171    }
172
173    /// Issue an RPC call and return the `result` value.
174    pub async fn call(&self, method: &str, params: Value) -> Result<Value> {
175        let is_mutating = is_mutating(method);
176        let request = RpcRequest {
177            api_version: "v1".to_string(),
178            id: next_request_id(),
179            method: method.to_string(),
180            params,
181            meta: if is_mutating {
182                Some(RequestMeta {
183                    idempotency_key: next_idempotency_key(method),
184                    request_ts: chrono::Utc::now().to_rfc3339(),
185                })
186            } else {
187                None
188            },
189        };
190
191        let url = self
192            .base_url
193            .join("/rpc/v1")
194            .context("failed to build RPC endpoint URL")?;
195
196        let response = self
197            .http
198            .post(url)
199            .json(&request)
200            .send()
201            .await
202            .context("RPC HTTP request failed")?;
203
204        let status = response.status();
205        let body: RpcResponse = response
206            .json()
207            .await
208            .context("failed to parse RPC response JSON")?;
209
210        if let Some(err) = body.error {
211            anyhow::bail!(
212                "RPC error ({status}): [{code}] {msg}",
213                code = err.code,
214                msg = err.message
215            );
216        }
217
218        body.result
219            .ok_or_else(|| anyhow::anyhow!("RPC response missing result"))
220    }
221
222    // -- convenience wrappers ------------------------------------------------
223
224    /// Fetch all tasks.
225    pub async fn task_list(&self) -> Result<Vec<TaskRecord>> {
226        let result = self.call("task.list", json!({})).await?;
227        let tasks: Vec<TaskRecord> =
228            serde_json::from_value(result.get("tasks").cloned().unwrap_or(Value::Array(vec![])))
229                .context("failed to parse task list")?;
230        Ok(tasks)
231    }
232
233    /// Fetch all loops.
234    pub async fn loop_list(&self) -> Result<Vec<LoopRecord>> {
235        let result = self
236            .call("loop.list", json!({ "includeTerminal": true }))
237            .await?;
238        let loops: Vec<LoopRecord> =
239            serde_json::from_value(result.get("loops").cloned().unwrap_or(Value::Array(vec![])))
240                .context("failed to parse loop list")?;
241        Ok(loops)
242    }
243
244    /// Fetch config.
245    pub async fn config_get(&self) -> Result<Value> {
246        self.call("config.get", json!({})).await
247    }
248
249    /// Create a stream subscription, returning the subscription ID and cursor.
250    pub async fn stream_subscribe(
251        &self,
252        topics: &[&str],
253        cursor: Option<&str>,
254    ) -> Result<SubscribeResult> {
255        let mut params = json!({
256            "topics": topics,
257        });
258        if let Some(c) = cursor {
259            params["cursor"] = Value::String(c.to_string());
260        }
261        let result = self.call("stream.subscribe", params).await?;
262        serde_json::from_value(result).context("failed to parse subscribe result")
263    }
264
265    /// Build the WebSocket URL for the given subscription ID.
266    pub fn stream_ws_url(&self, subscription_id: &str) -> Result<String> {
267        let mut ws_url = self.base_url.clone();
268        let scheme = match ws_url.scheme() {
269            "https" => "wss",
270            _ => "ws",
271        };
272        ws_url
273            .set_scheme(scheme)
274            .map_err(|()| anyhow::anyhow!("failed to set WebSocket scheme"))?;
275        ws_url.set_path("/rpc/v1/stream");
276        ws_url
277            .query_pairs_mut()
278            .append_pair("subscriptionId", subscription_id);
279        Ok(ws_url.to_string())
280    }
281
282    /// Send a `stream.ack` to checkpoint the cursor.
283    pub async fn stream_ack(&self, subscription_id: &str, cursor: &str) -> Result<()> {
284        self.call(
285            "stream.ack",
286            json!({
287                "subscriptionId": subscription_id,
288                "cursor": cursor,
289            }),
290        )
291        .await?;
292        Ok(())
293    }
294}
295
/// Report whether `method` is one of the RPC methods treated as
/// state-changing.
///
/// `RpcClient::call` uses the answer to decide whether a request carries
/// idempotency metadata. The comparison is exact and case-sensitive; any
/// method name not in the table is considered read-only.
fn is_mutating(method: &str) -> bool {
    const MUTATING_METHODS: &[&str] = &[
        // task.*
        "task.create",
        "task.update",
        "task.close",
        "task.archive",
        "task.unarchive",
        "task.delete",
        "task.clear",
        "task.run",
        "task.run_all",
        "task.retry",
        "task.cancel",
        // loop.*
        "loop.process",
        "loop.prune",
        "loop.retry",
        "loop.discard",
        "loop.stop",
        "loop.merge",
        "loop.trigger_merge_task",
        // planning.*
        "planning.start",
        "planning.respond",
        "planning.resume",
        "planning.delete",
        // config.*
        "config.update",
        // collection.*
        "collection.create",
        "collection.update",
        "collection.delete",
        "collection.import",
    ];

    MUTATING_METHODS.contains(&method)
}