Skip to main content

reifydb_sub_server/
execute.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// Copyright (c) 2025 ReifyDB
3
4//! Query and command execution with async streaming.
5//!
6//! This module provides async wrappers around the database engine operations.
7//! The engine uses a compute pool for sync execution, streaming results back
8//! through async channels.
9
10use std::{sync::Arc, time::Duration};
11
12use reifydb_core::interface::auth::Identity;
13use reifydb_engine::engine::StandardEngine;
14use reifydb_runtime::actor::system::ActorSystem;
15use reifydb_type::{
16	error::{Error, diagnostic::Diagnostic},
17	params::Params,
18	value::frame::frame::Frame,
19};
20use tokio::time;
21
/// Error types for query/command execution.
///
/// Returned (wrapped in `ExecuteResult`) by `execute_query`, `execute_admin` and
/// `execute_command` in this module.
#[derive(Debug)]
pub enum ExecuteError {
	/// Query exceeded the configured timeout.
	///
	/// Produced by the execute functions when `tokio::time::timeout` elapses
	/// before the compute task finishes.
	Timeout,
	/// Query was cancelled.
	///
	/// Produced by the execute functions when awaiting the compute task itself
	/// yields an error (as opposed to the engine returning one).
	Cancelled,
	/// Stream disconnected unexpectedly.
	///
	/// NOTE(review): not constructed by any function visible in this file;
	/// presumably used by callers elsewhere — confirm.
	Disconnected,
	/// Database engine returned an error with full diagnostic info.
	Engine {
		/// The full diagnostic with error code, source location, help text, etc.
		diagnostic: Arc<Diagnostic>,
		/// The statement that caused the error.
		///
		/// The `From<Error>` conversion in this file leaves this as an empty string.
		statement: String,
	},
}
39
40impl std::fmt::Display for ExecuteError {
41	fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
42		match self {
43			ExecuteError::Timeout => write!(f, "Query execution timed out"),
44			ExecuteError::Cancelled => write!(f, "Query was cancelled"),
45			ExecuteError::Disconnected => write!(f, "Query stream disconnected unexpectedly"),
46			ExecuteError::Engine {
47				diagnostic,
48				..
49			} => write!(f, "Engine error: {}", diagnostic.message),
50		}
51	}
52}
53
// Marker implementation: the body is empty, so every `std::error::Error`
// method keeps its default (in particular, no `source()` is exposed).
impl std::error::Error for ExecuteError {}
55
56impl From<Error> for ExecuteError {
57	fn from(err: Error) -> Self {
58		ExecuteError::Engine {
59			diagnostic: Arc::new(err.diagnostic()),
60			statement: String::new(),
61		}
62	}
63}
64
/// Result type for execute operations.
///
/// Shorthand for a `Result` whose error is always [`ExecuteError`]; this is the
/// return type of `execute_query`, `execute_admin` and `execute_command`.
pub type ExecuteResult<T> = std::result::Result<T, ExecuteError>;
67
68/// Execute a query with timeout.
69///
70/// This function:
71/// 1. Starts the query execution on the actor system's compute pool
72/// 2. Applies a timeout to the operation
73/// 3. Returns the query results or an appropriate error
74///
75/// # Arguments
76///
77/// * `system` - The actor system to execute the query on
78/// * `engine` - The database engine to execute the query on
79/// * `query` - The RQL query string
80/// * `identity` - The identity context for permission checking
81/// * `params` - Query parameters
82/// * `timeout` - Maximum time to wait for query completion
83///
84/// # Returns
85///
86/// * `Ok(Vec<Frame>)` - Query results on success
87/// * `Err(ExecuteError::Timeout)` - If the query exceeds the timeout
88/// * `Err(ExecuteError::Cancelled)` - If the query was cancelled
89/// * `Err(ExecuteError::Engine)` - If the engine returns an error
90///
91/// # Example
92///
93/// ```ignore
94/// let result = execute_query(
95///     system,
96///     engine,
97///     "FROM users take 42".to_string(),
98///     identity,
99///     Params::None,
100///     Duration::from_secs(30),
101/// ).await?;
102/// ```
103pub async fn execute_query(
104	system: ActorSystem,
105	engine: StandardEngine,
106	query: String,
107	identity: Identity,
108	params: Params,
109	timeout: Duration,
110) -> ExecuteResult<Vec<Frame>> {
111	// Execute synchronous query on actor system's compute pool with timeout
112	let task = system.compute(move || engine.query_as(&identity, &query, params));
113
114	let result = time::timeout(timeout, task).await;
115
116	match result {
117		Err(_elapsed) => Err(ExecuteError::Timeout),
118		Ok(Ok(frames_result)) => frames_result.map_err(ExecuteError::from),
119		Ok(Err(_join_error)) => Err(ExecuteError::Cancelled),
120	}
121}
122
123/// Execute an admin operation with timeout.
124///
125/// Admin operations include DDL (CREATE TABLE, ALTER, etc.), DML (INSERT, UPDATE, DELETE),
126/// and read queries. This is the most privileged execution level.
127///
128/// # Arguments
129///
130/// * `system` - The actor system to execute the admin operation on
131/// * `engine` - The database engine to execute the admin operation on
132/// * `statements` - The RQL admin statements
133/// * `identity` - The identity context for permission checking
134/// * `params` - Admin parameters
135/// * `timeout` - Maximum time to wait for admin completion
136///
137/// # Returns
138///
139/// * `Ok(Vec<Frame>)` - Admin results on success
140/// * `Err(ExecuteError::Timeout)` - If the admin operation exceeds the timeout
141/// * `Err(ExecuteError::Cancelled)` - If the admin operation was cancelled
142/// * `Err(ExecuteError::Engine)` - If the engine returns an error
143pub async fn execute_admin(
144	system: ActorSystem,
145	engine: StandardEngine,
146	statements: Vec<String>,
147	identity: Identity,
148	params: Params,
149	timeout: Duration,
150) -> ExecuteResult<Vec<Frame>> {
151	let combined = statements.join("; ");
152
153	// Execute synchronous admin operation on actor system's compute pool with timeout
154	let task = system.compute(move || engine.admin_as(&identity, &combined, params));
155
156	let result = time::timeout(timeout, task).await;
157
158	match result {
159		Err(_elapsed) => Err(ExecuteError::Timeout),
160		Ok(Ok(frames_result)) => frames_result.map_err(ExecuteError::from),
161		Ok(Err(_join_error)) => Err(ExecuteError::Cancelled),
162	}
163}
164
165/// Execute a command with timeout.
166///
167/// Commands are write operations (INSERT, UPDATE, DELETE) that modify
168/// the database state. DDL operations are not allowed in command transactions.
169///
170/// # Arguments
171///
172/// * `system` - The actor system to execute the command on
173/// * `engine` - The database engine to execute the command on
174/// * `statements` - The RQL command statements
175/// * `identity` - The identity context for permission checking
176/// * `params` - Command parameters
177/// * `timeout` - Maximum time to wait for command completion
178///
179/// # Returns
180///
181/// * `Ok(Vec<Frame>)` - Command results on success
182/// * `Err(ExecuteError::Timeout)` - If the command exceeds the timeout
183/// * `Err(ExecuteError::Cancelled)` - If the command was cancelled
184/// * `Err(ExecuteError::Engine)` - If the engine returns an error
185pub async fn execute_command(
186	system: ActorSystem,
187	engine: StandardEngine,
188	statements: Vec<String>,
189	identity: Identity,
190	params: Params,
191	timeout: Duration,
192) -> ExecuteResult<Vec<Frame>> {
193	let combined = statements.join("; ");
194
195	// Execute synchronous command on actor system's compute pool with timeout
196	let task = system.compute(move || engine.command_as(&identity, &combined, params));
197
198	let result = time::timeout(timeout, task).await;
199
200	match result {
201		Err(_elapsed) => Err(ExecuteError::Timeout),
202		Ok(Ok(frames_result)) => frames_result.map_err(ExecuteError::from),
203		Ok(Err(_join_error)) => Err(ExecuteError::Cancelled),
204	}
205}