reifydb_sub_server_http/
handlers.rs1use axum::{
12 Json,
13 extract::State,
14 http::{HeaderMap, StatusCode},
15 response::IntoResponse,
16};
17use reifydb_sub_server::{
18 auth::{AuthError, extract_identity_from_api_key, extract_identity_from_auth_header},
19 execute::{execute_admin, execute_command, execute_query},
20 response::{ResponseFrame, convert_frames},
21 state::AppState,
22 wire::WireParams,
23};
24use reifydb_type::{params::Params, value::identity::IdentityId};
25use serde::{Deserialize, Serialize};
26
27use crate::error::AppError;
28
29#[derive(Debug, Deserialize)]
31pub struct StatementRequest {
32 pub statements: Vec<String>,
34 #[serde(default)]
36 pub params: Option<WireParams>,
37}
38
39#[derive(Debug, Serialize)]
41pub struct QueryResponse {
42 pub frames: Vec<ResponseFrame>,
44}
45
46#[derive(Debug, Serialize)]
48pub struct HealthResponse {
49 pub status: &'static str,
50}
51
52pub async fn health() -> impl IntoResponse {
63 (
64 StatusCode::OK,
65 Json(HealthResponse {
66 status: "ok",
67 }),
68 )
69}
70
71pub async fn handle_query(
96 State(state): State<AppState>,
97 headers: HeaderMap,
98 Json(request): Json<StatementRequest>,
99) -> Result<Json<QueryResponse>, AppError> {
100 let identity = extract_identity(&headers)?;
102
103 let query = request.statements.join("; ");
105
106 let params = match request.params {
108 None => Params::None,
109 Some(wp) => wp.into_params().map_err(|e| AppError::InvalidParams(e))?,
110 };
111
112 let frames = execute_query(
114 state.actor_system(),
115 state.engine_clone(),
116 query,
117 identity,
118 params,
119 state.query_timeout(),
120 )
121 .await?;
122
123 Ok(Json(QueryResponse {
124 frames: convert_frames(frames),
125 }))
126}
127
128pub async fn handle_admin(
139 State(state): State<AppState>,
140 headers: HeaderMap,
141 Json(request): Json<StatementRequest>,
142) -> Result<Json<QueryResponse>, AppError> {
143 let identity = extract_identity(&headers)?;
145
146 let params = match request.params {
148 None => Params::None,
149 Some(wp) => wp.into_params().map_err(|e| AppError::InvalidParams(e))?,
150 };
151
152 let frames = execute_admin(
154 state.actor_system(),
155 state.engine_clone(),
156 request.statements,
157 identity,
158 params,
159 state.query_timeout(),
160 )
161 .await?;
162
163 Ok(Json(QueryResponse {
164 frames: convert_frames(frames),
165 }))
166}
167
168pub async fn handle_command(
195 State(state): State<AppState>,
196 headers: HeaderMap,
197 Json(request): Json<StatementRequest>,
198) -> Result<Json<QueryResponse>, AppError> {
199 let identity = extract_identity(&headers)?;
201
202 let params = match request.params {
204 None => Params::None,
205 Some(wp) => wp.into_params().map_err(|e| AppError::InvalidParams(e))?,
206 };
207
208 let frames = execute_command(
210 state.actor_system(),
211 state.engine_clone(),
212 request.statements,
213 identity,
214 params,
215 state.query_timeout(),
216 )
217 .await?;
218
219 Ok(Json(QueryResponse {
220 frames: convert_frames(frames),
221 }))
222}
223
224fn extract_identity(headers: &HeaderMap) -> Result<IdentityId, AppError> {
230 if let Some(auth_header) = headers.get("authorization") {
232 let auth_str = auth_header.to_str().map_err(|_| AppError::Auth(AuthError::InvalidHeader))?;
233
234 return extract_identity_from_auth_header(auth_str).map_err(AppError::Auth);
235 }
236
237 if let Some(api_key) = headers.get("x-api-key") {
239 let key = api_key.to_str().map_err(|_| AppError::Auth(AuthError::InvalidHeader))?;
240
241 return extract_identity_from_api_key(key).map_err(AppError::Auth);
242 }
243
244 Err(AppError::Auth(AuthError::MissingCredentials))
246}
247
248#[cfg(test)]
249pub mod tests {
250 use serde_json::{from_str, to_string};
251
252 use super::*;
253
254 #[test]
255 fn test_statement_request_deserialization() {
256 let json = r#"{"statements": ["SELECT 1"]}"#;
257 let request: StatementRequest = from_str(json).unwrap();
258 assert_eq!(request.statements, vec!["SELECT 1"]);
259 assert!(request.params.is_none());
260 }
261
262 #[test]
263 fn test_query_response_serialization() {
264 let response = QueryResponse {
265 frames: Vec::new(),
266 };
267 let json = to_string(&response).unwrap();
268 assert!(json.contains("frames"));
269 }
270
271 #[test]
272 fn test_health_response_serialization() {
273 let response = HealthResponse {
274 status: "ok",
275 };
276 let json = to_string(&response).unwrap();
277 assert_eq!(json, r#"{"status":"ok"}"#);
278 }
279}