1mod dispatch;
2
3use std::sync::{Arc, Mutex, MutexGuard};
4use std::time::Duration;
5
6use axum::http::{HeaderMap, StatusCode};
7use serde::Deserialize;
8use serde::de::DeserializeOwned;
9use serde_json::{Value, json};
10use tracing::debug;
11
12use crate::auth::{Authenticator, from_config};
13use crate::collection_domain::CollectionDomain;
14use crate::config::ApiConfig;
15use crate::config_domain::ConfigDomain;
16use crate::errors::{ApiError, RpcErrorCode};
17use crate::idempotency::{
18 IdempotencyCheck, IdempotencyStore, InMemoryIdempotencyStore, StoredResponse,
19};
20use crate::loop_domain::LoopDomain;
21use crate::planning_domain::PlanningDomain;
22use crate::preset_domain::PresetDomain;
23use crate::protocol::{
24 API_VERSION, KNOWN_METHODS, RpcRequestEnvelope, STREAM_TOPICS, error_envelope, is_known_method,
25 is_mutating_method, parse_json_value, parse_request, request_context, success_envelope,
26 validate_request_schema,
27};
28use crate::stream_domain::StreamDomain;
29use crate::task_domain::TaskDomain;
30
31#[derive(Debug, Clone, Deserialize)]
32#[serde(rename_all = "camelCase")]
33pub(crate) struct IdOnlyParams {
34 pub(crate) id: String,
35}
36
/// Runtime state backing the RPC API: configuration, authenticator,
/// idempotency store, and one handle per domain.
///
/// `Clone` is derived. The `Arc`-wrapped members are shared between clones;
/// whether `StreamDomain`, `ConfigDomain`, and `PresetDomain` share state when
/// cloned depends on their own `Clone` impls (not visible here).
#[derive(Clone)]
pub struct RpcRuntime {
    /// API configuration the runtime was built from.
    pub(crate) config: ApiConfig,
    /// Authenticator used to authorize HTTP requests and websocket upgrades.
    auth: Arc<dyn Authenticator>,
    /// Store consulted and updated for mutating methods' idempotency keys.
    idempotency: Arc<dyn IdempotencyStore>,
    /// Task domain, guarded by a mutex.
    tasks: Arc<Mutex<TaskDomain>>,
    /// Loop domain, guarded by a mutex.
    loops: Arc<Mutex<LoopDomain>>,
    /// Planning domain, guarded by a mutex.
    planning: Arc<Mutex<PlanningDomain>>,
    /// Collection domain, guarded by a mutex.
    collections: Arc<Mutex<CollectionDomain>>,
    /// Stream domain handle; handed out by value (cloned) to callers.
    streams: StreamDomain,
    /// Config domain; handed out by shared reference.
    config_domain: ConfigDomain,
    /// Preset domain; handed out by shared reference.
    preset_domain: PresetDomain,
}
50
51impl RpcRuntime {
52 pub fn new(config: ApiConfig) -> anyhow::Result<Self> {
53 config.validate()?;
54
55 let auth = from_config(&config)?;
56 let idempotency = Arc::new(InMemoryIdempotencyStore::new(Duration::from_secs(
57 config.idempotency_ttl_secs,
58 )));
59
60 Ok(Self::with_components(config, auth, idempotency))
61 }
62
63 pub fn with_components(
64 config: ApiConfig,
65 auth: Arc<dyn Authenticator>,
66 idempotency: Arc<dyn IdempotencyStore>,
67 ) -> Self {
68 let tasks = Arc::new(Mutex::new(TaskDomain::new(&config.workspace_root)));
69 let loops = Arc::new(Mutex::new(LoopDomain::new(
70 &config.workspace_root,
71 config.loop_process_interval_ms,
72 config.ralph_command.clone(),
73 )));
74 let planning = Arc::new(Mutex::new(PlanningDomain::new(&config.workspace_root)));
75 let collections = Arc::new(Mutex::new(CollectionDomain::new(&config.workspace_root)));
76 let streams = StreamDomain::new();
77 let config_domain = ConfigDomain::new(&config.workspace_root);
78 let preset_domain = PresetDomain::new(&config.workspace_root);
79
80 Self {
81 config,
82 auth,
83 idempotency,
84 tasks,
85 loops,
86 planning,
87 collections,
88 streams,
89 config_domain,
90 preset_domain,
91 }
92 }
93
94 pub fn health_payload(&self) -> Value {
95 json!({
96 "status": "ok",
97 "timestamp": crate::loop_support::now_ts()
98 })
99 }
100
101 pub fn capabilities_payload(&self) -> Value {
102 json!({
103 "methods": KNOWN_METHODS,
104 "streamTopics": STREAM_TOPICS,
105 "auth": {
106 "mode": self.auth.mode().as_contract_mode(),
107 "supportedModes": ["trusted_local", "token"]
108 },
109 "idempotency": {
110 "requiredForMutations": true,
111 "retentionSeconds": self.config.idempotency_ttl_secs
112 }
113 })
114 }
115
116 pub fn invoke_method(
117 &self,
118 request_id: impl Into<String>,
119 method: &str,
120 params: Value,
121 principal: &str,
122 idempotency_key: Option<String>,
123 ) -> Result<Value, ApiError> {
124 let request_id = request_id.into();
125 let mut raw = json!({
126 "apiVersion": API_VERSION,
127 "id": request_id,
128 "method": method,
129 "params": params,
130 });
131
132 if let Some(idempotency_key) = idempotency_key {
133 raw["meta"] = json!({
134 "idempotencyKey": idempotency_key,
135 });
136 }
137
138 let request = self.parse_and_validate_request_value(raw)?;
139 self.execute_request(&request, principal)
140 }
141
142 pub fn handle_http_request(&self, body: &[u8], headers: &HeaderMap) -> (StatusCode, Value) {
143 let request = match self.parse_and_validate_request(body) {
144 Ok(request) => request,
145 Err(error) => {
146 let status = error.status;
147 let envelope = error_envelope(&error, &self.config.served_by);
148 return (status, envelope);
149 }
150 };
151
152 let principal =
153 match self.auth.authorize(&request, headers).map_err(|error| {
154 error.with_context(request.id.clone(), Some(request.method.clone()))
155 }) {
156 Ok(p) => p,
157 Err(error) => {
158 let status = error.status;
159 let envelope = error_envelope(&error, &self.config.served_by);
160 return (status, envelope);
161 }
162 };
163
164 let (status, envelope) = match self.execute_request(&request, &principal) {
165 Ok(result) => (
166 StatusCode::OK,
167 success_envelope(&request, result, &self.config.served_by),
168 ),
169 Err(error) => {
170 let status = error.status;
171 let envelope = error_envelope(&error, &self.config.served_by);
172 (status, envelope)
173 }
174 };
175
176 (status, envelope)
177 }
178
179 pub fn authenticate_websocket(&self, headers: &HeaderMap) -> Result<String, ApiError> {
180 let dummy_request = crate::protocol::RpcRequestEnvelope {
181 api_version: "v1".to_string(),
182 id: "ws-upgrade".to_string(),
183 method: "stream.subscribe".to_string(),
184 params: serde_json::Value::Object(serde_json::Map::new()),
185 meta: None,
186 };
187
188 self.auth
189 .authorize(&dummy_request, headers)
190 .map_err(|error| error.with_context("ws-upgrade", Some("stream.subscribe".to_string())))
191 }
192
193 pub(crate) fn task_domain_mut(&self) -> Result<MutexGuard<'_, TaskDomain>, ApiError> {
194 self.tasks
195 .lock()
196 .map_err(|_| ApiError::internal("task domain lock poisoned"))
197 }
198
199 pub(crate) fn loop_domain_mut(&self) -> Result<MutexGuard<'_, LoopDomain>, ApiError> {
200 self.loops
201 .lock()
202 .map_err(|_| ApiError::internal("loop domain lock poisoned"))
203 }
204
205 pub(crate) fn planning_domain_mut(&self) -> Result<MutexGuard<'_, PlanningDomain>, ApiError> {
206 self.planning
207 .lock()
208 .map_err(|_| ApiError::internal("planning domain lock poisoned"))
209 }
210
211 pub(crate) fn collection_domain_mut(
212 &self,
213 ) -> Result<MutexGuard<'_, CollectionDomain>, ApiError> {
214 self.collections
215 .lock()
216 .map_err(|_| ApiError::internal("collection domain lock poisoned"))
217 }
218
219 pub(crate) fn stream_domain(&self) -> StreamDomain {
220 self.streams.clone()
221 }
222
223 pub(crate) fn config_domain(&self) -> &ConfigDomain {
224 &self.config_domain
225 }
226
227 pub(crate) fn preset_domain(&self) -> &PresetDomain {
228 &self.preset_domain
229 }
230
231 pub(crate) fn parse_params<T>(&self, request: &RpcRequestEnvelope) -> Result<T, ApiError>
232 where
233 T: DeserializeOwned,
234 {
235 serde_json::from_value(request.params.clone()).map_err(|error| {
236 ApiError::invalid_params(format!(
237 "invalid params for method '{}': {error}",
238 request.method
239 ))
240 })
241 }
242
243 fn parse_and_validate_request(&self, body: &[u8]) -> Result<RpcRequestEnvelope, ApiError> {
244 let raw = parse_json_value(body)?;
245 self.parse_and_validate_request_value(raw)
246 }
247
248 fn parse_and_validate_request_value(&self, raw: Value) -> Result<RpcRequestEnvelope, ApiError> {
249 let (request_id, method) = request_context(&raw);
250
251 if !raw.is_object() {
252 return Err(
253 ApiError::invalid_request("request body must be a JSON object")
254 .with_context(request_id, method),
255 );
256 }
257
258 let method = method.ok_or_else(|| {
259 ApiError::invalid_request("missing required field 'method'")
260 .with_context(request_id.clone(), None)
261 })?;
262
263 if !is_known_method(&method) {
264 return Err(
265 ApiError::method_not_found(method.clone()).with_context(request_id, Some(method))
266 );
267 }
268
269 if let Err(errors) = validate_request_schema(&raw) {
270 return Err(
271 ApiError::invalid_params("request does not match rpc-v1 schema")
272 .with_context(request_id, Some(method))
273 .with_details(json!({ "errors": errors })),
274 );
275 }
276
277 let request = parse_request(&raw)
278 .map_err(|error| error.with_context(request_id.clone(), Some(method.clone())))?;
279
280 if request.api_version != API_VERSION {
281 return Err(ApiError::invalid_request(format!(
282 "unsupported apiVersion '{}'; expected '{API_VERSION}'",
283 request.api_version
284 ))
285 .with_context(request.id, Some(request.method)));
286 }
287
288 Ok(request)
289 }
290
291 fn execute_request(
292 &self,
293 request: &RpcRequestEnvelope,
294 principal: &str,
295 ) -> Result<Value, ApiError> {
296 let mut idempotency_context: Option<String> = None;
297 if is_mutating_method(&request.method) {
298 let key = match request
299 .meta
300 .as_ref()
301 .and_then(|meta| meta.idempotency_key.as_deref())
302 {
303 Some(key) => key,
304 None => {
305 return Err(ApiError::invalid_params(
306 "mutating methods require meta.idempotencyKey",
307 )
308 .with_context(request.id.clone(), Some(request.method.clone())));
309 }
310 };
311
312 match self
313 .idempotency
314 .check(&request.method, key, &request.params)
315 {
316 IdempotencyCheck::Replay(response) => {
317 debug!(
318 method = %request.method,
319 request_id = %request.id,
320 "idempotency replay"
321 );
322 return self.replay_stored_response(request, response);
323 }
324 IdempotencyCheck::Conflict => {
325 return Err(ApiError::idempotency_conflict(
326 "idempotency key was already used with different parameters",
327 )
328 .with_context(request.id.clone(), Some(request.method.clone()))
329 .with_details(json!({
330 "method": request.method.clone(),
331 "idempotencyKey": key
332 })));
333 }
334 IdempotencyCheck::New => {
335 idempotency_context = Some(key.to_string());
336 }
337 }
338 }
339
340 let result = self
341 .dispatch(request, principal)
342 .map_err(|error| error.with_context(request.id.clone(), Some(request.method.clone())));
343
344 if let Some(key) = idempotency_context {
345 let (status, envelope) = match &result {
346 Ok(value) => (
347 StatusCode::OK.as_u16(),
348 success_envelope(request, value.clone(), &self.config.served_by),
349 ),
350 Err(error) => (
351 error.status.as_u16(),
352 error_envelope(error, &self.config.served_by),
353 ),
354 };
355 self.idempotency.store(
356 &request.method,
357 &key,
358 &request.params,
359 &StoredResponse { status, envelope },
360 );
361 }
362
363 result
364 }
365
366 fn replay_stored_response(
367 &self,
368 request: &RpcRequestEnvelope,
369 response: StoredResponse,
370 ) -> Result<Value, ApiError> {
371 if let Some(result) = response.envelope.get("result") {
372 return Ok(result.clone());
373 }
374
375 let error_body = response
376 .envelope
377 .get("error")
378 .and_then(Value::as_object)
379 .ok_or_else(|| {
380 ApiError::internal("stored idempotency response was missing an error payload")
381 .with_context(request.id.clone(), Some(request.method.clone()))
382 })?;
383
384 let code = error_body
385 .get("code")
386 .and_then(Value::as_str)
387 .and_then(RpcErrorCode::from_contract)
388 .unwrap_or(RpcErrorCode::Internal);
389 let message = error_body
390 .get("message")
391 .and_then(Value::as_str)
392 .unwrap_or("stored idempotency replay failed");
393 let retryable = error_body
394 .get("retryable")
395 .and_then(Value::as_bool)
396 .unwrap_or(false);
397
398 let mut error = ApiError::new(code, message)
399 .with_context(request.id.clone(), Some(request.method.clone()));
400 error.retryable = retryable;
401 error.details = error_body.get("details").cloned();
402 error.status = StatusCode::from_u16(response.status).unwrap_or(error.status);
403 Err(error)
404 }
405}