// ralph_api/protocol.rs

use std::sync::OnceLock;

use jsonschema::{Draft, JSONSchema};
use serde::{Deserialize, Serialize};
use serde_json::{Value, json};

use crate::errors::ApiError;
use crate::loop_support::now_ts;

/// Protocol version string written to the `apiVersion` field of the response
/// envelopes built in this module.
pub const API_VERSION: &str = "v1";

// NOTE(review): `STREAM_NAME` is not referenced in this part of the file;
// presumably it is consumed by the streaming layer — confirm.
/// Name of the versioned event stream.
pub const STREAM_NAME: &str = "events.v1";

/// Every RPC method name accepted by [`is_known_method`], grouped by the
/// dotted namespace prefix (`system`, `task`, `loop`, `planning`, `config`,
/// `preset`, `collection`, `stream`).
pub const KNOWN_METHODS: &[&str] = &[
    "system.health",
    "system.version",
    "system.capabilities",
    "task.list",
    "task.get",
    "task.ready",
    "task.create",
    "task.update",
    "task.close",
    "task.archive",
    "task.unarchive",
    "task.delete",
    "task.clear",
    "task.run",
    "task.run_all",
    "task.retry",
    "task.cancel",
    "task.status",
    "loop.list",
    "loop.status",
    "loop.process",
    "loop.prune",
    "loop.retry",
    "loop.discard",
    "loop.stop",
    "loop.merge",
    "loop.merge_button_state",
    "loop.trigger_merge_task",
    "planning.list",
    "planning.get",
    "planning.start",
    "planning.respond",
    "planning.resume",
    "planning.delete",
    "planning.get_artifact",
    "config.get",
    "config.update",
    "preset.list",
    "collection.list",
    "collection.get",
    "collection.create",
    "collection.update",
    "collection.delete",
    "collection.import",
    "collection.export",
    "collection.run",
    "stream.subscribe",
    "stream.unsubscribe",
    "stream.ack",
];

/// Method names for which [`is_mutating_method`] returns `true`.
///
/// Every entry also appears in [`KNOWN_METHODS`]; keep the two lists in sync
/// when adding a method.
pub const MUTATING_METHODS: &[&str] = &[
    "task.create",
    "task.update",
    "task.close",
    "task.archive",
    "task.unarchive",
    "task.delete",
    "task.clear",
    "task.run",
    "task.run_all",
    "task.retry",
    "task.cancel",
    "loop.process",
    "loop.prune",
    "loop.retry",
    "loop.discard",
    "loop.stop",
    "loop.merge",
    "loop.trigger_merge_task",
    "planning.start",
    "planning.respond",
    "planning.resume",
    "planning.delete",
    "config.update",
    "collection.create",
    "collection.update",
    "collection.delete",
    "collection.import",
    "collection.run",
];

// NOTE(review): not referenced in this part of the file; presumably these are
// the topics published on the event stream — confirm against the stream code.
/// Topic names associated with the event stream.
pub const STREAM_TOPICS: &[&str] = &[
    "system.heartbeat",
    "system.lifecycle",
    "task.log.line",
    "task.status.changed",
    "loop.status.changed",
    "loop.merge.progress",
    "loop.orchestration",
    "planning.prompt.issued",
    "planning.response.recorded",
    "planning.artifact.updated",
    "config.updated",
    "collection.updated",
    "preset.refreshed",
    "error.raised",
    "stream.keepalive",
];

