reifydb_sub_server/
execute.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later
3
4//! Query and command execution with async/sync bridging.
5//!
6//! This module provides async wrappers around the synchronous database engine
7//! operations using `spawn_blocking` to avoid blocking tokio worker threads.
8//!
9//! All database operations are executed on tokio's blocking thread pool with
10//! configurable timeouts.
11
12use std::time::Duration;
13
14use reifydb_core::{
15	Frame,
16	interface::{Engine, Identity},
17};
18use reifydb_engine::StandardEngine;
19use reifydb_type::Params;
20use tokio::task::spawn_blocking;
21
22/// Error types for query/command execution.
23#[derive(Debug)]
24pub enum ExecuteError {
25	/// Query exceeded the configured timeout.
26	Timeout,
27	/// The blocking task panicked during execution.
28	TaskPanic(String),
29	/// Database engine returned an error, with the statement that caused it.
30	Engine {
31		error: reifydb_type::Error,
32		statement: String,
33	},
34}
35
36impl std::fmt::Display for ExecuteError {
37	fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
38		match self {
39			ExecuteError::Timeout => write!(f, "Query execution timed out"),
40			ExecuteError::TaskPanic(msg) => write!(f, "Query task panicked: {}", msg),
41			ExecuteError::Engine {
42				error,
43				..
44			} => write!(f, "Engine error: {}", error),
45		}
46	}
47}
48
49impl std::error::Error for ExecuteError {
50	fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
51		match self {
52			ExecuteError::Engine {
53				error,
54				..
55			} => Some(error),
56			_ => None,
57		}
58	}
59}
60
61/// Result type for execute operations.
62pub type ExecuteResult<T> = std::result::Result<T, ExecuteError>;
63
64/// Execute a query on the blocking thread pool with timeout.
65///
66/// This function:
67/// 1. Spawns the query execution on tokio's blocking thread pool
68/// 2. Applies a timeout to prevent stuck queries from hanging indefinitely
69/// 3. Returns the query results or an appropriate error
70///
71/// # Arguments
72///
73/// * `engine` - The database engine to execute the query on
74/// * `query` - The RQL query string
75/// * `identity` - The identity context for permission checking
76/// * `params` - Query parameters
77/// * `timeout` - Maximum time to wait for query completion
78///
79/// # Returns
80///
81/// * `Ok(Vec<Frame>)` - Query results on success
82/// * `Err(ExecuteError::Timeout)` - If the query exceeds the timeout
83/// * `Err(ExecuteError::TaskPanic)` - If the blocking task panics
84/// * `Err(ExecuteError::Engine)` - If the engine returns an error
85///
86/// # Example
87///
88/// ```ignore
89/// let result = execute_query(
90///     engine,
91///     "SELECT * FROM users".to_string(),
92///     identity,
93///     Params::None,
94///     Duration::from_secs(30),
95/// ).await?;
96/// ```
97pub async fn execute_query(
98	engine: StandardEngine,
99	query: String,
100	identity: Identity,
101	params: Params,
102	timeout: Duration,
103) -> ExecuteResult<Vec<Frame>> {
104	let query_clone = query.clone();
105	let result =
106		tokio::time::timeout(timeout, spawn_blocking(move || engine.query_as(&identity, &query, params))).await;
107
108	match result {
109		// Timeout expired
110		Err(_elapsed) => Err(ExecuteError::Timeout),
111		// spawn_blocking returned
112		Ok(join_result) => match join_result {
113			// Task panicked
114			Err(join_err) => Err(ExecuteError::TaskPanic(join_err.to_string())),
115			// Task completed
116			Ok(engine_result) => engine_result.map_err(|e| ExecuteError::Engine {
117				error: e,
118				statement: query_clone,
119			}),
120		},
121	}
122}
123
124/// Execute a command on the blocking thread pool with timeout.
125///
126/// Commands are write operations (INSERT, UPDATE, DELETE, DDL) that modify
127/// the database state.
128///
129/// # Arguments
130///
131/// * `engine` - The database engine to execute the command on
132/// * `statements` - The RQL command statements
133/// * `identity` - The identity context for permission checking
134/// * `params` - Command parameters
135/// * `timeout` - Maximum time to wait for command completion
136///
137/// # Returns
138///
139/// * `Ok(Vec<Frame>)` - Command results on success
140/// * `Err(ExecuteError::Timeout)` - If the command exceeds the timeout
141/// * `Err(ExecuteError::TaskPanic)` - If the blocking task panics
142/// * `Err(ExecuteError::Engine)` - If the engine returns an error
143pub async fn execute_command(
144	engine: StandardEngine,
145	statements: Vec<String>,
146	identity: Identity,
147	params: Params,
148	timeout: Duration,
149) -> ExecuteResult<Vec<Frame>> {
150	let combined = statements.join("; ");
151	let combined_clone = combined.clone();
152	let result =
153		tokio::time::timeout(timeout, spawn_blocking(move || engine.command_as(&identity, &combined, params)))
154			.await;
155
156	match result {
157		// Timeout expired
158		Err(_elapsed) => Err(ExecuteError::Timeout),
159		// spawn_blocking returned
160		Ok(join_result) => match join_result {
161			// Task panicked
162			Err(join_err) => Err(ExecuteError::TaskPanic(join_err.to_string())),
163			// Task completed
164			Ok(engine_result) => engine_result.map_err(|e| ExecuteError::Engine {
165				error: e,
166				statement: combined_clone,
167			}),
168		},
169	}
170}
171
172/// Execute a single query statement on the blocking thread pool with timeout.
173///
174/// This is a convenience wrapper around `execute_query` for single statements.
175pub async fn execute_query_single(
176	engine: StandardEngine,
177	query: &str,
178	identity: Identity,
179	params: Params,
180	timeout: Duration,
181) -> ExecuteResult<Vec<Frame>> {
182	execute_query(engine, query.to_string(), identity, params, timeout).await
183}
184
185/// Execute a single command statement on the blocking thread pool with timeout.
186///
187/// This is a convenience wrapper around `execute_command` for single statements.
188pub async fn execute_command_single(
189	engine: StandardEngine,
190	command: &str,
191	identity: Identity,
192	params: Params,
193	timeout: Duration,
194) -> ExecuteResult<Vec<Frame>> {
195	execute_command(engine, vec![command.to_string()], identity, params, timeout).await
196}
197
198#[cfg(test)]
199mod tests {
200	use super::*;
201
202	#[test]
203	fn test_execute_error_display() {
204		let timeout_err = ExecuteError::Timeout;
205		assert_eq!(timeout_err.to_string(), "Query execution timed out");
206
207		let panic_err = ExecuteError::TaskPanic("test panic".to_string());
208		assert_eq!(panic_err.to_string(), "Query task panicked: test panic");
209	}
210}