Skip to main content

reifydb_sub_server/
execute.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4//! Query and command execution with interceptor support.
5//!
6//! All execution goes through [`execute`], which runs pre/post interceptor
7//! hooks around the actual engine dispatch. When no interceptors are
8//! registered the overhead is a single `is_empty()` check.
9
10use std::{error, fmt, sync::Arc, time::Duration};
11
12use reifydb_engine::engine::StandardEngine;
13use reifydb_runtime::context::clock::Clock;
14use reifydb_type::{
15	error::{Diagnostic, Error},
16	params::Params,
17	value::{frame::frame::Frame, identity::IdentityId},
18};
19use tokio::{task::spawn_blocking, time};
20use tracing::warn;
21
22use crate::interceptor::{Operation, RequestContext, RequestInterceptorChain, ResponseContext};
23
/// Errors surfaced to callers when executing a query, command, admin
/// statement or subscription.
///
/// Implements [`fmt::Display`] and [`error::Error`]. Engine [`Error`] values
/// convert into [`ExecuteError::Engine`] through the `From<Error>` impl.
#[derive(Debug)]
pub enum ExecuteError {
	/// Query exceeded the configured timeout.
	Timeout,
	/// Query was cancelled.
	///
	/// The dispatch helpers in this module also report this variant when
	/// joining the blocking task fails.
	Cancelled,
	/// Stream disconnected unexpectedly.
	Disconnected,
	/// Database engine returned an error with full diagnostic info.
	Engine {
		/// The full diagnostic with error code, source location, help text, etc.
		diagnostic: Arc<Diagnostic>,
		/// The statement that caused the error.
		///
		/// Left empty by the `From<Error>` conversion, which has no
		/// statement text available.
		statement: String,
	},
	/// Request was rejected by a request interceptor.
	Rejected {
		/// Error code for the rejection (e.g. "AUTH_REQUIRED", "INSUFFICIENT_CREDITS").
		code: String,
		/// Human-readable reason for rejection.
		message: String,
	},
}
48
49impl fmt::Display for ExecuteError {
50	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
51		match self {
52			ExecuteError::Timeout => write!(f, "Query execution timed out"),
53			ExecuteError::Cancelled => write!(f, "Query was cancelled"),
54			ExecuteError::Disconnected => write!(f, "Query stream disconnected unexpectedly"),
55			ExecuteError::Engine {
56				diagnostic,
57				..
58			} => write!(f, "Engine error: {}", diagnostic.message),
59			ExecuteError::Rejected {
60				code,
61				message,
62			} => write!(f, "Rejected [{}]: {}", code, message),
63		}
64	}
65}
66
// Marker impl: only the trait's default methods are used, so no underlying
// source error is exposed through this type.
impl error::Error for ExecuteError {}
68
69impl From<Error> for ExecuteError {
70	fn from(err: Error) -> Self {
71		ExecuteError::Engine {
72			diagnostic: Arc::new(err.diagnostic()),
73			statement: String::new(),
74		}
75	}
76}
77
/// Result alias for execute operations, with the error type fixed to
/// [`ExecuteError`].
pub type ExecuteResult<T> = Result<T, ExecuteError>;
80
81/// Retry a closure up to 3 times on `TXN_001` transaction conflict errors.
82fn retry_on_conflict<F>(mut f: F) -> Result<Vec<Frame>, Error>
83where
84	F: FnMut() -> Result<Vec<Frame>, Error>,
85{
86	let mut last_err = None;
87	for attempt in 0..3u32 {
88		match f() {
89			Ok(frames) => return Ok(frames),
90			Err(err) if err.code == "TXN_001" => {
91				warn!(attempt = attempt + 1, "Transaction conflict detected, retrying");
92				last_err = Some(err);
93			}
94			Err(err) => return Err(err),
95		}
96	}
97	Err(last_err.unwrap())
98}
99
100async fn raw_query(
101	engine: StandardEngine,
102	query: String,
103	identity: IdentityId,
104	params: Params,
105	timeout: Duration,
106	clock: Clock,
107) -> ExecuteResult<(Vec<Frame>, Duration)> {
108	let task = spawn_blocking(move || -> (Result<Vec<Frame>, Error>, Duration) {
109		let t = clock.instant();
110		let r = engine.query_as(identity, &query, params);
111		(r, t.elapsed())
112	});
113	match time::timeout(timeout, task).await {
114		Err(_elapsed) => Err(ExecuteError::Timeout),
115		Ok(Ok((result, compute))) => result.map(|f| (f, compute)).map_err(ExecuteError::from),
116		Ok(Err(_join_error)) => Err(ExecuteError::Cancelled),
117	}
118}
119
120async fn raw_command(
121	engine: StandardEngine,
122	statements: String,
123	identity: IdentityId,
124	params: Params,
125	timeout: Duration,
126	clock: Clock,
127) -> ExecuteResult<(Vec<Frame>, Duration)> {
128	let task = spawn_blocking(move || -> (Result<Vec<Frame>, Error>, Duration) {
129		let t = clock.instant();
130		let r = retry_on_conflict(|| engine.command_as(identity, &statements, params.clone()));
131		(r, t.elapsed())
132	});
133	match time::timeout(timeout, task).await {
134		Err(_elapsed) => Err(ExecuteError::Timeout),
135		Ok(Ok((result, compute))) => result.map(|f| (f, compute)).map_err(ExecuteError::from),
136		Ok(Err(_join_error)) => Err(ExecuteError::Cancelled),
137	}
138}
139
140async fn raw_admin(
141	engine: StandardEngine,
142	statements: String,
143	identity: IdentityId,
144	params: Params,
145	timeout: Duration,
146	clock: Clock,
147) -> ExecuteResult<(Vec<Frame>, Duration)> {
148	let task = spawn_blocking(move || -> (Result<Vec<Frame>, Error>, Duration) {
149		let t = clock.instant();
150		let r = retry_on_conflict(|| engine.admin_as(identity, &statements, params.clone()));
151		(r, t.elapsed())
152	});
153	match time::timeout(timeout, task).await {
154		Err(_elapsed) => Err(ExecuteError::Timeout),
155		Ok(Ok((result, compute))) => result.map(|f| (f, compute)).map_err(ExecuteError::from),
156		Ok(Err(_join_error)) => Err(ExecuteError::Cancelled),
157	}
158}
159
160async fn raw_subscription(
161	engine: StandardEngine,
162	statement: String,
163	identity: IdentityId,
164	params: Params,
165	timeout: Duration,
166	clock: Clock,
167) -> ExecuteResult<(Vec<Frame>, Duration)> {
168	let task = spawn_blocking(move || -> (Result<Vec<Frame>, Error>, Duration) {
169		let t = clock.instant();
170		let r = retry_on_conflict(|| engine.subscribe_as(identity, &statement, params.clone()));
171		(r, t.elapsed())
172	});
173	match time::timeout(timeout, task).await {
174		Err(_elapsed) => Err(ExecuteError::Timeout),
175		Ok(Ok((result, compute))) => result.map(|f| (f, compute)).map_err(ExecuteError::from),
176		Ok(Err(_join_error)) => Err(ExecuteError::Cancelled),
177	}
178}
179
180/// Execute a database operation with interceptor support.
181///
182/// This is the single entry point for all protocol handlers.
183/// Interceptors run before and after the engine dispatch:
184///
185/// 1. `pre_execute` — may reject the request or mutate identity/metadata
186/// 2. Engine dispatch (query / command / admin / subscribe)
187/// 3. `post_execute` — observes result and duration (fire-and-forget)
188///
189/// When the interceptor chain is empty, steps 1 and 3 are skipped.
190pub async fn execute(
191	chain: &RequestInterceptorChain,
192	engine: StandardEngine,
193	mut ctx: RequestContext,
194	timeout: Duration,
195	clock: &Clock,
196) -> ExecuteResult<(Vec<Frame>, Duration)> {
197	// Pre-execute interceptors (may reject, may mutate identity)
198	if !chain.is_empty() {
199		chain.pre_execute(&mut ctx).await?;
200	}
201
202	let start = clock.instant();
203
204	let operation = ctx.operation;
205	let combined = ctx.statements.join("; ");
206
207	// Clone params for response context only when interceptors need it
208	let response_parts = if !chain.is_empty() {
209		Some((ctx.identity, ctx.statements, ctx.params.clone(), ctx.metadata))
210	} else {
211		None
212	};
213
214	let result = match operation {
215		Operation::Query => raw_query(engine, combined, ctx.identity, ctx.params, timeout, clock.clone()).await,
216		Operation::Command => {
217			raw_command(engine, combined, ctx.identity, ctx.params, timeout, clock.clone()).await
218		}
219		Operation::Admin => raw_admin(engine, combined, ctx.identity, ctx.params, timeout, clock.clone()).await,
220		Operation::Subscribe => {
221			raw_subscription(engine, combined, ctx.identity, ctx.params, timeout, clock.clone()).await
222		}
223	};
224
225	let duration = start.elapsed();
226
227	// Separate frames from compute_duration
228	let (result, compute_duration) = match result {
229		Ok((frames, cd)) => (Ok(frames), cd),
230		Err(e) => (Err(e), duration),
231	};
232
233	// Post-execute interceptors
234	if let Some((identity, statements, params, metadata)) = response_parts {
235		let response_ctx = ResponseContext {
236			identity,
237			operation,
238			statements,
239			params,
240			metadata,
241			result: match &result {
242				Ok(frames) => Ok(frames.len()),
243				Err(e) => Err(e.to_string()),
244			},
245			duration,
246			compute_duration,
247		};
248		chain.post_execute(&response_ctx).await;
249	}
250
251	result.map(|frames| (frames, duration))
252}