114#[derive(Debug, Clone, Deserialize)]
115#[serde(rename_all = "camelCase")]
116pub struct RpcRequestEnvelope {
117    pub api_version: String,
118    pub id: String,
119    pub method: String,
120    pub params: Value,
121    pub meta: Option<RequestMeta>,
122}
123
124#[derive(Debug, Clone, Deserialize)]
125#[serde(rename_all = "camelCase")]
126pub struct RequestMeta {
127    pub idempotency_key: Option<String>,
128    pub auth: Option<AuthMeta>,
129    pub timeout_ms: Option<u64>,
130    pub request_ts: Option<String>,
131}
132
133#[derive(Debug, Clone, Deserialize)]
134pub struct AuthMeta {
135    pub mode: String,
136    pub token: Option<String>,
137}
138
139#[derive(Debug, Clone, Serialize)]
140#[serde(rename_all = "camelCase")]
141struct ResponseMeta {
142    served_by: String,
143    served_at: String,
144}
145
146#[derive(Debug, Clone, Serialize)]
147#[serde(rename_all = "camelCase")]
148struct SuccessEnvelope {
149    api_version: String,
150    id: String,
151    method: String,
152    result: Value,
153    meta: ResponseMeta,
154}
155
156#[derive(Debug, Clone, Serialize)]
157#[serde(rename_all = "camelCase")]
158struct ErrorEnvelope {
159    api_version: String,
160    id: String,
161    #[serde(skip_serializing_if = "Option::is_none")]
162    method: Option<String>,
163    error: crate::errors::RpcErrorBody,
164    meta: ResponseMeta,
165}
166
167pub fn is_known_method(method: &str) -> bool {
168    KNOWN_METHODS.contains(&method)
169}
170
171pub fn is_mutating_method(method: &str) -> bool {
172    MUTATING_METHODS.contains(&method)
173}
174
175pub fn parse_json_value(body: &[u8]) -> Result<Value, ApiError> {
176    serde_json::from_slice::<Value>(body)
177        .map_err(|err| ApiError::invalid_request(format!("invalid JSON body: {err}")))
178}
179
180pub fn request_context(raw: &Value) -> (String, Option<String>) {
181    let request_id = raw
182        .get("id")
183        .and_then(Value::as_str)
184        .filter(|value| !value.is_empty())
185        .unwrap_or("unknown")
186        .to_string();
187    let method = raw
188        .get("method")
189        .and_then(Value::as_str)
190        .map(std::string::ToString::to_string);
191    (request_id, method)
192}
193
194pub fn parse_request(raw: &Value) -> Result<RpcRequestEnvelope, ApiError> {
195    serde_json::from_value::<RpcRequestEnvelope>(raw.clone())
196        .map_err(|err| ApiError::invalid_request(format!("invalid request envelope: {err}")))
197}
198
199pub fn validate_request_schema(raw: &Value) -> Result<(), Vec<String>> {
200    let validator = request_schema_validator();
201    match validator.validate(raw) {
202        Ok(()) => Ok(()),
203        Err(errors) => Err(errors.map(|error| error.to_string()).collect()),
204    }
205}
206
207pub fn success_envelope(request: &RpcRequestEnvelope, result: Value, served_by: &str) -> Value {
208    serde_json::to_value(SuccessEnvelope {
209        api_version: API_VERSION.to_string(),
210        id: request.id.clone(),
211        method: request.method.clone(),
212        result,
213        meta: response_meta(served_by),
214    })
215    .expect("success envelope should always serialize")
216}
217
218pub fn error_envelope(error: &ApiError, served_by: &str) -> Value {
219    serde_json::to_value(ErrorEnvelope {
220        api_version: API_VERSION.to_string(),
221        id: error.request_id.clone(),
222        method: error.method.clone(),
223        error: error.as_body(),
224        meta: response_meta(served_by),
225    })
226    .expect("error envelope should always serialize")
227}
228
229fn response_meta(served_by: &str) -> ResponseMeta {
230    ResponseMeta {
231        served_by: served_by.to_string(),
232        served_at: now_ts(),
233    }
234}
235
236fn request_schema_validator() -> &'static JSONSchema {
237    static REQUEST_VALIDATOR: OnceLock<JSONSchema> = OnceLock::new();
238    REQUEST_VALIDATOR.get_or_init(|| {
239        let raw_schema = include_str!("../data/rpc-v1-schema.json");
240        let root_schema: Value =
241            serde_json::from_str(raw_schema).expect("embedded rpc-v1 schema must be valid JSON");
242        let defs = root_schema
243            .get("$defs")
244            .cloned()
245            .expect("rpc-v1 schema must include $defs");
246
247        let request_schema = json!({
248            "$schema": "https://json-schema.org/draft/2020-12/schema",
249            "$defs": defs,
250            "$ref": "#/$defs/requestEnvelope"
251        });
252
253        JSONSchema::options()
254            .with_draft(Draft::Draft7)
255            .compile(&request_schema)
256            .expect("request envelope schema must compile")
257    })
258}