1mod dispatch;
2
3use std::sync::{Arc, Mutex, MutexGuard};
4use std::time::Duration;
5
6use axum::http::{HeaderMap, StatusCode};
7use serde::Deserialize;
8use serde::de::DeserializeOwned;
9use serde_json::{Value, json};
10use tracing::debug;
11
12use crate::auth::{Authenticator, from_config};
13use crate::collection_domain::CollectionDomain;
14use crate::config::ApiConfig;
15use crate::config_domain::ConfigDomain;
16use crate::errors::{ApiError, RpcErrorCode};
17use crate::idempotency::{
18 IdempotencyCheck, IdempotencyStore, InMemoryIdempotencyStore, StoredResponse,
19};
20use crate::loop_domain::LoopDomain;
21use crate::planning_domain::PlanningDomain;
22use crate::preset_domain::PresetDomain;
23use crate::protocol::{
24 API_VERSION, KNOWN_METHODS, RpcRequestEnvelope, STREAM_TOPICS, error_envelope, is_known_method,
25 is_mutating_method, parse_json_value, parse_request, request_context, success_envelope,
26 validate_request_schema,
27};
28use crate::stream_domain::StreamDomain;
29use crate::task_domain::TaskDomain;
30
31#[derive(Debug, Clone, Deserialize)]
32#[serde(rename_all = "camelCase")]
33pub(crate) struct IdOnlyParams {
34 pub(crate) id: String,
35}
36
37#[derive(Clone)]
38pub struct RpcRuntime {
39 pub(crate) config: ApiConfig,
40 auth: Arc<dyn Authenticator>,
41 idempotency: Arc<dyn IdempotencyStore>,
42 tasks: Arc<Mutex<TaskDomain>>,
43 loops: Arc<Mutex<LoopDomain>>,
44 planning: Arc<Mutex<PlanningDomain>>,
45 collections: Arc<Mutex<CollectionDomain>>,
46 streams: StreamDomain,
47 config_domain: ConfigDomain,
48 preset_domain: PresetDomain,
49}
50
51enum ExecutionOutcome {
52 Fresh(Value),
53 Replay(StoredResponse),
54}
55
56impl RpcRuntime {
57 pub fn new(config: ApiConfig) -> anyhow::Result<Self> {
58 config.validate()?;
59
60 let auth = from_config(&config)?;
61 let idempotency = Arc::new(InMemoryIdempotencyStore::new(Duration::from_secs(
62 config.idempotency_ttl_secs,
63 )));
64
65 Ok(Self::with_components(config, auth, idempotency))
66 }
67
68 pub fn with_components(
69 config: ApiConfig,
70 auth: Arc<dyn Authenticator>,
71 idempotency: Arc<dyn IdempotencyStore>,
72 ) -> Self {
73 let tasks = Arc::new(Mutex::new(TaskDomain::new(&config.workspace_root)));
74 let loops = Arc::new(Mutex::new(LoopDomain::new(
75 &config.workspace_root,
76 config.loop_process_interval_ms,
77 config.ralph_command.clone(),
78 )));
79 let planning = Arc::new(Mutex::new(PlanningDomain::new(&config.workspace_root)));
80 let collections = Arc::new(Mutex::new(CollectionDomain::new(&config.workspace_root)));
81 let streams = StreamDomain::new();
82 let config_domain = ConfigDomain::new(&config.workspace_root);
83 let preset_domain = PresetDomain::new(&config.workspace_root);
84
85 Self {
86 config,
87 auth,
88 idempotency,
89 tasks,
90 loops,
91 planning,
92 collections,
93 streams,
94 config_domain,
95 preset_domain,
96 }
97 }
98
99 pub fn health_payload(&self) -> Value {
100 json!({
101 "status": "ok",
102 "timestamp": crate::loop_support::now_ts()
103 })
104 }
105
106 pub fn capabilities_payload(&self) -> Value {
107 json!({
108 "methods": KNOWN_METHODS,
109 "streamTopics": STREAM_TOPICS,
110 "auth": {
111 "mode": self.auth.mode().as_contract_mode(),
112 "supportedModes": ["trusted_local", "token"]
113 },
114 "idempotency": {
115 "requiredForMutations": true,
116 "retentionSeconds": self.config.idempotency_ttl_secs
117 }
118 })
119 }
120
121 pub fn invoke_method(
122 &self,
123 request_id: impl Into<String>,
124 method: &str,
125 params: Value,
126 principal: &str,
127 idempotency_key: Option<String>,
128 ) -> Result<Value, ApiError> {
129 let request_id = request_id.into();
130 let mut raw = json!({
131 "apiVersion": API_VERSION,
132 "id": request_id,
133 "method": method,
134 "params": params,
135 });
136
137 if let Some(idempotency_key) = idempotency_key {
138 raw["meta"] = json!({
139 "idempotencyKey": idempotency_key,
140 });
141 }
142
143 let request = self.parse_and_validate_request_value(raw)?;
144 match self.execute_request(&request, principal)? {
145 ExecutionOutcome::Fresh(result) => Ok(result),
146 ExecutionOutcome::Replay(response) => self.replay_stored_response(&request, response),
147 }
148 }
149
150 pub fn handle_http_request(&self, body: &[u8], headers: &HeaderMap) -> (StatusCode, Value) {
151 let request = match self.parse_and_validate_request(body) {
152 Ok(request) => request,
153 Err(error) => {
154 let status = error.status;
155 let envelope = error_envelope(&error, &self.config.served_by);
156 return (status, envelope);
157 }
158 };
159
160 let principal =
161 match self.auth.authorize(&request, headers).map_err(|error| {
162 error.with_context(request.id.clone(), Some(request.method.clone()))
163 }) {
164 Ok(p) => p,
165 Err(error) => {
166 let status = error.status;
167 let envelope = error_envelope(&error, &self.config.served_by);
168 return (status, envelope);
169 }
170 };
171
172 let (status, envelope) = match self.execute_request(&request, &principal) {
173 Ok(ExecutionOutcome::Fresh(result)) => (
174 StatusCode::OK,
175 success_envelope(&request, result, &self.config.served_by),
176 ),
177 Ok(ExecutionOutcome::Replay(response)) => (
178 StatusCode::from_u16(response.status).unwrap_or(StatusCode::OK),
179 response.envelope,
180 ),
181 Err(error) => {
182 let status = error.status;
183 let envelope = error_envelope(&error, &self.config.served_by);
184 (status, envelope)
185 }
186 };
187
188 (status, envelope)
189 }
190
191 pub fn authenticate_websocket(&self, headers: &HeaderMap) -> Result<String, ApiError> {
192 let dummy_request = crate::protocol::RpcRequestEnvelope {
193 api_version: "v1".to_string(),
194 id: "ws-upgrade".to_string(),
195 method: "stream.subscribe".to_string(),
196 params: serde_json::Value::Object(serde_json::Map::new()),
197 meta: None,
198 };
199
200 self.auth
201 .authorize(&dummy_request, headers)
202 .map_err(|error| error.with_context("ws-upgrade", Some("stream.subscribe".to_string())))
203 }
204
205 pub(crate) fn task_domain_mut(&self) -> Result<MutexGuard<'_, TaskDomain>, ApiError> {
206 self.tasks
207 .lock()
208 .map_err(|_| ApiError::internal("task domain lock poisoned"))
209 }
210
211 pub(crate) fn loop_domain_mut(&self) -> Result<MutexGuard<'_, LoopDomain>, ApiError> {
212 self.loops
213 .lock()
214 .map_err(|_| ApiError::internal("loop domain lock poisoned"))
215 }
216
217 pub(crate) fn planning_domain_mut(&self) -> Result<MutexGuard<'_, PlanningDomain>, ApiError> {
218 self.planning
219 .lock()
220 .map_err(|_| ApiError::internal("planning domain lock poisoned"))
221 }
222
223 pub(crate) fn collection_domain_mut(
224 &self,
225 ) -> Result<MutexGuard<'_, CollectionDomain>, ApiError> {
226 self.collections
227 .lock()
228 .map_err(|_| ApiError::internal("collection domain lock poisoned"))
229 }
230
231 pub(crate) fn stream_domain(&self) -> StreamDomain {
232 self.streams.clone()
233 }
234
235 pub(crate) fn config_domain(&self) -> &ConfigDomain {
236 &self.config_domain
237 }
238
239 pub(crate) fn preset_domain(&self) -> &PresetDomain {
240 &self.preset_domain
241 }
242
243 pub(crate) fn parse_params<T>(&self, request: &RpcRequestEnvelope) -> Result<T, ApiError>
244 where
245 T: DeserializeOwned,
246 {
247 serde_json::from_value(request.params.clone()).map_err(|error| {
248 ApiError::invalid_params(format!(
249 "invalid params for method '{}': {error}",
250 request.method
251 ))
252 })
253 }
254
255 fn parse_and_validate_request(&self, body: &[u8]) -> Result<RpcRequestEnvelope, ApiError> {
256 let raw = parse_json_value(body)?;
257 self.parse_and_validate_request_value(raw)
258 }
259
260 fn parse_and_validate_request_value(&self, raw: Value) -> Result<RpcRequestEnvelope, ApiError> {
261 let (request_id, method) = request_context(&raw);
262
263 if !raw.is_object() {
264 return Err(
265 ApiError::invalid_request("request body must be a JSON object")
266 .with_context(request_id, method),
267 );
268 }
269
270 let method = method.ok_or_else(|| {
271 ApiError::invalid_request("missing required field 'method'")
272 .with_context(request_id.clone(), None)
273 })?;
274
275 if !is_known_method(&method) {
276 return Err(
277 ApiError::method_not_found(method.clone()).with_context(request_id, Some(method))
278 );
279 }
280
281 if let Err(errors) = validate_request_schema(&raw) {
282 return Err(
283 ApiError::invalid_params("request does not match rpc-v1 schema")
284 .with_context(request_id, Some(method))
285 .with_details(json!({ "errors": errors })),
286 );
287 }
288
289 let request = parse_request(&raw)
290 .map_err(|error| error.with_context(request_id.clone(), Some(method.clone())))?;
291
292 if request.api_version != API_VERSION {
293 return Err(ApiError::invalid_request(format!(
294 "unsupported apiVersion '{}'; expected '{API_VERSION}'",
295 request.api_version
296 ))
297 .with_context(request.id, Some(request.method)));
298 }
299
300 Ok(request)
301 }
302
303 fn execute_request(
304 &self,
305 request: &RpcRequestEnvelope,
306 principal: &str,
307 ) -> Result<ExecutionOutcome, ApiError> {
308 let mut idempotency_context: Option<String> = None;
309 if is_mutating_method(&request.method) {
310 let key = match request
311 .meta
312 .as_ref()
313 .and_then(|meta| meta.idempotency_key.as_deref())
314 {
315 Some(key) => key,
316 None => {
317 return Err(ApiError::invalid_params(
318 "mutating methods require meta.idempotencyKey",
319 )
320 .with_context(request.id.clone(), Some(request.method.clone())));
321 }
322 };
323
324 match self
325 .idempotency
326 .check(&request.method, key, &request.params)
327 {
328 IdempotencyCheck::Replay(response) => {
329 debug!(
330 method = %request.method,
331 request_id = %request.id,
332 "idempotency replay"
333 );
334 return Ok(ExecutionOutcome::Replay(response));
335 }
336 IdempotencyCheck::Conflict => {
337 return Err(ApiError::idempotency_conflict(
338 "idempotency key was already used with different parameters",
339 )
340 .with_context(request.id.clone(), Some(request.method.clone()))
341 .with_details(json!({
342 "method": request.method.clone(),
343 "idempotencyKey": key
344 })));
345 }
346 IdempotencyCheck::New => {
347 idempotency_context = Some(key.to_string());
348 }
349 }
350 }
351
352 let result = self
353 .dispatch(request, principal)
354 .map_err(|error| error.with_context(request.id.clone(), Some(request.method.clone())));
355
356 if let Some(key) = idempotency_context {
357 let (status, envelope) = match &result {
358 Ok(value) => (
359 StatusCode::OK.as_u16(),
360 success_envelope(request, value.clone(), &self.config.served_by),
361 ),
362 Err(error) => (
363 error.status.as_u16(),
364 error_envelope(error, &self.config.served_by),
365 ),
366 };
367 self.idempotency.store(
368 &request.method,
369 &key,
370 &request.params,
371 &StoredResponse { status, envelope },
372 );
373 }
374
375 result.map(ExecutionOutcome::Fresh)
376 }
377
378 fn replay_stored_response(
379 &self,
380 request: &RpcRequestEnvelope,
381 response: StoredResponse,
382 ) -> Result<Value, ApiError> {
383 if let Some(result) = response.envelope.get("result") {
384 return Ok(result.clone());
385 }
386
387 let error_body = response
388 .envelope
389 .get("error")
390 .and_then(Value::as_object)
391 .ok_or_else(|| {
392 ApiError::internal("stored idempotency response was missing an error payload")
393 .with_context(request.id.clone(), Some(request.method.clone()))
394 })?;
395
396 let code = error_body
397 .get("code")
398 .and_then(Value::as_str)
399 .and_then(RpcErrorCode::from_contract)
400 .unwrap_or(RpcErrorCode::Internal);
401 let message = error_body
402 .get("message")
403 .and_then(Value::as_str)
404 .unwrap_or("stored idempotency replay failed");
405 let retryable = error_body
406 .get("retryable")
407 .and_then(Value::as_bool)
408 .unwrap_or(false);
409
410 let mut error = ApiError::new(code, message)
411 .with_context(request.id.clone(), Some(request.method.clone()));
412 error.retryable = retryable;
413 error.details = error_body.get("details").cloned();
414 error.status = StatusCode::from_u16(response.status).unwrap_or(error.status);
415 Err(error)
416 }
417}