Skip to main content

reifydb_sub_server_http/
handlers.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// Copyright (c) 2025 ReifyDB
3
4//! HTTP endpoint handler for query and command execution.
5//!
6//! This module provides the request handler for:
7//! - `/health` - Health check endpoint
8//! - `/v1/query` - Execute read-only queries
9//! - `/v1/command` - Execute write commands
10
11use axum::{
12	Json,
13	extract::State,
14	http::{HeaderMap, StatusCode},
15	response::IntoResponse,
16};
17use reifydb_sub_server::{
18	auth::{AuthError, extract_identity_from_api_key, extract_identity_from_auth_header},
19	execute::{execute_admin, execute_command, execute_query},
20	response::{ResponseFrame, convert_frames},
21	state::AppState,
22};
23use reifydb_type::params::Params;
24use serde::{Deserialize, Serialize};
25
26use crate::error::AppError;
27
28/// Request body for query and command endpoints.
29#[derive(Debug, Deserialize)]
30pub struct StatementRequest {
31	/// One or more RQL statements to execute.
32	pub statements: Vec<String>,
33	/// Optional query parameters.
34	#[serde(default)]
35	pub params: Option<Params>,
36}
37
38/// Response body for query and command endpoints.
39#[derive(Debug, Serialize)]
40pub struct QueryResponse {
41	/// Result frames from query execution.
42	pub frames: Vec<ResponseFrame>,
43}
44
45/// Health check response.
46#[derive(Debug, Serialize)]
47pub struct HealthResponse {
48	pub status: &'static str,
49}
50
51/// Health check endpoint.
52///
53/// Returns 200 OK if the server is running.
54/// This endpoint does not require authentication.
55///
56/// # Response
57///
58/// ```json
59/// {"status": "ok"}
60/// ```
61pub async fn health() -> impl IntoResponse {
62	(
63		StatusCode::OK,
64		Json(HealthResponse {
65			status: "ok",
66		}),
67	)
68}
69
70/// Execute a read-only query.
71///
72/// # Authentication
73///
74/// Requires one of:
75/// - `Authorization: Bearer <token>` header
76/// - `X-Api-Key: <key>` header
77///
78/// # Request Body
79///
80/// ```json
81/// {
82///   "statements": ["FROM users FILTER id = $1"],
83///   "params": {"$1": 42}
84/// }
85/// ```
86///
87/// # Response
88///
89/// ```json
90/// {
91///   "frames": [...]
92/// }
93/// ```
94pub async fn handle_query(
95	State(state): State<AppState>,
96	headers: HeaderMap,
97	Json(request): Json<StatementRequest>,
98) -> Result<Json<QueryResponse>, AppError> {
99	// Extract identity from headers
100	let identity = extract_identity(&headers)?;
101
102	// Combine statements
103	let query = request.statements.join("; ");
104
105	// Get params or default
106	let params = request.params.unwrap_or(Params::None);
107
108	// Execute with timeout
109	let frames = execute_query(
110		state.actor_system(),
111		state.engine_clone(),
112		query,
113		identity,
114		params,
115		state.query_timeout(),
116	)
117	.await?;
118
119	Ok(Json(QueryResponse {
120		frames: convert_frames(frames),
121	}))
122}
123
124/// Execute an admin operation.
125///
126/// Admin operations include DDL (CREATE TABLE, ALTER, etc.), DML (INSERT, UPDATE, DELETE),
127/// and read queries. This is the most privileged execution level.
128///
129/// # Authentication
130///
131/// Requires one of:
132/// - `Authorization: Bearer <token>` header
133/// - `X-Api-Key: <key>` header
134pub async fn handle_admin(
135	State(state): State<AppState>,
136	headers: HeaderMap,
137	Json(request): Json<StatementRequest>,
138) -> Result<Json<QueryResponse>, AppError> {
139	// Extract identity from headers
140	let identity = extract_identity(&headers)?;
141
142	// Get params or default
143	let params = request.params.unwrap_or(Params::None);
144
145	// Execute with timeout
146	let frames = execute_admin(
147		state.actor_system(),
148		state.engine_clone(),
149		request.statements,
150		identity,
151		params,
152		state.query_timeout(),
153	)
154	.await?;
155
156	Ok(Json(QueryResponse {
157		frames: convert_frames(frames),
158	}))
159}
160
161/// Execute a write command.
162///
163/// Commands include INSERT, UPDATE, and DELETE statements.
164///
165/// # Authentication
166///
167/// Requires one of:
168/// - `Authorization: Bearer <token>` header
169/// - `X-Api-Key: <key>` header
170///
171/// # Request Body
172///
173/// ```json
174/// {
175///   "statements": ["INSERT INTO users (name) VALUES ($1)"],
176///   "params": {"$1": "Alice"}
177/// }
178/// ```
179///
180/// # Response
181///
182/// ```json
183/// {
184///   "frames": [...]
185/// }
186/// ```
187pub async fn handle_command(
188	State(state): State<AppState>,
189	headers: HeaderMap,
190	Json(request): Json<StatementRequest>,
191) -> Result<Json<QueryResponse>, AppError> {
192	// Extract identity from headers
193	let identity = extract_identity(&headers)?;
194
195	// Get params or default
196	let params = request.params.unwrap_or(Params::None);
197
198	// Execute with timeout
199	let frames = execute_command(
200		state.actor_system(),
201		state.engine_clone(),
202		request.statements,
203		identity,
204		params,
205		state.query_timeout(),
206	)
207	.await?;
208
209	Ok(Json(QueryResponse {
210		frames: convert_frames(frames),
211	}))
212}
213
214/// Extract identity from request headers.
215///
216/// Tries in order:
217/// 1. Authorization header (Bearer token)
218/// 2. X-Api-Key header
219fn extract_identity(headers: &HeaderMap) -> Result<reifydb_core::interface::auth::Identity, AppError> {
220	// Try Authorization header first
221	if let Some(auth_header) = headers.get("authorization") {
222		let auth_str = auth_header.to_str().map_err(|_| AppError::Auth(AuthError::InvalidHeader))?;
223
224		return extract_identity_from_auth_header(auth_str).map_err(AppError::Auth);
225	}
226
227	// Try X-Api-Key header
228	if let Some(api_key) = headers.get("x-api-key") {
229		let key = api_key.to_str().map_err(|_| AppError::Auth(AuthError::InvalidHeader))?;
230
231		return extract_identity_from_api_key(key).map_err(AppError::Auth);
232	}
233
234	// No credentials provided
235	Err(AppError::Auth(AuthError::MissingCredentials))
236}
237
238#[cfg(test)]
239pub mod tests {
240	use super::*;
241
242	#[test]
243	fn test_statement_request_deserialization() {
244		let json = r#"{"statements": ["SELECT 1"]}"#;
245		let request: StatementRequest = serde_json::from_str(json).unwrap();
246		assert_eq!(request.statements, vec!["SELECT 1"]);
247		assert!(request.params.is_none());
248	}
249
250	#[test]
251	fn test_query_response_serialization() {
252		let response = QueryResponse {
253			frames: Vec::new(),
254		};
255		let json = serde_json::to_string(&response).unwrap();
256		assert!(json.contains("frames"));
257	}
258
259	#[test]
260	fn test_health_response_serialization() {
261		let response = HealthResponse {
262			status: "ok",
263		};
264		let json = serde_json::to_string(&response).unwrap();
265		assert_eq!(json, r#"{"status":"ok"}"#);
266	}
267}