Skip to main content

reifydb_sub_server/
execute.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// Copyright (c) 2025 ReifyDB
3
4//! Query and command execution with async streaming.
5//!
6//! This module provides async wrappers around the database engine operations.
7//! The engine uses a compute pool for sync execution, streaming results back
8//! through async channels.
9
10use std::{error, fmt, sync::Arc, time::Duration};
11
12use reifydb_engine::engine::StandardEngine;
13use reifydb_runtime::actor::system::ActorSystem;
14use reifydb_type::{
15	error::{Diagnostic, Error},
16	params::Params,
17	value::{frame::frame::Frame, identity::IdentityId},
18};
19use tokio::time;
20
21/// Error types for query/command execution.
22#[derive(Debug)]
23pub enum ExecuteError {
24	/// Query exceeded the configured timeout.
25	Timeout,
26	/// Query was cancelled.
27	Cancelled,
28	/// Stream disconnected unexpectedly.
29	Disconnected,
30	/// Database engine returned an error with full diagnostic info.
31	Engine {
32		/// The full diagnostic with error code, source location, help text, etc.
33		diagnostic: Arc<Diagnostic>,
34		/// The statement that caused the error.
35		statement: String,
36	},
37}
38
39impl fmt::Display for ExecuteError {
40	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
41		match self {
42			ExecuteError::Timeout => write!(f, "Query execution timed out"),
43			ExecuteError::Cancelled => write!(f, "Query was cancelled"),
44			ExecuteError::Disconnected => write!(f, "Query stream disconnected unexpectedly"),
45			ExecuteError::Engine {
46				diagnostic,
47				..
48			} => write!(f, "Engine error: {}", diagnostic.message),
49		}
50	}
51}
52
53impl error::Error for ExecuteError {}
54
55impl From<Error> for ExecuteError {
56	fn from(err: Error) -> Self {
57		ExecuteError::Engine {
58			diagnostic: Arc::new(err.diagnostic()),
59			statement: String::new(),
60		}
61	}
62}
63
64/// Result type for execute operations.
65pub type ExecuteResult<T> = Result<T, ExecuteError>;
66
67/// Execute a query with timeout.
68///
69/// This function:
70/// 1. Starts the query execution on the actor system's compute pool
71/// 2. Applies a timeout to the operation
72/// 3. Returns the query results or an appropriate error
73///
74/// # Arguments
75///
76/// * `system` - The actor system to execute the query on
77/// * `engine` - The database engine to execute the query on
78/// * `query` - The RQL query string
79/// * `identity` - The identity context for permission checking
80/// * `params` - Query parameters
81/// * `timeout` - Maximum time to wait for query completion
82///
83/// # Returns
84///
85/// * `Ok(Vec<Frame>)` - Query results on success
86/// * `Err(ExecuteError::Timeout)` - If the query exceeds the timeout
87/// * `Err(ExecuteError::Cancelled)` - If the query was cancelled
88/// * `Err(ExecuteError::Engine)` - If the engine returns an error
89///
90/// # Example
91///
92/// ```ignore
93/// let result = execute_query(
94///     system,
95///     engine,
96///     "FROM users take 42".to_string(),
97///     identity,
98///     Params::None,
99///     Duration::from_secs(30),
100/// ).await?;
101/// ```
102pub async fn execute_query(
103	system: ActorSystem,
104	engine: StandardEngine,
105	query: String,
106	identity: IdentityId,
107	params: Params,
108	timeout: Duration,
109) -> ExecuteResult<Vec<Frame>> {
110	// Execute synchronous query on actor system's compute pool with timeout
111	let task = system.compute(move || engine.query_as(identity, &query, params));
112
113	let result = time::timeout(timeout, task).await;
114
115	match result {
116		Err(_elapsed) => Err(ExecuteError::Timeout),
117		Ok(Ok(frames_result)) => frames_result.map_err(ExecuteError::from),
118		Ok(Err(_join_error)) => Err(ExecuteError::Cancelled),
119	}
120}
121
122/// Execute an admin operation with timeout.
123///
124/// Admin operations include DDL (CREATE TABLE, ALTER, etc.), DML (INSERT, UPDATE, DELETE),
125/// and read queries. This is the most privileged execution level.
126///
127/// # Arguments
128///
129/// * `system` - The actor system to execute the admin operation on
130/// * `engine` - The database engine to execute the admin operation on
131/// * `statements` - The RQL admin statements
132/// * `identity` - The identity context for permission checking
133/// * `params` - Admin parameters
134/// * `timeout` - Maximum time to wait for admin completion
135///
136/// # Returns
137///
138/// * `Ok(Vec<Frame>)` - Admin results on success
139/// * `Err(ExecuteError::Timeout)` - If the admin operation exceeds the timeout
140/// * `Err(ExecuteError::Cancelled)` - If the admin operation was cancelled
141/// * `Err(ExecuteError::Engine)` - If the engine returns an error
142pub async fn execute_admin(
143	system: ActorSystem,
144	engine: StandardEngine,
145	statements: Vec<String>,
146	identity: IdentityId,
147	params: Params,
148	timeout: Duration,
149) -> ExecuteResult<Vec<Frame>> {
150	let combined = statements.join("; ");
151
152	// Execute synchronous admin operation on actor system's compute pool with timeout
153	let task = system.compute(move || engine.admin_as(identity, &combined, params));
154
155	let result = time::timeout(timeout, task).await;
156
157	match result {
158		Err(_elapsed) => Err(ExecuteError::Timeout),
159		Ok(Ok(frames_result)) => frames_result.map_err(ExecuteError::from),
160		Ok(Err(_join_error)) => Err(ExecuteError::Cancelled),
161	}
162}
163
164/// Execute a command with timeout.
165///
166/// Commands are write operations (INSERT, UPDATE, DELETE) that modify
167/// the database state. DDL operations are not allowed in command transactions.
168///
169/// # Arguments
170///
171/// * `system` - The actor system to execute the command on
172/// * `engine` - The database engine to execute the command on
173/// * `statements` - The RQL command statements
174/// * `identity` - The identity context for permission checking
175/// * `params` - Command parameters
176/// * `timeout` - Maximum time to wait for command completion
177///
178/// # Returns
179///
180/// * `Ok(Vec<Frame>)` - Command results on success
181/// * `Err(ExecuteError::Timeout)` - If the command exceeds the timeout
182/// * `Err(ExecuteError::Cancelled)` - If the command was cancelled
183/// * `Err(ExecuteError::Engine)` - If the engine returns an error
184pub async fn execute_command(
185	system: ActorSystem,
186	engine: StandardEngine,
187	statements: Vec<String>,
188	identity: IdentityId,
189	params: Params,
190	timeout: Duration,
191) -> ExecuteResult<Vec<Frame>> {
192	let combined = statements.join("; ");
193
194	// Execute synchronous command on actor system's compute pool with timeout
195	let task = system.compute(move || engine.command_as(identity, &combined, params));
196
197	let result = time::timeout(timeout, task).await;
198
199	match result {
200		Err(_elapsed) => Err(ExecuteError::Timeout),
201		Ok(Ok(frames_result)) => frames_result.map_err(ExecuteError::from),
202		Ok(Err(_join_error)) => Err(ExecuteError::Cancelled),
203	}
204}