Skip to main content

reifydb_sub_server_http/
handlers.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// Copyright (c) 2025 ReifyDB
3
4//! HTTP endpoint handler for query and command execution.
5//!
6//! This module provides the request handler for:
7//! - `/health` - Health check endpoint
8//! - `/v1/query` - Execute read-only queries
9//! - `/v1/command` - Execute write commands
10
11use axum::{
12	Json,
13	extract::State,
14	http::{HeaderMap, StatusCode},
15	response::IntoResponse,
16};
17use reifydb_sub_server::{
18	auth::{AuthError, extract_identity_from_api_key, extract_identity_from_auth_header},
19	execute::{execute_admin, execute_command, execute_query},
20	response::{ResponseFrame, convert_frames},
21	state::AppState,
22	wire::WireParams,
23};
24use reifydb_type::{params::Params, value::identity::IdentityId};
25use serde::{Deserialize, Serialize};
26
27use crate::error::AppError;
28
29/// Request body for query and command endpoints.
30#[derive(Debug, Deserialize)]
31pub struct StatementRequest {
32	/// One or more RQL statements to execute.
33	pub statements: Vec<String>,
34	/// Optional query parameters.
35	#[serde(default)]
36	pub params: Option<WireParams>,
37}
38
39/// Response body for query and command endpoints.
40#[derive(Debug, Serialize)]
41pub struct QueryResponse {
42	/// Result frames from query execution.
43	pub frames: Vec<ResponseFrame>,
44}
45
46/// Health check response.
47#[derive(Debug, Serialize)]
48pub struct HealthResponse {
49	pub status: &'static str,
50}
51
52/// Health check endpoint.
53///
54/// Returns 200 OK if the server is running.
55/// This endpoint does not require authentication.
56///
57/// # Response
58///
59/// ```json
60/// {"status": "ok"}
61/// ```
62pub async fn health() -> impl IntoResponse {
63	(
64		StatusCode::OK,
65		Json(HealthResponse {
66			status: "ok",
67		}),
68	)
69}
70
71/// Execute a read-only query.
72///
73/// # Authentication
74///
75/// Requires one of:
76/// - `Authorization: Bearer <token>` header
77/// - `X-Api-Key: <key>` header
78///
79/// # Request Body
80///
81/// ```json
82/// {
83///   "statements": ["FROM users FILTER id = $1"],
84///   "params": {"$1": 42}
85/// }
86/// ```
87///
88/// # Response
89///
90/// ```json
91/// {
92///   "frames": [...]
93/// }
94/// ```
95pub async fn handle_query(
96	State(state): State<AppState>,
97	headers: HeaderMap,
98	Json(request): Json<StatementRequest>,
99) -> Result<Json<QueryResponse>, AppError> {
100	// Extract identity from headers
101	let identity = extract_identity(&headers)?;
102
103	// Combine statements
104	let query = request.statements.join("; ");
105
106	// Get params or default
107	let params = match request.params {
108		None => Params::None,
109		Some(wp) => wp.into_params().map_err(|e| AppError::InvalidParams(e))?,
110	};
111
112	// Execute with timeout
113	let frames = execute_query(
114		state.actor_system(),
115		state.engine_clone(),
116		query,
117		identity,
118		params,
119		state.query_timeout(),
120	)
121	.await?;
122
123	Ok(Json(QueryResponse {
124		frames: convert_frames(frames),
125	}))
126}
127
128/// Execute an admin operation.
129///
130/// Admin operations include DDL (CREATE TABLE, ALTER, etc.), DML (INSERT, UPDATE, DELETE),
131/// and read queries. This is the most privileged execution level.
132///
133/// # Authentication
134///
135/// Requires one of:
136/// - `Authorization: Bearer <token>` header
137/// - `X-Api-Key: <key>` header
138pub async fn handle_admin(
139	State(state): State<AppState>,
140	headers: HeaderMap,
141	Json(request): Json<StatementRequest>,
142) -> Result<Json<QueryResponse>, AppError> {
143	// Extract identity from headers
144	let identity = extract_identity(&headers)?;
145
146	// Get params or default
147	let params = match request.params {
148		None => Params::None,
149		Some(wp) => wp.into_params().map_err(|e| AppError::InvalidParams(e))?,
150	};
151
152	// Execute with timeout
153	let frames = execute_admin(
154		state.actor_system(),
155		state.engine_clone(),
156		request.statements,
157		identity,
158		params,
159		state.query_timeout(),
160	)
161	.await?;
162
163	Ok(Json(QueryResponse {
164		frames: convert_frames(frames),
165	}))
166}
167
168/// Execute a write command.
169///
170/// Commands include INSERT, UPDATE, and DELETE statements.
171///
172/// # Authentication
173///
174/// Requires one of:
175/// - `Authorization: Bearer <token>` header
176/// - `X-Api-Key: <key>` header
177///
178/// # Request Body
179///
180/// ```json
181/// {
182///   "statements": ["INSERT INTO users (name) VALUES ($1)"],
183///   "params": {"$1": "Alice"}
184/// }
185/// ```
186///
187/// # Response
188///
189/// ```json
190/// {
191///   "frames": [...]
192/// }
193/// ```
194pub async fn handle_command(
195	State(state): State<AppState>,
196	headers: HeaderMap,
197	Json(request): Json<StatementRequest>,
198) -> Result<Json<QueryResponse>, AppError> {
199	// Extract identity from headers
200	let identity = extract_identity(&headers)?;
201
202	// Get params or default
203	let params = match request.params {
204		None => Params::None,
205		Some(wp) => wp.into_params().map_err(|e| AppError::InvalidParams(e))?,
206	};
207
208	// Execute with timeout
209	let frames = execute_command(
210		state.actor_system(),
211		state.engine_clone(),
212		request.statements,
213		identity,
214		params,
215		state.query_timeout(),
216	)
217	.await?;
218
219	Ok(Json(QueryResponse {
220		frames: convert_frames(frames),
221	}))
222}
223
224/// Extract identity from request headers.
225///
226/// Tries in order:
227/// 1. Authorization header (Bearer token)
228/// 2. X-Api-Key header
229fn extract_identity(headers: &HeaderMap) -> Result<IdentityId, AppError> {
230	// Try Authorization header first
231	if let Some(auth_header) = headers.get("authorization") {
232		let auth_str = auth_header.to_str().map_err(|_| AppError::Auth(AuthError::InvalidHeader))?;
233
234		return extract_identity_from_auth_header(auth_str).map_err(AppError::Auth);
235	}
236
237	// Try X-Api-Key header
238	if let Some(api_key) = headers.get("x-api-key") {
239		let key = api_key.to_str().map_err(|_| AppError::Auth(AuthError::InvalidHeader))?;
240
241		return extract_identity_from_api_key(key).map_err(AppError::Auth);
242	}
243
244	// No credentials provided
245	Err(AppError::Auth(AuthError::MissingCredentials))
246}
247
248#[cfg(test)]
249pub mod tests {
250	use serde_json::{from_str, to_string};
251
252	use super::*;
253
254	#[test]
255	fn test_statement_request_deserialization() {
256		let json = r#"{"statements": ["SELECT 1"]}"#;
257		let request: StatementRequest = from_str(json).unwrap();
258		assert_eq!(request.statements, vec!["SELECT 1"]);
259		assert!(request.params.is_none());
260	}
261
262	#[test]
263	fn test_query_response_serialization() {
264		let response = QueryResponse {
265			frames: Vec::new(),
266		};
267		let json = to_string(&response).unwrap();
268		assert!(json.contains("frames"));
269	}
270
271	#[test]
272	fn test_health_response_serialization() {
273		let response = HealthResponse {
274			status: "ok",
275		};
276		let json = to_string(&response).unwrap();
277		assert_eq!(json, r#"{"status":"ok"}"#);
278	}
279}