Skip to main content

ralph_api/
protocol.rs

1use std::sync::OnceLock;
2
3use jsonschema::{Draft, JSONSchema};
4use serde::{Deserialize, Serialize};
5use serde_json::{Value, json};
6
7use crate::errors::ApiError;
8use crate::loop_support::now_ts;
9
10pub const API_VERSION: &str = "v1";
11pub const STREAM_NAME: &str = "events.v1";
12
13pub const KNOWN_METHODS: &[&str] = &[
14    "system.health",
15    "system.version",
16    "system.capabilities",
17    "task.list",
18    "task.get",
19    "task.ready",
20    "task.create",
21    "task.update",
22    "task.close",
23    "task.archive",
24    "task.unarchive",
25    "task.delete",
26    "task.clear",
27    "task.run",
28    "task.run_all",
29    "task.retry",
30    "task.cancel",
31    "task.status",
32    "loop.list",
33    "loop.status",
34    "loop.process",
35    "loop.prune",
36    "loop.retry",
37    "loop.discard",
38    "loop.stop",
39    "loop.merge",
40    "loop.merge_button_state",
41    "loop.trigger_merge_task",
42    "planning.list",
43    "planning.get",
44    "planning.start",
45    "planning.respond",
46    "planning.resume",
47    "planning.delete",
48    "planning.get_artifact",
49    "config.get",
50    "config.update",
51    "preset.list",
52    "collection.list",
53    "collection.get",
54    "collection.create",
55    "collection.update",
56    "collection.delete",
57    "collection.import",
58    "collection.export",
59    "stream.subscribe",
60    "stream.unsubscribe",
61    "stream.ack",
62];
63
64pub const MUTATING_METHODS: &[&str] = &[
65    "task.create",
66    "task.update",
67    "task.close",
68    "task.archive",
69    "task.unarchive",
70    "task.delete",
71    "task.clear",
72    "task.run",
73    "task.run_all",
74    "task.retry",
75    "task.cancel",
76    "loop.process",
77    "loop.prune",
78    "loop.retry",
79    "loop.discard",
80    "loop.stop",
81    "loop.merge",
82    "loop.trigger_merge_task",
83    "planning.start",
84    "planning.respond",
85    "planning.resume",
86    "planning.delete",
87    "config.update",
88    "collection.create",
89    "collection.update",
90    "collection.delete",
91    "collection.import",
92];
93
94pub const STREAM_TOPICS: &[&str] = &[
95    "system.heartbeat",
96    "system.lifecycle",
97    "task.log.line",
98    "task.status.changed",
99    "loop.status.changed",
100    "loop.merge.progress",
101    "planning.prompt.issued",
102    "planning.response.recorded",
103    "planning.artifact.updated",
104    "config.updated",
105    "collection.updated",
106    "preset.refreshed",
107    "error.raised",
108    "stream.keepalive",
109];
110
111#[derive(Debug, Clone, Deserialize)]
112#[serde(rename_all = "camelCase")]
113pub struct RpcRequestEnvelope {
114    pub api_version: String,
115    pub id: String,
116    pub method: String,
117    pub params: Value,
118    pub meta: Option<RequestMeta>,
119}
120
121#[derive(Debug, Clone, Deserialize)]
122#[serde(rename_all = "camelCase")]
123pub struct RequestMeta {
124    pub idempotency_key: Option<String>,
125    pub auth: Option<AuthMeta>,
126    pub timeout_ms: Option<u64>,
127    pub request_ts: Option<String>,
128}
129
130#[derive(Debug, Clone, Deserialize)]
131pub struct AuthMeta {
132    pub mode: String,
133    pub token: Option<String>,
134}
135
136#[derive(Debug, Clone, Serialize)]
137#[serde(rename_all = "camelCase")]
138struct ResponseMeta {
139    served_by: String,
140    served_at: String,
141}
142
143#[derive(Debug, Clone, Serialize)]
144#[serde(rename_all = "camelCase")]
145struct SuccessEnvelope {
146    api_version: String,
147    id: String,
148    method: String,
149    result: Value,
150    meta: ResponseMeta,
151}
152
153#[derive(Debug, Clone, Serialize)]
154#[serde(rename_all = "camelCase")]
155struct ErrorEnvelope {
156    api_version: String,
157    id: String,
158    #[serde(skip_serializing_if = "Option::is_none")]
159    method: Option<String>,
160    error: crate::errors::RpcErrorBody,
161    meta: ResponseMeta,
162}
163
164pub fn is_known_method(method: &str) -> bool {
165    KNOWN_METHODS.contains(&method)
166}
167
168pub fn is_mutating_method(method: &str) -> bool {
169    MUTATING_METHODS.contains(&method)
170}
171
172pub fn parse_json_value(body: &[u8]) -> Result<Value, ApiError> {
173    serde_json::from_slice::<Value>(body)
174        .map_err(|err| ApiError::invalid_request(format!("invalid JSON body: {err}")))
175}
176
177pub fn request_context(raw: &Value) -> (String, Option<String>) {
178    let request_id = raw
179        .get("id")
180        .and_then(Value::as_str)
181        .filter(|value| !value.is_empty())
182        .unwrap_or("unknown")
183        .to_string();
184    let method = raw
185        .get("method")
186        .and_then(Value::as_str)
187        .map(std::string::ToString::to_string);
188    (request_id, method)
189}
190
191pub fn parse_request(raw: &Value) -> Result<RpcRequestEnvelope, ApiError> {
192    serde_json::from_value::<RpcRequestEnvelope>(raw.clone())
193        .map_err(|err| ApiError::invalid_request(format!("invalid request envelope: {err}")))
194}
195
196pub fn validate_request_schema(raw: &Value) -> Result<(), Vec<String>> {
197    let validator = request_schema_validator();
198    match validator.validate(raw) {
199        Ok(()) => Ok(()),
200        Err(errors) => Err(errors.map(|error| error.to_string()).collect()),
201    }
202}
203
204pub fn success_envelope(request: &RpcRequestEnvelope, result: Value, served_by: &str) -> Value {
205    serde_json::to_value(SuccessEnvelope {
206        api_version: API_VERSION.to_string(),
207        id: request.id.clone(),
208        method: request.method.clone(),
209        result,
210        meta: response_meta(served_by),
211    })
212    .expect("success envelope should always serialize")
213}
214
215pub fn error_envelope(error: &ApiError, served_by: &str) -> Value {
216    serde_json::to_value(ErrorEnvelope {
217        api_version: API_VERSION.to_string(),
218        id: error.request_id.clone(),
219        method: error.method.clone(),
220        error: error.as_body(),
221        meta: response_meta(served_by),
222    })
223    .expect("error envelope should always serialize")
224}
225
226fn response_meta(served_by: &str) -> ResponseMeta {
227    ResponseMeta {
228        served_by: served_by.to_string(),
229        served_at: now_ts(),
230    }
231}
232
233fn request_schema_validator() -> &'static JSONSchema {
234    static REQUEST_VALIDATOR: OnceLock<JSONSchema> = OnceLock::new();
235    REQUEST_VALIDATOR.get_or_init(|| {
236        let raw_schema = include_str!("../data/rpc-v1-schema.json");
237        let root_schema: Value =
238            serde_json::from_str(raw_schema).expect("embedded rpc-v1 schema must be valid JSON");
239        let defs = root_schema
240            .get("$defs")
241            .cloned()
242            .expect("rpc-v1 schema must include $defs");
243
244        let request_schema = json!({
245            "$schema": "https://json-schema.org/draft/2020-12/schema",
246            "$defs": defs,
247            "$ref": "#/$defs/requestEnvelope"
248        });
249
250        JSONSchema::options()
251            .with_draft(Draft::Draft7)
252            .compile(&request_schema)
253            .expect("request envelope schema must compile")
254    })
255}