reifydb_sub_server/
execute.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later
3
4//! Query and command execution with async streaming.
5//!
6//! This module provides async wrappers around the database engine operations.
7//! The engine internally uses spawn_blocking for sync execution, streaming
8//! results back through async channels.
9
10use std::{sync::Arc, time::Duration};
11
12use futures_util::TryStreamExt;
13use reifydb_core::{
14	Frame,
15	interface::{Engine, Identity},
16	stream::StreamError,
17};
18use reifydb_engine::StandardEngine;
19use reifydb_type::{Params, diagnostic::Diagnostic};
20
21/// Error types for query/command execution.
22#[derive(Debug)]
23pub enum ExecuteError {
24	/// Query exceeded the configured timeout.
25	Timeout,
26	/// Query was cancelled.
27	Cancelled,
28	/// Stream disconnected unexpectedly.
29	Disconnected,
30	/// Database engine returned an error with full diagnostic info.
31	Engine {
32		/// The full diagnostic with error code, source location, help text, etc.
33		diagnostic: Arc<Diagnostic>,
34		/// The statement that caused the error.
35		statement: String,
36	},
37}
38
39impl std::fmt::Display for ExecuteError {
40	fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
41		match self {
42			ExecuteError::Timeout => write!(f, "Query execution timed out"),
43			ExecuteError::Cancelled => write!(f, "Query was cancelled"),
44			ExecuteError::Disconnected => write!(f, "Query stream disconnected unexpectedly"),
45			ExecuteError::Engine {
46				diagnostic,
47				..
48			} => write!(f, "Engine error: {}", diagnostic.message),
49		}
50	}
51}
52
53impl std::error::Error for ExecuteError {}
54
55impl From<StreamError> for ExecuteError {
56	fn from(err: StreamError) -> Self {
57		match err {
58			StreamError::Query {
59				diagnostic,
60				statement,
61			} => ExecuteError::Engine {
62				diagnostic, // Preserve full diagnostic
63				statement: statement.unwrap_or_default(),
64			},
65			StreamError::Cancelled => ExecuteError::Cancelled,
66			StreamError::Timeout => ExecuteError::Timeout,
67			StreamError::Disconnected => ExecuteError::Disconnected,
68		}
69	}
70}
71
72/// Result type for execute operations.
73pub type ExecuteResult<T> = std::result::Result<T, ExecuteError>;
74
75/// Execute a query with timeout.
76///
77/// This function:
78/// 1. Starts the async query execution (internally uses spawn_blocking)
79/// 2. Collects the stream with a timeout
80/// 3. Returns the query results or an appropriate error
81///
82/// # Arguments
83///
84/// * `engine` - The database engine to execute the query on
85/// * `query` - The RQL query string
86/// * `identity` - The identity context for permission checking
87/// * `params` - Query parameters
88/// * `timeout` - Maximum time to wait for query completion
89///
90/// # Returns
91///
92/// * `Ok(Vec<Frame>)` - Query results on success
93/// * `Err(ExecuteError::Timeout)` - If the query exceeds the timeout
94/// * `Err(ExecuteError::Cancelled)` - If the query was cancelled
95/// * `Err(ExecuteError::Engine)` - If the engine returns an error
96///
97/// # Example
98///
99/// ```ignore
100/// let result = execute_query(
101///     engine,
102///     "SELECT * FROM users".to_string(),
103///     identity,
104///     Params::None,
105///     Duration::from_secs(30),
106/// ).await?;
107/// ```
108pub async fn execute_query(
109	engine: StandardEngine,
110	query: String,
111	identity: Identity,
112	params: Params,
113	timeout: Duration,
114) -> ExecuteResult<Vec<Frame>> {
115	let stream = engine.query_as(&identity, &query, params);
116
117	// Collect the stream with a timeout
118	let result = tokio::time::timeout(timeout, stream.try_collect::<Vec<Frame>>()).await;
119
120	match result {
121		Err(_elapsed) => Err(ExecuteError::Timeout),
122		Ok(stream_result) => stream_result.map_err(ExecuteError::from),
123	}
124}
125
126/// Execute a command with timeout.
127///
128/// Commands are write operations (INSERT, UPDATE, DELETE, DDL) that modify
129/// the database state.
130///
131/// # Arguments
132///
133/// * `engine` - The database engine to execute the command on
134/// * `statements` - The RQL command statements
135/// * `identity` - The identity context for permission checking
136/// * `params` - Command parameters
137/// * `timeout` - Maximum time to wait for command completion
138///
139/// # Returns
140///
141/// * `Ok(Vec<Frame>)` - Command results on success
142/// * `Err(ExecuteError::Timeout)` - If the command exceeds the timeout
143/// * `Err(ExecuteError::Cancelled)` - If the command was cancelled
144/// * `Err(ExecuteError::Engine)` - If the engine returns an error
145pub async fn execute_command(
146	engine: StandardEngine,
147	statements: Vec<String>,
148	identity: Identity,
149	params: Params,
150	timeout: Duration,
151) -> ExecuteResult<Vec<Frame>> {
152	let combined = statements.join("; ");
153	let stream = engine.command_as(&identity, &combined, params);
154
155	// Collect the stream with a timeout
156	let result = tokio::time::timeout(timeout, stream.try_collect::<Vec<Frame>>()).await;
157
158	match result {
159		Err(_elapsed) => Err(ExecuteError::Timeout),
160		Ok(stream_result) => stream_result.map_err(ExecuteError::from),
161	}
162}
163
164/// Execute a single query statement with timeout.
165///
166/// This is a convenience wrapper around `execute_query` for single statements.
167pub async fn execute_query_single(
168	engine: StandardEngine,
169	query: &str,
170	identity: Identity,
171	params: Params,
172	timeout: Duration,
173) -> ExecuteResult<Vec<Frame>> {
174	execute_query(engine, query.to_string(), identity, params, timeout).await
175}
176
177/// Execute a single command statement with timeout.
178///
179/// This is a convenience wrapper around `execute_command` for single statements.
180pub async fn execute_command_single(
181	engine: StandardEngine,
182	command: &str,
183	identity: Identity,
184	params: Params,
185	timeout: Duration,
186) -> ExecuteResult<Vec<Frame>> {
187	execute_command(engine, vec![command.to_string()], identity, params, timeout).await
188}
189
#[cfg(test)]
mod tests {
	use super::*;

	/// Checks the `Display` text of every payload-free `ExecuteError` variant.
	#[test]
	fn test_execute_error_display() {
		let cases = [
			(ExecuteError::Timeout, "Query execution timed out"),
			(ExecuteError::Cancelled, "Query was cancelled"),
			(ExecuteError::Disconnected, "Query stream disconnected unexpectedly"),
		];

		for (error, expected) in cases {
			assert_eq!(error.to_string(), expected);
		}
	}
}