reifydb_sub_server/
execute.rs1use std::{sync::Arc, time::Duration};
11
12use futures_util::TryStreamExt;
13use reifydb_core::{
14 Frame,
15 interface::{Engine, Identity},
16 stream::StreamError,
17};
18use reifydb_engine::StandardEngine;
19use reifydb_type::{Params, diagnostic::Diagnostic};
20
21#[derive(Debug)]
23pub enum ExecuteError {
24 Timeout,
26 Cancelled,
28 Disconnected,
30 Engine {
32 diagnostic: Arc<Diagnostic>,
34 statement: String,
36 },
37}
38
39impl std::fmt::Display for ExecuteError {
40 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
41 match self {
42 ExecuteError::Timeout => write!(f, "Query execution timed out"),
43 ExecuteError::Cancelled => write!(f, "Query was cancelled"),
44 ExecuteError::Disconnected => write!(f, "Query stream disconnected unexpectedly"),
45 ExecuteError::Engine {
46 diagnostic,
47 ..
48 } => write!(f, "Engine error: {}", diagnostic.message),
49 }
50 }
51}
52
53impl std::error::Error for ExecuteError {}
54
55impl From<StreamError> for ExecuteError {
56 fn from(err: StreamError) -> Self {
57 match err {
58 StreamError::Query {
59 diagnostic,
60 statement,
61 } => ExecuteError::Engine {
62 diagnostic, statement: statement.unwrap_or_default(),
64 },
65 StreamError::Cancelled => ExecuteError::Cancelled,
66 StreamError::Timeout => ExecuteError::Timeout,
67 StreamError::Disconnected => ExecuteError::Disconnected,
68 }
69 }
70}
71
72pub type ExecuteResult<T> = std::result::Result<T, ExecuteError>;
74
75pub async fn execute_query(
109 engine: StandardEngine,
110 query: String,
111 identity: Identity,
112 params: Params,
113 timeout: Duration,
114) -> ExecuteResult<Vec<Frame>> {
115 let stream = engine.query_as(&identity, &query, params);
116
117 let result = tokio::time::timeout(timeout, stream.try_collect::<Vec<Frame>>()).await;
119
120 match result {
121 Err(_elapsed) => Err(ExecuteError::Timeout),
122 Ok(stream_result) => stream_result.map_err(ExecuteError::from),
123 }
124}
125
126pub async fn execute_command(
146 engine: StandardEngine,
147 statements: Vec<String>,
148 identity: Identity,
149 params: Params,
150 timeout: Duration,
151) -> ExecuteResult<Vec<Frame>> {
152 let combined = statements.join("; ");
153 let stream = engine.command_as(&identity, &combined, params);
154
155 let result = tokio::time::timeout(timeout, stream.try_collect::<Vec<Frame>>()).await;
157
158 match result {
159 Err(_elapsed) => Err(ExecuteError::Timeout),
160 Ok(stream_result) => stream_result.map_err(ExecuteError::from),
161 }
162}
163
164pub async fn execute_query_single(
168 engine: StandardEngine,
169 query: &str,
170 identity: Identity,
171 params: Params,
172 timeout: Duration,
173) -> ExecuteResult<Vec<Frame>> {
174 execute_query(engine, query.to_string(), identity, params, timeout).await
175}
176
177pub async fn execute_command_single(
181 engine: StandardEngine,
182 command: &str,
183 identity: Identity,
184 params: Params,
185 timeout: Duration,
186) -> ExecuteResult<Vec<Frame>> {
187 execute_command(engine, vec![command.to_string()], identity, params, timeout).await
188}
189
#[cfg(test)]
mod tests {
    use super::*;

    /// Each unit variant must render its fixed user-facing message.
    #[test]
    fn test_execute_error_display() {
        let expectations = [
            (ExecuteError::Timeout, "Query execution timed out"),
            (ExecuteError::Cancelled, "Query was cancelled"),
            (
                ExecuteError::Disconnected,
                "Query stream disconnected unexpectedly",
            ),
        ];

        for (error, expected) in &expectations {
            assert_eq!(error.to_string(), *expected);
        }
    }
}