reifydb_sub_server/
execute.rs1use std::time::Duration;
13
14use reifydb_core::{
15 Frame,
16 interface::{Engine, Identity},
17};
18use reifydb_engine::StandardEngine;
19use reifydb_type::Params;
20use tokio::task::spawn_blocking;
21
22#[derive(Debug)]
24pub enum ExecuteError {
25 Timeout,
27 TaskPanic(String),
29 Engine {
31 error: reifydb_type::Error,
32 statement: String,
33 },
34}
35
36impl std::fmt::Display for ExecuteError {
37 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
38 match self {
39 ExecuteError::Timeout => write!(f, "Query execution timed out"),
40 ExecuteError::TaskPanic(msg) => write!(f, "Query task panicked: {}", msg),
41 ExecuteError::Engine {
42 error,
43 ..
44 } => write!(f, "Engine error: {}", error),
45 }
46 }
47}
48
49impl std::error::Error for ExecuteError {
50 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
51 match self {
52 ExecuteError::Engine {
53 error,
54 ..
55 } => Some(error),
56 _ => None,
57 }
58 }
59}
60
61pub type ExecuteResult<T> = std::result::Result<T, ExecuteError>;
63
64pub async fn execute_query(
98 engine: StandardEngine,
99 query: String,
100 identity: Identity,
101 params: Params,
102 timeout: Duration,
103) -> ExecuteResult<Vec<Frame>> {
104 let query_clone = query.clone();
105 let result =
106 tokio::time::timeout(timeout, spawn_blocking(move || engine.query_as(&identity, &query, params))).await;
107
108 match result {
109 Err(_elapsed) => Err(ExecuteError::Timeout),
111 Ok(join_result) => match join_result {
113 Err(join_err) => Err(ExecuteError::TaskPanic(join_err.to_string())),
115 Ok(engine_result) => engine_result.map_err(|e| ExecuteError::Engine {
117 error: e,
118 statement: query_clone,
119 }),
120 },
121 }
122}
123
124pub async fn execute_command(
144 engine: StandardEngine,
145 statements: Vec<String>,
146 identity: Identity,
147 params: Params,
148 timeout: Duration,
149) -> ExecuteResult<Vec<Frame>> {
150 let combined = statements.join("; ");
151 let combined_clone = combined.clone();
152 let result =
153 tokio::time::timeout(timeout, spawn_blocking(move || engine.command_as(&identity, &combined, params)))
154 .await;
155
156 match result {
157 Err(_elapsed) => Err(ExecuteError::Timeout),
159 Ok(join_result) => match join_result {
161 Err(join_err) => Err(ExecuteError::TaskPanic(join_err.to_string())),
163 Ok(engine_result) => engine_result.map_err(|e| ExecuteError::Engine {
165 error: e,
166 statement: combined_clone,
167 }),
168 },
169 }
170}
171
172pub async fn execute_query_single(
176 engine: StandardEngine,
177 query: &str,
178 identity: Identity,
179 params: Params,
180 timeout: Duration,
181) -> ExecuteResult<Vec<Frame>> {
182 execute_query(engine, query.to_string(), identity, params, timeout).await
183}
184
185pub async fn execute_command_single(
189 engine: StandardEngine,
190 command: &str,
191 identity: Identity,
192 params: Params,
193 timeout: Duration,
194) -> ExecuteResult<Vec<Frame>> {
195 execute_command(engine, vec![command.to_string()], identity, params, timeout).await
196}
197
#[cfg(test)]
mod tests {
    use super::*;

    /// Checks the `Display` text of the variants that can be built without an engine error.
    #[test]
    fn test_execute_error_display() {
        let cases = [
            (ExecuteError::Timeout, "Query execution timed out"),
            (
                ExecuteError::TaskPanic("test panic".to_string()),
                "Query task panicked: test panic",
            ),
        ];

        for (error, expected) in cases {
            assert_eq!(error.to_string(), expected);
        }
    }
}