Skip to main content

reifydb_sub_server/
execute.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4//! Query and command execution with async streaming.
5//!
6//! This module provides async wrappers around the database engine operations.
7//! The engine uses a compute pool for sync execution, streaming results back
8//! through async channels.
9
10use std::{error, fmt, sync::Arc, time::Duration};
11
12use reifydb_engine::engine::StandardEngine;
13use reifydb_runtime::actor::system::ActorSystem;
14use reifydb_type::{
15	error::{Diagnostic, Error},
16	params::Params,
17	value::{frame::frame::Frame, identity::IdentityId},
18};
19use tokio::time;
20use tracing::warn;
21
22/// Error types for query/command execution.
23#[derive(Debug)]
24pub enum ExecuteError {
25	/// Query exceeded the configured timeout.
26	Timeout,
27	/// Query was cancelled.
28	Cancelled,
29	/// Stream disconnected unexpectedly.
30	Disconnected,
31	/// Database engine returned an error with full diagnostic info.
32	Engine {
33		/// The full diagnostic with error code, source location, help text, etc.
34		diagnostic: Arc<Diagnostic>,
35		/// The statement that caused the error.
36		statement: String,
37	},
38}
39
40impl fmt::Display for ExecuteError {
41	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
42		match self {
43			ExecuteError::Timeout => write!(f, "Query execution timed out"),
44			ExecuteError::Cancelled => write!(f, "Query was cancelled"),
45			ExecuteError::Disconnected => write!(f, "Query stream disconnected unexpectedly"),
46			ExecuteError::Engine {
47				diagnostic,
48				..
49			} => write!(f, "Engine error: {}", diagnostic.message),
50		}
51	}
52}
53
54impl error::Error for ExecuteError {}
55
56impl From<Error> for ExecuteError {
57	fn from(err: Error) -> Self {
58		ExecuteError::Engine {
59			diagnostic: Arc::new(err.diagnostic()),
60			statement: String::new(),
61		}
62	}
63}
64
65/// Result type for execute operations.
66pub type ExecuteResult<T> = Result<T, ExecuteError>;
67
68/// Retry a closure up to 3 times on `TXN_001` transaction conflict errors.
69fn retry_on_conflict<F>(mut f: F) -> Result<Vec<Frame>, Error>
70where
71	F: FnMut() -> Result<Vec<Frame>, Error>,
72{
73	let mut last_err = None;
74	for attempt in 0..3u32 {
75		match f() {
76			Ok(frames) => return Ok(frames),
77			Err(err) if err.code == "TXN_001" => {
78				warn!(attempt = attempt + 1, "Transaction conflict detected, retrying");
79				last_err = Some(err);
80			}
81			Err(err) => return Err(err),
82		}
83	}
84	Err(last_err.unwrap())
85}
86
87/// Execute a query with timeout.
88///
89/// This function:
90/// 1. Starts the query execution on the actor system's compute pool
91/// 2. Applies a timeout to the operation
92/// 3. Returns the query results or an appropriate error
93///
94/// # Arguments
95///
96/// * `system` - The actor system to execute the query on
97/// * `engine` - The database engine to execute the query on
98/// * `query` - The RQL query string
99/// * `identity` - The identity context for permission checking
100/// * `params` - Query parameters
101/// * `timeout` - Maximum time to wait for query completion
102///
103/// # Returns
104///
105/// * `Ok(Vec<Frame>)` - Query results on success
106/// * `Err(ExecuteError::Timeout)` - If the query exceeds the timeout
107/// * `Err(ExecuteError::Cancelled)` - If the query was cancelled
108/// * `Err(ExecuteError::Engine)` - If the engine returns an error
109///
110/// # Example
111///
112/// ```ignore
113/// let result = execute_query(
114///     system,
115///     engine,
116///     "FROM users take 42".to_string(),
117///     identity,
118///     Params::None,
119///     Duration::from_secs(30),
120/// ).await?;
121/// ```
122pub async fn execute_query(
123	system: ActorSystem,
124	engine: StandardEngine,
125	query: String,
126	identity: IdentityId,
127	params: Params,
128	timeout: Duration,
129) -> ExecuteResult<Vec<Frame>> {
130	// Execute synchronous query on actor system's compute pool with timeout
131	let task = system.execute(move || engine.query_as(identity, &query, params));
132
133	let result = time::timeout(timeout, task).await;
134
135	match result {
136		Err(_elapsed) => Err(ExecuteError::Timeout),
137		Ok(Ok(frames_result)) => frames_result.map_err(ExecuteError::from),
138		Ok(Err(_join_error)) => Err(ExecuteError::Cancelled),
139	}
140}
141
142/// Execute an admin operation with timeout.
143///
144/// Admin operations include DDL (CREATE TABLE, ALTER, etc.), DML (INSERT, UPDATE, DELETE),
145/// and read queries. This is the most privileged execution level.
146///
147/// # Arguments
148///
149/// * `system` - The actor system to execute the admin operation on
150/// * `engine` - The database engine to execute the admin operation on
151/// * `statements` - The RQL admin statements
152/// * `identity` - The identity context for permission checking
153/// * `params` - Admin parameters
154/// * `timeout` - Maximum time to wait for admin completion
155///
156/// # Returns
157///
158/// * `Ok(Vec<Frame>)` - Admin results on success
159/// * `Err(ExecuteError::Timeout)` - If the admin operation exceeds the timeout
160/// * `Err(ExecuteError::Cancelled)` - If the admin operation was cancelled
161/// * `Err(ExecuteError::Engine)` - If the engine returns an error
162pub async fn execute_admin(
163	system: ActorSystem,
164	engine: StandardEngine,
165	statements: Vec<String>,
166	identity: IdentityId,
167	params: Params,
168	timeout: Duration,
169) -> ExecuteResult<Vec<Frame>> {
170	let combined = statements.join("; ");
171
172	// Execute synchronous admin operation on actor system's compute pool with timeout
173	let task = system.execute(move || retry_on_conflict(|| engine.admin_as(identity, &combined, params.clone())));
174
175	let result = time::timeout(timeout, task).await;
176
177	match result {
178		Err(_elapsed) => Err(ExecuteError::Timeout),
179		Ok(Ok(frames_result)) => frames_result.map_err(ExecuteError::from),
180		Ok(Err(_join_error)) => Err(ExecuteError::Cancelled),
181	}
182}
183
184/// Execute a command with timeout.
185///
186/// Commands are write operations (INSERT, UPDATE, DELETE) that modify
187/// the database state. DDL operations are not allowed in command transactions.
188///
189/// # Arguments
190///
191/// * `system` - The actor system to execute the command on
192/// * `engine` - The database engine to execute the command on
193/// * `statements` - The RQL command statements
194/// * `identity` - The identity context for permission checking
195/// * `params` - Command parameters
196/// * `timeout` - Maximum time to wait for command completion
197///
198/// # Returns
199///
200/// * `Ok(Vec<Frame>)` - Command results on success
201/// * `Err(ExecuteError::Timeout)` - If the command exceeds the timeout
202/// * `Err(ExecuteError::Cancelled)` - If the command was cancelled
203/// * `Err(ExecuteError::Engine)` - If the engine returns an error
204pub async fn execute_command(
205	system: ActorSystem,
206	engine: StandardEngine,
207	statements: Vec<String>,
208	identity: IdentityId,
209	params: Params,
210	timeout: Duration,
211) -> ExecuteResult<Vec<Frame>> {
212	let combined = statements.join("; ");
213
214	// Execute synchronous command on actor system's compute pool with timeout
215	let task = system.execute(move || retry_on_conflict(|| engine.command_as(identity, &combined, params.clone())));
216
217	let result = time::timeout(timeout, task).await;
218
219	match result {
220		Err(_elapsed) => Err(ExecuteError::Timeout),
221		Ok(Ok(frames_result)) => frames_result.map_err(ExecuteError::from),
222		Ok(Err(_join_error)) => Err(ExecuteError::Cancelled),
223	}
224}