1use std::sync::OnceLock;
2
3use jsonschema::{Draft, JSONSchema};
4use serde::{Deserialize, Serialize};
5use serde_json::{Value, json};
6
7use crate::errors::ApiError;
8use crate::loop_support::now_ts;
9
10pub const API_VERSION: &str = "v1";
11pub const STREAM_NAME: &str = "events.v1";
12
13pub const KNOWN_METHODS: &[&str] = &[
14 "system.health",
15 "system.version",
16 "system.capabilities",
17 "task.list",
18 "task.get",
19 "task.ready",
20 "task.create",
21 "task.update",
22 "task.close",
23 "task.archive",
24 "task.unarchive",
25 "task.delete",
26 "task.clear",
27 "task.run",
28 "task.run_all",
29 "task.retry",
30 "task.cancel",
31 "task.status",
32 "loop.list",
33 "loop.status",
34 "loop.process",
35 "loop.prune",
36 "loop.retry",
37 "loop.discard",
38 "loop.stop",
39 "loop.merge",
40 "loop.merge_button_state",
41 "loop.trigger_merge_task",
42 "planning.list",
43 "planning.get",
44 "planning.start",
45 "planning.respond",
46 "planning.resume",
47 "planning.delete",
48 "planning.get_artifact",
49 "config.get",
50 "config.update",
51 "preset.list",
52 "collection.list",
53 "collection.get",
54 "collection.create",
55 "collection.update",
56 "collection.delete",
57 "collection.import",
58 "collection.export",
59 "stream.subscribe",
60 "stream.unsubscribe",
61 "stream.ack",
62];
63
64pub const MUTATING_METHODS: &[&str] = &[
65 "task.create",
66 "task.update",
67 "task.close",
68 "task.archive",
69 "task.unarchive",
70 "task.delete",
71 "task.clear",
72 "task.run",
73 "task.run_all",
74 "task.retry",
75 "task.cancel",
76 "loop.process",
77 "loop.prune",
78 "loop.retry",
79 "loop.discard",
80 "loop.stop",
81 "loop.merge",
82 "loop.trigger_merge_task",
83 "planning.start",
84 "planning.respond",
85 "planning.resume",
86 "planning.delete",
87 "config.update",
88 "collection.create",
89 "collection.update",
90 "collection.delete",
91 "collection.import",
92];
93
94pub const STREAM_TOPICS: &[&str] = &[
95 "system.heartbeat",
96 "system.lifecycle",
97 "task.log.line",
98 "task.status.changed",
99 "loop.status.changed",
100 "loop.merge.progress",
101 "planning.prompt.issued",
102 "planning.response.recorded",
103 "planning.artifact.updated",
104 "config.updated",
105 "collection.updated",
106 "preset.refreshed",
107 "error.raised",
108 "stream.keepalive",
109];
110
111#[derive(Debug, Clone, Deserialize)]
112#[serde(rename_all = "camelCase")]
113pub struct RpcRequestEnvelope {
114 pub api_version: String,
115 pub id: String,
116 pub method: String,
117 pub params: Value,
118 pub meta: Option<RequestMeta>,
119}
120
121#[derive(Debug, Clone, Deserialize)]
122#[serde(rename_all = "camelCase")]
123pub struct RequestMeta {
124 pub idempotency_key: Option<String>,
125 pub auth: Option<AuthMeta>,
126 pub timeout_ms: Option<u64>,
127 pub request_ts: Option<String>,
128}
129
130#[derive(Debug, Clone, Deserialize)]
131pub struct AuthMeta {
132 pub mode: String,
133 pub token: Option<String>,
134}
135
136#[derive(Debug, Clone, Serialize)]
137#[serde(rename_all = "camelCase")]
138struct ResponseMeta {
139 served_by: String,
140 served_at: String,
141}
142
143#[derive(Debug, Clone, Serialize)]
144#[serde(rename_all = "camelCase")]
145struct SuccessEnvelope {
146 api_version: String,
147 id: String,
148 method: String,
149 result: Value,
150 meta: ResponseMeta,
151}
152
153#[derive(Debug, Clone, Serialize)]
154#[serde(rename_all = "camelCase")]
155struct ErrorEnvelope {
156 api_version: String,
157 id: String,
158 #[serde(skip_serializing_if = "Option::is_none")]
159 method: Option<String>,
160 error: crate::errors::RpcErrorBody,
161 meta: ResponseMeta,
162}
163
164pub fn is_known_method(method: &str) -> bool {
165 KNOWN_METHODS.contains(&method)
166}
167
168pub fn is_mutating_method(method: &str) -> bool {
169 MUTATING_METHODS.contains(&method)
170}
171
172pub fn parse_json_value(body: &[u8]) -> Result<Value, ApiError> {
173 serde_json::from_slice::<Value>(body)
174 .map_err(|err| ApiError::invalid_request(format!("invalid JSON body: {err}")))
175}
176
177pub fn request_context(raw: &Value) -> (String, Option<String>) {
178 let request_id = raw
179 .get("id")
180 .and_then(Value::as_str)
181 .filter(|value| !value.is_empty())
182 .unwrap_or("unknown")
183 .to_string();
184 let method = raw
185 .get("method")
186 .and_then(Value::as_str)
187 .map(std::string::ToString::to_string);
188 (request_id, method)
189}
190
191pub fn parse_request(raw: &Value) -> Result<RpcRequestEnvelope, ApiError> {
192 serde_json::from_value::<RpcRequestEnvelope>(raw.clone())
193 .map_err(|err| ApiError::invalid_request(format!("invalid request envelope: {err}")))
194}
195
196pub fn validate_request_schema(raw: &Value) -> Result<(), Vec<String>> {
197 let validator = request_schema_validator();
198 match validator.validate(raw) {
199 Ok(()) => Ok(()),
200 Err(errors) => Err(errors.map(|error| error.to_string()).collect()),
201 }
202}
203
204pub fn success_envelope(request: &RpcRequestEnvelope, result: Value, served_by: &str) -> Value {
205 serde_json::to_value(SuccessEnvelope {
206 api_version: API_VERSION.to_string(),
207 id: request.id.clone(),
208 method: request.method.clone(),
209 result,
210 meta: response_meta(served_by),
211 })
212 .expect("success envelope should always serialize")
213}
214
215pub fn error_envelope(error: &ApiError, served_by: &str) -> Value {
216 serde_json::to_value(ErrorEnvelope {
217 api_version: API_VERSION.to_string(),
218 id: error.request_id.clone(),
219 method: error.method.clone(),
220 error: error.as_body(),
221 meta: response_meta(served_by),
222 })
223 .expect("error envelope should always serialize")
224}
225
226fn response_meta(served_by: &str) -> ResponseMeta {
227 ResponseMeta {
228 served_by: served_by.to_string(),
229 served_at: now_ts(),
230 }
231}
232
233fn request_schema_validator() -> &'static JSONSchema {
234 static REQUEST_VALIDATOR: OnceLock<JSONSchema> = OnceLock::new();
235 REQUEST_VALIDATOR.get_or_init(|| {
236 let raw_schema = include_str!("../data/rpc-v1-schema.json");
237 let root_schema: Value =
238 serde_json::from_str(raw_schema).expect("embedded rpc-v1 schema must be valid JSON");
239 let defs = root_schema
240 .get("$defs")
241 .cloned()
242 .expect("rpc-v1 schema must include $defs");
243
244 let request_schema = json!({
245 "$schema": "https://json-schema.org/draft/2020-12/schema",
246 "$defs": defs,
247 "$ref": "#/$defs/requestEnvelope"
248 });
249
250 JSONSchema::options()
251 .with_draft(Draft::Draft7)
252 .compile(&request_schema)
253 .expect("request envelope schema must compile")
254 })
255}