Skip to main content

ralph_api/
runtime.rs

1mod dispatch;
2
3use std::sync::{Arc, Mutex, MutexGuard};
4use std::time::Duration;
5
6use axum::http::{HeaderMap, StatusCode};
7use serde::Deserialize;
8use serde::de::DeserializeOwned;
9use serde_json::{Value, json};
10use tracing::debug;
11
12use crate::auth::{Authenticator, from_config};
13use crate::collection_domain::CollectionDomain;
14use crate::config::ApiConfig;
15use crate::config_domain::ConfigDomain;
16use crate::errors::{ApiError, RpcErrorCode};
17use crate::idempotency::{
18    IdempotencyCheck, IdempotencyStore, InMemoryIdempotencyStore, StoredResponse,
19};
20use crate::loop_domain::LoopDomain;
21use crate::planning_domain::PlanningDomain;
22use crate::preset_domain::PresetDomain;
23use crate::protocol::{
24    API_VERSION, KNOWN_METHODS, RpcRequestEnvelope, STREAM_TOPICS, error_envelope, is_known_method,
25    is_mutating_method, parse_json_value, parse_request, request_context, success_envelope,
26    validate_request_schema,
27};
28use crate::stream_domain::StreamDomain;
29use crate::task_domain::TaskDomain;
30
31#[derive(Debug, Clone, Deserialize)]
32#[serde(rename_all = "camelCase")]
33pub(crate) struct IdOnlyParams {
34    pub(crate) id: String,
35}
36
37#[derive(Clone)]
38pub struct RpcRuntime {
39    pub(crate) config: ApiConfig,
40    auth: Arc<dyn Authenticator>,
41    idempotency: Arc<dyn IdempotencyStore>,
42    tasks: Arc<Mutex<TaskDomain>>,
43    loops: Arc<Mutex<LoopDomain>>,
44    planning: Arc<Mutex<PlanningDomain>>,
45    collections: Arc<Mutex<CollectionDomain>>,
46    streams: StreamDomain,
47    config_domain: ConfigDomain,
48    preset_domain: PresetDomain,
49}
50
51impl RpcRuntime {
52    pub fn new(config: ApiConfig) -> anyhow::Result<Self> {
53        config.validate()?;
54
55        let auth = from_config(&config)?;
56        let idempotency = Arc::new(InMemoryIdempotencyStore::new(Duration::from_secs(
57            config.idempotency_ttl_secs,
58        )));
59
60        Ok(Self::with_components(config, auth, idempotency))
61    }
62
63    pub fn with_components(
64        config: ApiConfig,
65        auth: Arc<dyn Authenticator>,
66        idempotency: Arc<dyn IdempotencyStore>,
67    ) -> Self {
68        let tasks = Arc::new(Mutex::new(TaskDomain::new(&config.workspace_root)));
69        let loops = Arc::new(Mutex::new(LoopDomain::new(
70            &config.workspace_root,
71            config.loop_process_interval_ms,
72            config.ralph_command.clone(),
73        )));
74        let planning = Arc::new(Mutex::new(PlanningDomain::new(&config.workspace_root)));
75        let collections = Arc::new(Mutex::new(CollectionDomain::new(&config.workspace_root)));
76        let streams = StreamDomain::new();
77        let config_domain = ConfigDomain::new(&config.workspace_root);
78        let preset_domain = PresetDomain::new(&config.workspace_root);
79
80        Self {
81            config,
82            auth,
83            idempotency,
84            tasks,
85            loops,
86            planning,
87            collections,
88            streams,
89            config_domain,
90            preset_domain,
91        }
92    }
93
94    pub fn health_payload(&self) -> Value {
95        json!({
96            "status": "ok",
97            "timestamp": crate::loop_support::now_ts()
98        })
99    }
100
101    pub fn capabilities_payload(&self) -> Value {
102        json!({
103            "methods": KNOWN_METHODS,
104            "streamTopics": STREAM_TOPICS,
105            "auth": {
106                "mode": self.auth.mode().as_contract_mode(),
107                "supportedModes": ["trusted_local", "token"]
108            },
109            "idempotency": {
110                "requiredForMutations": true,
111                "retentionSeconds": self.config.idempotency_ttl_secs
112            }
113        })
114    }
115
116    pub fn invoke_method(
117        &self,
118        request_id: impl Into<String>,
119        method: &str,
120        params: Value,
121        principal: &str,
122        idempotency_key: Option<String>,
123    ) -> Result<Value, ApiError> {
124        let request_id = request_id.into();
125        let mut raw = json!({
126            "apiVersion": API_VERSION,
127            "id": request_id,
128            "method": method,
129            "params": params,
130        });
131
132        if let Some(idempotency_key) = idempotency_key {
133            raw["meta"] = json!({
134                "idempotencyKey": idempotency_key,
135            });
136        }
137
138        let request = self.parse_and_validate_request_value(raw)?;
139        self.execute_request(&request, principal)
140    }
141
142    pub fn handle_http_request(&self, body: &[u8], headers: &HeaderMap) -> (StatusCode, Value) {
143        let request = match self.parse_and_validate_request(body) {
144            Ok(request) => request,
145            Err(error) => {
146                let status = error.status;
147                let envelope = error_envelope(&error, &self.config.served_by);
148                return (status, envelope);
149            }
150        };
151
152        let principal =
153            match self.auth.authorize(&request, headers).map_err(|error| {
154                error.with_context(request.id.clone(), Some(request.method.clone()))
155            }) {
156                Ok(p) => p,
157                Err(error) => {
158                    let status = error.status;
159                    let envelope = error_envelope(&error, &self.config.served_by);
160                    return (status, envelope);
161                }
162            };
163
164        let (status, envelope) = match self.execute_request(&request, &principal) {
165            Ok(result) => (
166                StatusCode::OK,
167                success_envelope(&request, result, &self.config.served_by),
168            ),
169            Err(error) => {
170                let status = error.status;
171                let envelope = error_envelope(&error, &self.config.served_by);
172                (status, envelope)
173            }
174        };
175
176        (status, envelope)
177    }
178
179    pub fn authenticate_websocket(&self, headers: &HeaderMap) -> Result<String, ApiError> {
180        let dummy_request = crate::protocol::RpcRequestEnvelope {
181            api_version: "v1".to_string(),
182            id: "ws-upgrade".to_string(),
183            method: "stream.subscribe".to_string(),
184            params: serde_json::Value::Object(serde_json::Map::new()),
185            meta: None,
186        };
187
188        self.auth
189            .authorize(&dummy_request, headers)
190            .map_err(|error| error.with_context("ws-upgrade", Some("stream.subscribe".to_string())))
191    }
192
193    pub(crate) fn task_domain_mut(&self) -> Result<MutexGuard<'_, TaskDomain>, ApiError> {
194        self.tasks
195            .lock()
196            .map_err(|_| ApiError::internal("task domain lock poisoned"))
197    }
198
199    pub(crate) fn loop_domain_mut(&self) -> Result<MutexGuard<'_, LoopDomain>, ApiError> {
200        self.loops
201            .lock()
202            .map_err(|_| ApiError::internal("loop domain lock poisoned"))
203    }
204
205    pub(crate) fn planning_domain_mut(&self) -> Result<MutexGuard<'_, PlanningDomain>, ApiError> {
206        self.planning
207            .lock()
208            .map_err(|_| ApiError::internal("planning domain lock poisoned"))
209    }
210
211    pub(crate) fn collection_domain_mut(
212        &self,
213    ) -> Result<MutexGuard<'_, CollectionDomain>, ApiError> {
214        self.collections
215            .lock()
216            .map_err(|_| ApiError::internal("collection domain lock poisoned"))
217    }
218
219    pub(crate) fn stream_domain(&self) -> StreamDomain {
220        self.streams.clone()
221    }
222
223    pub(crate) fn config_domain(&self) -> &ConfigDomain {
224        &self.config_domain
225    }
226
227    pub(crate) fn preset_domain(&self) -> &PresetDomain {
228        &self.preset_domain
229    }
230
231    pub(crate) fn parse_params<T>(&self, request: &RpcRequestEnvelope) -> Result<T, ApiError>
232    where
233        T: DeserializeOwned,
234    {
235        serde_json::from_value(request.params.clone()).map_err(|error| {
236            ApiError::invalid_params(format!(
237                "invalid params for method '{}': {error}",
238                request.method
239            ))
240        })
241    }
242
243    fn parse_and_validate_request(&self, body: &[u8]) -> Result<RpcRequestEnvelope, ApiError> {
244        let raw = parse_json_value(body)?;
245        self.parse_and_validate_request_value(raw)
246    }
247
248    fn parse_and_validate_request_value(&self, raw: Value) -> Result<RpcRequestEnvelope, ApiError> {
249        let (request_id, method) = request_context(&raw);
250
251        if !raw.is_object() {
252            return Err(
253                ApiError::invalid_request("request body must be a JSON object")
254                    .with_context(request_id, method),
255            );
256        }
257
258        let method = method.ok_or_else(|| {
259            ApiError::invalid_request("missing required field 'method'")
260                .with_context(request_id.clone(), None)
261        })?;
262
263        if !is_known_method(&method) {
264            return Err(
265                ApiError::method_not_found(method.clone()).with_context(request_id, Some(method))
266            );
267        }
268
269        if let Err(errors) = validate_request_schema(&raw) {
270            return Err(
271                ApiError::invalid_params("request does not match rpc-v1 schema")
272                    .with_context(request_id, Some(method))
273                    .with_details(json!({ "errors": errors })),
274            );
275        }
276
277        let request = parse_request(&raw)
278            .map_err(|error| error.with_context(request_id.clone(), Some(method.clone())))?;
279
280        if request.api_version != API_VERSION {
281            return Err(ApiError::invalid_request(format!(
282                "unsupported apiVersion '{}'; expected '{API_VERSION}'",
283                request.api_version
284            ))
285            .with_context(request.id, Some(request.method)));
286        }
287
288        Ok(request)
289    }
290
291    fn execute_request(
292        &self,
293        request: &RpcRequestEnvelope,
294        principal: &str,
295    ) -> Result<Value, ApiError> {
296        let mut idempotency_context: Option<String> = None;
297        if is_mutating_method(&request.method) {
298            let key = match request
299                .meta
300                .as_ref()
301                .and_then(|meta| meta.idempotency_key.as_deref())
302            {
303                Some(key) => key,
304                None => {
305                    return Err(ApiError::invalid_params(
306                        "mutating methods require meta.idempotencyKey",
307                    )
308                    .with_context(request.id.clone(), Some(request.method.clone())));
309                }
310            };
311
312            match self
313                .idempotency
314                .check(&request.method, key, &request.params)
315            {
316                IdempotencyCheck::Replay(response) => {
317                    debug!(
318                        method = %request.method,
319                        request_id = %request.id,
320                        "idempotency replay"
321                    );
322                    return self.replay_stored_response(request, response);
323                }
324                IdempotencyCheck::Conflict => {
325                    return Err(ApiError::idempotency_conflict(
326                        "idempotency key was already used with different parameters",
327                    )
328                    .with_context(request.id.clone(), Some(request.method.clone()))
329                    .with_details(json!({
330                        "method": request.method.clone(),
331                        "idempotencyKey": key
332                    })));
333                }
334                IdempotencyCheck::New => {
335                    idempotency_context = Some(key.to_string());
336                }
337            }
338        }
339
340        let result = self
341            .dispatch(request, principal)
342            .map_err(|error| error.with_context(request.id.clone(), Some(request.method.clone())));
343
344        if let Some(key) = idempotency_context {
345            let (status, envelope) = match &result {
346                Ok(value) => (
347                    StatusCode::OK.as_u16(),
348                    success_envelope(request, value.clone(), &self.config.served_by),
349                ),
350                Err(error) => (
351                    error.status.as_u16(),
352                    error_envelope(error, &self.config.served_by),
353                ),
354            };
355            self.idempotency.store(
356                &request.method,
357                &key,
358                &request.params,
359                &StoredResponse { status, envelope },
360            );
361        }
362
363        result
364    }
365
366    fn replay_stored_response(
367        &self,
368        request: &RpcRequestEnvelope,
369        response: StoredResponse,
370    ) -> Result<Value, ApiError> {
371        if let Some(result) = response.envelope.get("result") {
372            return Ok(result.clone());
373        }
374
375        let error_body = response
376            .envelope
377            .get("error")
378            .and_then(Value::as_object)
379            .ok_or_else(|| {
380                ApiError::internal("stored idempotency response was missing an error payload")
381                    .with_context(request.id.clone(), Some(request.method.clone()))
382            })?;
383
384        let code = error_body
385            .get("code")
386            .and_then(Value::as_str)
387            .and_then(RpcErrorCode::from_contract)
388            .unwrap_or(RpcErrorCode::Internal);
389        let message = error_body
390            .get("message")
391            .and_then(Value::as_str)
392            .unwrap_or("stored idempotency replay failed");
393        let retryable = error_body
394            .get("retryable")
395            .and_then(Value::as_bool)
396            .unwrap_or(false);
397
398        let mut error = ApiError::new(code, message)
399            .with_context(request.id.clone(), Some(request.method.clone()));
400        error.retryable = retryable;
401        error.details = error_body.get("details").cloned();
402        error.status = StatusCode::from_u16(response.status).unwrap_or(error.status);
403        Err(error)
404    }
405}