reifydb_sub_server/execute.rs
1// SPDX-License-Identifier: AGPL-3.0-or-later
2// Copyright (c) 2025 ReifyDB
3
4//! Query and command execution with async streaming.
5//!
6//! This module provides async wrappers around the database engine operations.
7//! The engine uses a compute pool for sync execution, streaming results back
8//! through async channels.
9
10use std::{error, fmt, sync::Arc, time::Duration};
11
12use reifydb_engine::engine::StandardEngine;
13use reifydb_runtime::actor::system::ActorSystem;
14use reifydb_type::{
15 error::{Diagnostic, Error},
16 params::Params,
17 value::{frame::frame::Frame, identity::IdentityId},
18};
19use tokio::time;
20
21/// Error types for query/command execution.
22#[derive(Debug)]
23pub enum ExecuteError {
24 /// Query exceeded the configured timeout.
25 Timeout,
26 /// Query was cancelled.
27 Cancelled,
28 /// Stream disconnected unexpectedly.
29 Disconnected,
30 /// Database engine returned an error with full diagnostic info.
31 Engine {
32 /// The full diagnostic with error code, source location, help text, etc.
33 diagnostic: Arc<Diagnostic>,
34 /// The statement that caused the error.
35 statement: String,
36 },
37}
38
39impl fmt::Display for ExecuteError {
40 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
41 match self {
42 ExecuteError::Timeout => write!(f, "Query execution timed out"),
43 ExecuteError::Cancelled => write!(f, "Query was cancelled"),
44 ExecuteError::Disconnected => write!(f, "Query stream disconnected unexpectedly"),
45 ExecuteError::Engine {
46 diagnostic,
47 ..
48 } => write!(f, "Engine error: {}", diagnostic.message),
49 }
50 }
51}
52
53impl error::Error for ExecuteError {}
54
55impl From<Error> for ExecuteError {
56 fn from(err: Error) -> Self {
57 ExecuteError::Engine {
58 diagnostic: Arc::new(err.diagnostic()),
59 statement: String::new(),
60 }
61 }
62}
63
64/// Result type for execute operations.
65pub type ExecuteResult<T> = Result<T, ExecuteError>;
66
67/// Execute a query with timeout.
68///
69/// This function:
70/// 1. Starts the query execution on the actor system's compute pool
71/// 2. Applies a timeout to the operation
72/// 3. Returns the query results or an appropriate error
73///
74/// # Arguments
75///
76/// * `system` - The actor system to execute the query on
77/// * `engine` - The database engine to execute the query on
78/// * `query` - The RQL query string
79/// * `identity` - The identity context for permission checking
80/// * `params` - Query parameters
81/// * `timeout` - Maximum time to wait for query completion
82///
83/// # Returns
84///
85/// * `Ok(Vec<Frame>)` - Query results on success
86/// * `Err(ExecuteError::Timeout)` - If the query exceeds the timeout
87/// * `Err(ExecuteError::Cancelled)` - If the query was cancelled
88/// * `Err(ExecuteError::Engine)` - If the engine returns an error
89///
90/// # Example
91///
92/// ```ignore
93/// let result = execute_query(
94/// system,
95/// engine,
96/// "FROM users take 42".to_string(),
97/// identity,
98/// Params::None,
99/// Duration::from_secs(30),
100/// ).await?;
101/// ```
102pub async fn execute_query(
103 system: ActorSystem,
104 engine: StandardEngine,
105 query: String,
106 identity: IdentityId,
107 params: Params,
108 timeout: Duration,
109) -> ExecuteResult<Vec<Frame>> {
110 // Execute synchronous query on actor system's compute pool with timeout
111 let task = system.compute(move || engine.query_as(identity, &query, params));
112
113 let result = time::timeout(timeout, task).await;
114
115 match result {
116 Err(_elapsed) => Err(ExecuteError::Timeout),
117 Ok(Ok(frames_result)) => frames_result.map_err(ExecuteError::from),
118 Ok(Err(_join_error)) => Err(ExecuteError::Cancelled),
119 }
120}
121
122/// Execute an admin operation with timeout.
123///
124/// Admin operations include DDL (CREATE TABLE, ALTER, etc.), DML (INSERT, UPDATE, DELETE),
125/// and read queries. This is the most privileged execution level.
126///
127/// # Arguments
128///
129/// * `system` - The actor system to execute the admin operation on
130/// * `engine` - The database engine to execute the admin operation on
131/// * `statements` - The RQL admin statements
132/// * `identity` - The identity context for permission checking
133/// * `params` - Admin parameters
134/// * `timeout` - Maximum time to wait for admin completion
135///
136/// # Returns
137///
138/// * `Ok(Vec<Frame>)` - Admin results on success
139/// * `Err(ExecuteError::Timeout)` - If the admin operation exceeds the timeout
140/// * `Err(ExecuteError::Cancelled)` - If the admin operation was cancelled
141/// * `Err(ExecuteError::Engine)` - If the engine returns an error
142pub async fn execute_admin(
143 system: ActorSystem,
144 engine: StandardEngine,
145 statements: Vec<String>,
146 identity: IdentityId,
147 params: Params,
148 timeout: Duration,
149) -> ExecuteResult<Vec<Frame>> {
150 let combined = statements.join("; ");
151
152 // Execute synchronous admin operation on actor system's compute pool with timeout
153 let task = system.compute(move || engine.admin_as(identity, &combined, params));
154
155 let result = time::timeout(timeout, task).await;
156
157 match result {
158 Err(_elapsed) => Err(ExecuteError::Timeout),
159 Ok(Ok(frames_result)) => frames_result.map_err(ExecuteError::from),
160 Ok(Err(_join_error)) => Err(ExecuteError::Cancelled),
161 }
162}
163
164/// Execute a command with timeout.
165///
166/// Commands are write operations (INSERT, UPDATE, DELETE) that modify
167/// the database state. DDL operations are not allowed in command transactions.
168///
169/// # Arguments
170///
171/// * `system` - The actor system to execute the command on
172/// * `engine` - The database engine to execute the command on
173/// * `statements` - The RQL command statements
174/// * `identity` - The identity context for permission checking
175/// * `params` - Command parameters
176/// * `timeout` - Maximum time to wait for command completion
177///
178/// # Returns
179///
180/// * `Ok(Vec<Frame>)` - Command results on success
181/// * `Err(ExecuteError::Timeout)` - If the command exceeds the timeout
182/// * `Err(ExecuteError::Cancelled)` - If the command was cancelled
183/// * `Err(ExecuteError::Engine)` - If the engine returns an error
184pub async fn execute_command(
185 system: ActorSystem,
186 engine: StandardEngine,
187 statements: Vec<String>,
188 identity: IdentityId,
189 params: Params,
190 timeout: Duration,
191) -> ExecuteResult<Vec<Frame>> {
192 let combined = statements.join("; ");
193
194 // Execute synchronous command on actor system's compute pool with timeout
195 let task = system.compute(move || engine.command_as(identity, &combined, params));
196
197 let result = time::timeout(timeout, task).await;
198
199 match result {
200 Err(_elapsed) => Err(ExecuteError::Timeout),
201 Ok(Ok(frames_result)) => frames_result.map_err(ExecuteError::from),
202 Ok(Err(_join_error)) => Err(ExecuteError::Cancelled),
203 }
204}