reifydb_sub_server/execute.rs
1// SPDX-License-Identifier: AGPL-3.0-or-later
2// Copyright (c) 2025 ReifyDB
3
4//! Query and command execution with async streaming.
5//!
6//! This module provides async wrappers around the database engine operations.
7//! The engine uses a compute pool for sync execution, streaming results back
8//! through async channels.
9
10use std::{sync::Arc, time::Duration};
11
12use reifydb_core::interface::auth::Identity;
13use reifydb_engine::engine::StandardEngine;
14use reifydb_runtime::actor::system::ActorSystem;
15use reifydb_type::{
16 error::{Error, diagnostic::Diagnostic},
17 params::Params,
18 value::frame::frame::Frame,
19};
20use tokio::time;
21
22/// Error types for query/command execution.
23#[derive(Debug)]
24pub enum ExecuteError {
25 /// Query exceeded the configured timeout.
26 Timeout,
27 /// Query was cancelled.
28 Cancelled,
29 /// Stream disconnected unexpectedly.
30 Disconnected,
31 /// Database engine returned an error with full diagnostic info.
32 Engine {
33 /// The full diagnostic with error code, source location, help text, etc.
34 diagnostic: Arc<Diagnostic>,
35 /// The statement that caused the error.
36 statement: String,
37 },
38}
39
40impl std::fmt::Display for ExecuteError {
41 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
42 match self {
43 ExecuteError::Timeout => write!(f, "Query execution timed out"),
44 ExecuteError::Cancelled => write!(f, "Query was cancelled"),
45 ExecuteError::Disconnected => write!(f, "Query stream disconnected unexpectedly"),
46 ExecuteError::Engine {
47 diagnostic,
48 ..
49 } => write!(f, "Engine error: {}", diagnostic.message),
50 }
51 }
52}
53
54impl std::error::Error for ExecuteError {}
55
56impl From<Error> for ExecuteError {
57 fn from(err: Error) -> Self {
58 ExecuteError::Engine {
59 diagnostic: Arc::new(err.diagnostic()),
60 statement: String::new(),
61 }
62 }
63}
64
65/// Result type for execute operations.
66pub type ExecuteResult<T> = std::result::Result<T, ExecuteError>;
67
68/// Execute a query with timeout.
69///
70/// This function:
71/// 1. Starts the query execution on the actor system's compute pool
72/// 2. Applies a timeout to the operation
73/// 3. Returns the query results or an appropriate error
74///
75/// # Arguments
76///
77/// * `system` - The actor system to execute the query on
78/// * `engine` - The database engine to execute the query on
79/// * `query` - The RQL query string
80/// * `identity` - The identity context for permission checking
81/// * `params` - Query parameters
82/// * `timeout` - Maximum time to wait for query completion
83///
84/// # Returns
85///
86/// * `Ok(Vec<Frame>)` - Query results on success
87/// * `Err(ExecuteError::Timeout)` - If the query exceeds the timeout
88/// * `Err(ExecuteError::Cancelled)` - If the query was cancelled
89/// * `Err(ExecuteError::Engine)` - If the engine returns an error
90///
91/// # Example
92///
93/// ```ignore
94/// let result = execute_query(
95/// system,
96/// engine,
97/// "FROM users take 42".to_string(),
98/// identity,
99/// Params::None,
100/// Duration::from_secs(30),
101/// ).await?;
102/// ```
103pub async fn execute_query(
104 system: ActorSystem,
105 engine: StandardEngine,
106 query: String,
107 identity: Identity,
108 params: Params,
109 timeout: Duration,
110) -> ExecuteResult<Vec<Frame>> {
111 // Execute synchronous query on actor system's compute pool with timeout
112 let task = system.compute(move || engine.query_as(&identity, &query, params));
113
114 let result = time::timeout(timeout, task).await;
115
116 match result {
117 Err(_elapsed) => Err(ExecuteError::Timeout),
118 Ok(Ok(frames_result)) => frames_result.map_err(ExecuteError::from),
119 Ok(Err(_join_error)) => Err(ExecuteError::Cancelled),
120 }
121}
122
123/// Execute an admin operation with timeout.
124///
125/// Admin operations include DDL (CREATE TABLE, ALTER, etc.), DML (INSERT, UPDATE, DELETE),
126/// and read queries. This is the most privileged execution level.
127///
128/// # Arguments
129///
130/// * `system` - The actor system to execute the admin operation on
131/// * `engine` - The database engine to execute the admin operation on
132/// * `statements` - The RQL admin statements
133/// * `identity` - The identity context for permission checking
134/// * `params` - Admin parameters
135/// * `timeout` - Maximum time to wait for admin completion
136///
137/// # Returns
138///
139/// * `Ok(Vec<Frame>)` - Admin results on success
140/// * `Err(ExecuteError::Timeout)` - If the admin operation exceeds the timeout
141/// * `Err(ExecuteError::Cancelled)` - If the admin operation was cancelled
142/// * `Err(ExecuteError::Engine)` - If the engine returns an error
143pub async fn execute_admin(
144 system: ActorSystem,
145 engine: StandardEngine,
146 statements: Vec<String>,
147 identity: Identity,
148 params: Params,
149 timeout: Duration,
150) -> ExecuteResult<Vec<Frame>> {
151 let combined = statements.join("; ");
152
153 // Execute synchronous admin operation on actor system's compute pool with timeout
154 let task = system.compute(move || engine.admin_as(&identity, &combined, params));
155
156 let result = time::timeout(timeout, task).await;
157
158 match result {
159 Err(_elapsed) => Err(ExecuteError::Timeout),
160 Ok(Ok(frames_result)) => frames_result.map_err(ExecuteError::from),
161 Ok(Err(_join_error)) => Err(ExecuteError::Cancelled),
162 }
163}
164
165/// Execute a command with timeout.
166///
167/// Commands are write operations (INSERT, UPDATE, DELETE) that modify
168/// the database state. DDL operations are not allowed in command transactions.
169///
170/// # Arguments
171///
172/// * `system` - The actor system to execute the command on
173/// * `engine` - The database engine to execute the command on
174/// * `statements` - The RQL command statements
175/// * `identity` - The identity context for permission checking
176/// * `params` - Command parameters
177/// * `timeout` - Maximum time to wait for command completion
178///
179/// # Returns
180///
181/// * `Ok(Vec<Frame>)` - Command results on success
182/// * `Err(ExecuteError::Timeout)` - If the command exceeds the timeout
183/// * `Err(ExecuteError::Cancelled)` - If the command was cancelled
184/// * `Err(ExecuteError::Engine)` - If the engine returns an error
185pub async fn execute_command(
186 system: ActorSystem,
187 engine: StandardEngine,
188 statements: Vec<String>,
189 identity: Identity,
190 params: Params,
191 timeout: Duration,
192) -> ExecuteResult<Vec<Frame>> {
193 let combined = statements.join("; ");
194
195 // Execute synchronous command on actor system's compute pool with timeout
196 let task = system.compute(move || engine.command_as(&identity, &combined, params));
197
198 let result = time::timeout(timeout, task).await;
199
200 match result {
201 Err(_elapsed) => Err(ExecuteError::Timeout),
202 Ok(Ok(frames_result)) => frames_result.map_err(ExecuteError::from),
203 Ok(Err(_join_error)) => Err(ExecuteError::Cancelled),
204 }
205}