reifydb_sub_server/execute.rs
1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4//! Query and command execution with async streaming.
5//!
6//! This module provides async wrappers around the database engine operations.
7//! The engine uses a compute pool for sync execution, streaming results back
8//! through async channels.
9
10use std::{error, fmt, sync::Arc, time::Duration};
11
12use reifydb_engine::engine::StandardEngine;
13use reifydb_runtime::actor::system::ActorSystem;
14use reifydb_type::{
15 error::{Diagnostic, Error},
16 params::Params,
17 value::{frame::frame::Frame, identity::IdentityId},
18};
19use tokio::time;
20use tracing::warn;
21
22/// Error types for query/command execution.
23#[derive(Debug)]
24pub enum ExecuteError {
25 /// Query exceeded the configured timeout.
26 Timeout,
27 /// Query was cancelled.
28 Cancelled,
29 /// Stream disconnected unexpectedly.
30 Disconnected,
31 /// Database engine returned an error with full diagnostic info.
32 Engine {
33 /// The full diagnostic with error code, source location, help text, etc.
34 diagnostic: Arc<Diagnostic>,
35 /// The statement that caused the error.
36 statement: String,
37 },
38}
39
40impl fmt::Display for ExecuteError {
41 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
42 match self {
43 ExecuteError::Timeout => write!(f, "Query execution timed out"),
44 ExecuteError::Cancelled => write!(f, "Query was cancelled"),
45 ExecuteError::Disconnected => write!(f, "Query stream disconnected unexpectedly"),
46 ExecuteError::Engine {
47 diagnostic,
48 ..
49 } => write!(f, "Engine error: {}", diagnostic.message),
50 }
51 }
52}
53
54impl error::Error for ExecuteError {}
55
56impl From<Error> for ExecuteError {
57 fn from(err: Error) -> Self {
58 ExecuteError::Engine {
59 diagnostic: Arc::new(err.diagnostic()),
60 statement: String::new(),
61 }
62 }
63}
64
65/// Result type for execute operations.
66pub type ExecuteResult<T> = Result<T, ExecuteError>;
67
68/// Retry a closure up to 3 times on `TXN_001` transaction conflict errors.
69fn retry_on_conflict<F>(mut f: F) -> Result<Vec<Frame>, Error>
70where
71 F: FnMut() -> Result<Vec<Frame>, Error>,
72{
73 let mut last_err = None;
74 for attempt in 0..3u32 {
75 match f() {
76 Ok(frames) => return Ok(frames),
77 Err(err) if err.code == "TXN_001" => {
78 warn!(attempt = attempt + 1, "Transaction conflict detected, retrying");
79 last_err = Some(err);
80 }
81 Err(err) => return Err(err),
82 }
83 }
84 Err(last_err.unwrap())
85}
86
87/// Execute a query with timeout.
88///
89/// This function:
90/// 1. Starts the query execution on the actor system's compute pool
91/// 2. Applies a timeout to the operation
92/// 3. Returns the query results or an appropriate error
93///
94/// # Arguments
95///
96/// * `system` - The actor system to execute the query on
97/// * `engine` - The database engine to execute the query on
98/// * `query` - The RQL query string
99/// * `identity` - The identity context for permission checking
100/// * `params` - Query parameters
101/// * `timeout` - Maximum time to wait for query completion
102///
103/// # Returns
104///
105/// * `Ok(Vec<Frame>)` - Query results on success
106/// * `Err(ExecuteError::Timeout)` - If the query exceeds the timeout
107/// * `Err(ExecuteError::Cancelled)` - If the query was cancelled
108/// * `Err(ExecuteError::Engine)` - If the engine returns an error
109///
110/// # Example
111///
112/// ```ignore
113/// let result = execute_query(
114/// system,
115/// engine,
116/// "FROM users take 42".to_string(),
117/// identity,
118/// Params::None,
119/// Duration::from_secs(30),
120/// ).await?;
121/// ```
122pub async fn execute_query(
123 system: ActorSystem,
124 engine: StandardEngine,
125 query: String,
126 identity: IdentityId,
127 params: Params,
128 timeout: Duration,
129) -> ExecuteResult<Vec<Frame>> {
130 // Execute synchronous query on actor system's compute pool with timeout
131 let task = system.execute(move || engine.query_as(identity, &query, params));
132
133 let result = time::timeout(timeout, task).await;
134
135 match result {
136 Err(_elapsed) => Err(ExecuteError::Timeout),
137 Ok(Ok(frames_result)) => frames_result.map_err(ExecuteError::from),
138 Ok(Err(_join_error)) => Err(ExecuteError::Cancelled),
139 }
140}
141
142/// Execute an admin operation with timeout.
143///
144/// Admin operations include DDL (CREATE TABLE, ALTER, etc.), DML (INSERT, UPDATE, DELETE),
145/// and read queries. This is the most privileged execution level.
146///
147/// # Arguments
148///
149/// * `system` - The actor system to execute the admin operation on
150/// * `engine` - The database engine to execute the admin operation on
151/// * `statements` - The RQL admin statements
152/// * `identity` - The identity context for permission checking
153/// * `params` - Admin parameters
154/// * `timeout` - Maximum time to wait for admin completion
155///
156/// # Returns
157///
158/// * `Ok(Vec<Frame>)` - Admin results on success
159/// * `Err(ExecuteError::Timeout)` - If the admin operation exceeds the timeout
160/// * `Err(ExecuteError::Cancelled)` - If the admin operation was cancelled
161/// * `Err(ExecuteError::Engine)` - If the engine returns an error
162pub async fn execute_admin(
163 system: ActorSystem,
164 engine: StandardEngine,
165 statements: Vec<String>,
166 identity: IdentityId,
167 params: Params,
168 timeout: Duration,
169) -> ExecuteResult<Vec<Frame>> {
170 let combined = statements.join("; ");
171
172 // Execute synchronous admin operation on actor system's compute pool with timeout
173 let task = system.execute(move || retry_on_conflict(|| engine.admin_as(identity, &combined, params.clone())));
174
175 let result = time::timeout(timeout, task).await;
176
177 match result {
178 Err(_elapsed) => Err(ExecuteError::Timeout),
179 Ok(Ok(frames_result)) => frames_result.map_err(ExecuteError::from),
180 Ok(Err(_join_error)) => Err(ExecuteError::Cancelled),
181 }
182}
183
184/// Execute a command with timeout.
185///
186/// Commands are write operations (INSERT, UPDATE, DELETE) that modify
187/// the database state. DDL operations are not allowed in command transactions.
188///
189/// # Arguments
190///
191/// * `system` - The actor system to execute the command on
192/// * `engine` - The database engine to execute the command on
193/// * `statements` - The RQL command statements
194/// * `identity` - The identity context for permission checking
195/// * `params` - Command parameters
196/// * `timeout` - Maximum time to wait for command completion
197///
198/// # Returns
199///
200/// * `Ok(Vec<Frame>)` - Command results on success
201/// * `Err(ExecuteError::Timeout)` - If the command exceeds the timeout
202/// * `Err(ExecuteError::Cancelled)` - If the command was cancelled
203/// * `Err(ExecuteError::Engine)` - If the engine returns an error
204pub async fn execute_command(
205 system: ActorSystem,
206 engine: StandardEngine,
207 statements: Vec<String>,
208 identity: IdentityId,
209 params: Params,
210 timeout: Duration,
211) -> ExecuteResult<Vec<Frame>> {
212 let combined = statements.join("; ");
213
214 // Execute synchronous command on actor system's compute pool with timeout
215 let task = system.execute(move || retry_on_conflict(|| engine.command_as(identity, &combined, params.clone())));
216
217 let result = time::timeout(timeout, task).await;
218
219 match result {
220 Err(_elapsed) => Err(ExecuteError::Timeout),
221 Ok(Ok(frames_result)) => frames_result.map_err(ExecuteError::from),
222 Ok(Err(_join_error)) => Err(ExecuteError::Cancelled),
223 }
224}