Skip to main content

ralph_api/
runtime.rs

1mod dispatch;
2
3use std::sync::{Arc, Mutex, MutexGuard};
4use std::time::Duration;
5
6use axum::http::{HeaderMap, StatusCode};
7use serde::Deserialize;
8use serde::de::DeserializeOwned;
9use serde_json::{Value, json};
10use tracing::debug;
11
12use crate::auth::{Authenticator, from_config};
13use crate::collection_domain::CollectionDomain;
14use crate::config::ApiConfig;
15use crate::config_domain::ConfigDomain;
16use crate::errors::{ApiError, RpcErrorCode};
17use crate::idempotency::{
18    IdempotencyCheck, IdempotencyStore, InMemoryIdempotencyStore, StoredResponse,
19};
20use crate::loop_domain::LoopDomain;
21use crate::planning_domain::PlanningDomain;
22use crate::preset_domain::PresetDomain;
23use crate::protocol::{
24    API_VERSION, KNOWN_METHODS, RpcRequestEnvelope, STREAM_TOPICS, error_envelope, is_known_method,
25    is_mutating_method, parse_json_value, parse_request, request_context, success_envelope,
26    validate_request_schema,
27};
28use crate::stream_domain::StreamDomain;
29use crate::task_domain::TaskDomain;
30
31#[derive(Debug, Clone, Deserialize)]
32#[serde(rename_all = "camelCase")]
33pub(crate) struct IdOnlyParams {
34    pub(crate) id: String,
35}
36
37#[derive(Clone)]
38pub struct RpcRuntime {
39    pub(crate) config: ApiConfig,
40    auth: Arc<dyn Authenticator>,
41    idempotency: Arc<dyn IdempotencyStore>,
42    tasks: Arc<Mutex<TaskDomain>>,
43    loops: Arc<Mutex<LoopDomain>>,
44    planning: Arc<Mutex<PlanningDomain>>,
45    collections: Arc<Mutex<CollectionDomain>>,
46    streams: StreamDomain,
47    config_domain: ConfigDomain,
48    preset_domain: PresetDomain,
49}
50
51enum ExecutionOutcome {
52    Fresh(Value),
53    Replay(StoredResponse),
54}
55
56impl RpcRuntime {
57    pub fn new(config: ApiConfig) -> anyhow::Result<Self> {
58        config.validate()?;
59
60        let auth = from_config(&config)?;
61        let idempotency = Arc::new(InMemoryIdempotencyStore::new(Duration::from_secs(
62            config.idempotency_ttl_secs,
63        )));
64
65        Ok(Self::with_components(config, auth, idempotency))
66    }
67
68    pub fn with_components(
69        config: ApiConfig,
70        auth: Arc<dyn Authenticator>,
71        idempotency: Arc<dyn IdempotencyStore>,
72    ) -> Self {
73        let tasks = Arc::new(Mutex::new(TaskDomain::new(&config.workspace_root)));
74        let loops = Arc::new(Mutex::new(LoopDomain::new(
75            &config.workspace_root,
76            config.loop_process_interval_ms,
77            config.ralph_command.clone(),
78        )));
79        let planning = Arc::new(Mutex::new(PlanningDomain::new(&config.workspace_root)));
80        let collections = Arc::new(Mutex::new(CollectionDomain::new(&config.workspace_root)));
81        let streams = StreamDomain::new();
82        let config_domain = ConfigDomain::new(&config.workspace_root);
83        let preset_domain = PresetDomain::new(&config.workspace_root);
84
85        Self {
86            config,
87            auth,
88            idempotency,
89            tasks,
90            loops,
91            planning,
92            collections,
93            streams,
94            config_domain,
95            preset_domain,
96        }
97    }
98
99    pub fn health_payload(&self) -> Value {
100        json!({
101            "status": "ok",
102            "timestamp": crate::loop_support::now_ts()
103        })
104    }
105
106    pub fn capabilities_payload(&self) -> Value {
107        json!({
108            "methods": KNOWN_METHODS,
109            "streamTopics": STREAM_TOPICS,
110            "auth": {
111                "mode": self.auth.mode().as_contract_mode(),
112                "supportedModes": ["trusted_local", "token"]
113            },
114            "idempotency": {
115                "requiredForMutations": true,
116                "retentionSeconds": self.config.idempotency_ttl_secs
117            }
118        })
119    }
120
121    pub fn invoke_method(
122        &self,
123        request_id: impl Into<String>,
124        method: &str,
125        params: Value,
126        principal: &str,
127        idempotency_key: Option<String>,
128    ) -> Result<Value, ApiError> {
129        let request_id = request_id.into();
130        let mut raw = json!({
131            "apiVersion": API_VERSION,
132            "id": request_id,
133            "method": method,
134            "params": params,
135        });
136
137        if let Some(idempotency_key) = idempotency_key {
138            raw["meta"] = json!({
139                "idempotencyKey": idempotency_key,
140            });
141        }
142
143        let request = self.parse_and_validate_request_value(raw)?;
144        match self.execute_request(&request, principal)? {
145            ExecutionOutcome::Fresh(result) => Ok(result),
146            ExecutionOutcome::Replay(response) => self.replay_stored_response(&request, response),
147        }
148    }
149
150    pub fn handle_http_request(&self, body: &[u8], headers: &HeaderMap) -> (StatusCode, Value) {
151        let request = match self.parse_and_validate_request(body) {
152            Ok(request) => request,
153            Err(error) => {
154                let status = error.status;
155                let envelope = error_envelope(&error, &self.config.served_by);
156                return (status, envelope);
157            }
158        };
159
160        let principal =
161            match self.auth.authorize(&request, headers).map_err(|error| {
162                error.with_context(request.id.clone(), Some(request.method.clone()))
163            }) {
164                Ok(p) => p,
165                Err(error) => {
166                    let status = error.status;
167                    let envelope = error_envelope(&error, &self.config.served_by);
168                    return (status, envelope);
169                }
170            };
171
172        let (status, envelope) = match self.execute_request(&request, &principal) {
173            Ok(ExecutionOutcome::Fresh(result)) => (
174                StatusCode::OK,
175                success_envelope(&request, result, &self.config.served_by),
176            ),
177            Ok(ExecutionOutcome::Replay(response)) => (
178                StatusCode::from_u16(response.status).unwrap_or(StatusCode::OK),
179                response.envelope,
180            ),
181            Err(error) => {
182                let status = error.status;
183                let envelope = error_envelope(&error, &self.config.served_by);
184                (status, envelope)
185            }
186        };
187
188        (status, envelope)
189    }
190
191    pub fn authenticate_websocket(&self, headers: &HeaderMap) -> Result<String, ApiError> {
192        let dummy_request = crate::protocol::RpcRequestEnvelope {
193            api_version: "v1".to_string(),
194            id: "ws-upgrade".to_string(),
195            method: "stream.subscribe".to_string(),
196            params: serde_json::Value::Object(serde_json::Map::new()),
197            meta: None,
198        };
199
200        self.auth
201            .authorize(&dummy_request, headers)
202            .map_err(|error| error.with_context("ws-upgrade", Some("stream.subscribe".to_string())))
203    }
204
205    pub(crate) fn task_domain_mut(&self) -> Result<MutexGuard<'_, TaskDomain>, ApiError> {
206        self.tasks
207            .lock()
208            .map_err(|_| ApiError::internal("task domain lock poisoned"))
209    }
210
211    pub(crate) fn loop_domain_mut(&self) -> Result<MutexGuard<'_, LoopDomain>, ApiError> {
212        self.loops
213            .lock()
214            .map_err(|_| ApiError::internal("loop domain lock poisoned"))
215    }
216
217    pub(crate) fn planning_domain_mut(&self) -> Result<MutexGuard<'_, PlanningDomain>, ApiError> {
218        self.planning
219            .lock()
220            .map_err(|_| ApiError::internal("planning domain lock poisoned"))
221    }
222
223    pub(crate) fn collection_domain_mut(
224        &self,
225    ) -> Result<MutexGuard<'_, CollectionDomain>, ApiError> {
226        self.collections
227            .lock()
228            .map_err(|_| ApiError::internal("collection domain lock poisoned"))
229    }
230
231    pub(crate) fn stream_domain(&self) -> StreamDomain {
232        self.streams.clone()
233    }
234
235    pub(crate) fn config_domain(&self) -> &ConfigDomain {
236        &self.config_domain
237    }
238
239    pub(crate) fn preset_domain(&self) -> &PresetDomain {
240        &self.preset_domain
241    }
242
243    pub(crate) fn parse_params<T>(&self, request: &RpcRequestEnvelope) -> Result<T, ApiError>
244    where
245        T: DeserializeOwned,
246    {
247        serde_json::from_value(request.params.clone()).map_err(|error| {
248            ApiError::invalid_params(format!(
249                "invalid params for method '{}': {error}",
250                request.method
251            ))
252        })
253    }
254
255    fn parse_and_validate_request(&self, body: &[u8]) -> Result<RpcRequestEnvelope, ApiError> {
256        let raw = parse_json_value(body)?;
257        self.parse_and_validate_request_value(raw)
258    }
259
260    fn parse_and_validate_request_value(&self, raw: Value) -> Result<RpcRequestEnvelope, ApiError> {
261        let (request_id, method) = request_context(&raw);
262
263        if !raw.is_object() {
264            return Err(
265                ApiError::invalid_request("request body must be a JSON object")
266                    .with_context(request_id, method),
267            );
268        }
269
270        let method = method.ok_or_else(|| {
271            ApiError::invalid_request("missing required field 'method'")
272                .with_context(request_id.clone(), None)
273        })?;
274
275        if !is_known_method(&method) {
276            return Err(
277                ApiError::method_not_found(method.clone()).with_context(request_id, Some(method))
278            );
279        }
280
281        if let Err(errors) = validate_request_schema(&raw) {
282            return Err(
283                ApiError::invalid_params("request does not match rpc-v1 schema")
284                    .with_context(request_id, Some(method))
285                    .with_details(json!({ "errors": errors })),
286            );
287        }
288
289        let request = parse_request(&raw)
290            .map_err(|error| error.with_context(request_id.clone(), Some(method.clone())))?;
291
292        if request.api_version != API_VERSION {
293            return Err(ApiError::invalid_request(format!(
294                "unsupported apiVersion '{}'; expected '{API_VERSION}'",
295                request.api_version
296            ))
297            .with_context(request.id, Some(request.method)));
298        }
299
300        Ok(request)
301    }
302
303    fn execute_request(
304        &self,
305        request: &RpcRequestEnvelope,
306        principal: &str,
307    ) -> Result<ExecutionOutcome, ApiError> {
308        let mut idempotency_context: Option<String> = None;
309        if is_mutating_method(&request.method) {
310            let key = match request
311                .meta
312                .as_ref()
313                .and_then(|meta| meta.idempotency_key.as_deref())
314            {
315                Some(key) => key,
316                None => {
317                    return Err(ApiError::invalid_params(
318                        "mutating methods require meta.idempotencyKey",
319                    )
320                    .with_context(request.id.clone(), Some(request.method.clone())));
321                }
322            };
323
324            match self
325                .idempotency
326                .check(&request.method, key, &request.params)
327            {
328                IdempotencyCheck::Replay(response) => {
329                    debug!(
330                        method = %request.method,
331                        request_id = %request.id,
332                        "idempotency replay"
333                    );
334                    return Ok(ExecutionOutcome::Replay(response));
335                }
336                IdempotencyCheck::Conflict => {
337                    return Err(ApiError::idempotency_conflict(
338                        "idempotency key was already used with different parameters",
339                    )
340                    .with_context(request.id.clone(), Some(request.method.clone()))
341                    .with_details(json!({
342                        "method": request.method.clone(),
343                        "idempotencyKey": key
344                    })));
345                }
346                IdempotencyCheck::New => {
347                    idempotency_context = Some(key.to_string());
348                }
349            }
350        }
351
352        let result = self
353            .dispatch(request, principal)
354            .map_err(|error| error.with_context(request.id.clone(), Some(request.method.clone())));
355
356        if let Some(key) = idempotency_context {
357            let (status, envelope) = match &result {
358                Ok(value) => (
359                    StatusCode::OK.as_u16(),
360                    success_envelope(request, value.clone(), &self.config.served_by),
361                ),
362                Err(error) => (
363                    error.status.as_u16(),
364                    error_envelope(error, &self.config.served_by),
365                ),
366            };
367            self.idempotency.store(
368                &request.method,
369                &key,
370                &request.params,
371                &StoredResponse { status, envelope },
372            );
373        }
374
375        result.map(ExecutionOutcome::Fresh)
376    }
377
378    fn replay_stored_response(
379        &self,
380        request: &RpcRequestEnvelope,
381        response: StoredResponse,
382    ) -> Result<Value, ApiError> {
383        if let Some(result) = response.envelope.get("result") {
384            return Ok(result.clone());
385        }
386
387        let error_body = response
388            .envelope
389            .get("error")
390            .and_then(Value::as_object)
391            .ok_or_else(|| {
392                ApiError::internal("stored idempotency response was missing an error payload")
393                    .with_context(request.id.clone(), Some(request.method.clone()))
394            })?;
395
396        let code = error_body
397            .get("code")
398            .and_then(Value::as_str)
399            .and_then(RpcErrorCode::from_contract)
400            .unwrap_or(RpcErrorCode::Internal);
401        let message = error_body
402            .get("message")
403            .and_then(Value::as_str)
404            .unwrap_or("stored idempotency replay failed");
405        let retryable = error_body
406            .get("retryable")
407            .and_then(Value::as_bool)
408            .unwrap_or(false);
409
410        let mut error = ApiError::new(code, message)
411            .with_context(request.id.clone(), Some(request.method.clone()));
412        error.retryable = retryable;
413        error.details = error_body.get("details").cloned();
414        error.status = StatusCode::from_u16(response.status).unwrap_or(error.status);
415        Err(error)
416    }
417}