Skip to main content

reifydb_sub_server/
execute.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4//! Query and command execution with interceptor support.
5//!
6//! All execution goes through [`execute`], which runs pre/post interceptor
7//! hooks around the actual engine dispatch. When no interceptors are
8//! registered the overhead is a single `is_empty()` check.
9
10use std::{error, fmt, sync::Arc, time::Duration};
11
12use reifydb_engine::engine::StandardEngine;
13use reifydb_runtime::{actor::system::ActorSystem, context::clock::Clock};
14use reifydb_type::{
15	error::{Diagnostic, Error},
16	params::Params,
17	value::{frame::frame::Frame, identity::IdentityId},
18};
19use tokio::time;
20use tracing::warn;
21
22use crate::interceptor::{Operation, RequestContext, RequestInterceptorChain, ResponseContext};
23
24/// Error types for query/command execution.
25#[derive(Debug)]
26pub enum ExecuteError {
27	/// Query exceeded the configured timeout.
28	Timeout,
29	/// Query was cancelled.
30	Cancelled,
31	/// Stream disconnected unexpectedly.
32	Disconnected,
33	/// Database engine returned an error with full diagnostic info.
34	Engine {
35		/// The full diagnostic with error code, source location, help text, etc.
36		diagnostic: Arc<Diagnostic>,
37		/// The statement that caused the error.
38		statement: String,
39	},
40	/// Request was rejected by a request interceptor.
41	Rejected {
42		/// Error code for the rejection (e.g. "AUTH_REQUIRED", "INSUFFICIENT_CREDITS").
43		code: String,
44		/// Human-readable reason for rejection.
45		message: String,
46	},
47}
48
49impl fmt::Display for ExecuteError {
50	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
51		match self {
52			ExecuteError::Timeout => write!(f, "Query execution timed out"),
53			ExecuteError::Cancelled => write!(f, "Query was cancelled"),
54			ExecuteError::Disconnected => write!(f, "Query stream disconnected unexpectedly"),
55			ExecuteError::Engine {
56				diagnostic,
57				..
58			} => write!(f, "Engine error: {}", diagnostic.message),
59			ExecuteError::Rejected {
60				code,
61				message,
62			} => write!(f, "Rejected [{}]: {}", code, message),
63		}
64	}
65}
66
67impl error::Error for ExecuteError {}
68
69impl From<Error> for ExecuteError {
70	fn from(err: Error) -> Self {
71		ExecuteError::Engine {
72			diagnostic: Arc::new(err.diagnostic()),
73			statement: String::new(),
74		}
75	}
76}
77
78/// Result type for execute operations.
79pub type ExecuteResult<T> = Result<T, ExecuteError>;
80
81/// Retry a closure up to 3 times on `TXN_001` transaction conflict errors.
82fn retry_on_conflict<F>(mut f: F) -> Result<Vec<Frame>, Error>
83where
84	F: FnMut() -> Result<Vec<Frame>, Error>,
85{
86	let mut last_err = None;
87	for attempt in 0..3u32 {
88		match f() {
89			Ok(frames) => return Ok(frames),
90			Err(err) if err.code == "TXN_001" => {
91				warn!(attempt = attempt + 1, "Transaction conflict detected, retrying");
92				last_err = Some(err);
93			}
94			Err(err) => return Err(err),
95		}
96	}
97	Err(last_err.unwrap())
98}
99
100async fn raw_query(
101	system: ActorSystem,
102	engine: StandardEngine,
103	query: String,
104	identity: IdentityId,
105	params: Params,
106	timeout: Duration,
107	clock: Clock,
108) -> ExecuteResult<(Vec<Frame>, Duration)> {
109	let task = system.execute(move || {
110		let t = clock.instant();
111		let r = engine.query_as(identity, &query, params);
112		(r, t.elapsed())
113	});
114	match time::timeout(timeout, task).await {
115		Err(_elapsed) => Err(ExecuteError::Timeout),
116		Ok(Ok((result, compute))) => result.map(|f| (f, compute)).map_err(ExecuteError::from),
117		Ok(Err(_join_error)) => Err(ExecuteError::Cancelled),
118	}
119}
120
121async fn raw_command(
122	system: ActorSystem,
123	engine: StandardEngine,
124	statements: String,
125	identity: IdentityId,
126	params: Params,
127	timeout: Duration,
128	clock: Clock,
129) -> ExecuteResult<(Vec<Frame>, Duration)> {
130	let task = system.execute(move || {
131		let t = clock.instant();
132		let r = retry_on_conflict(|| engine.command_as(identity, &statements, params.clone()));
133		(r, t.elapsed())
134	});
135	match time::timeout(timeout, task).await {
136		Err(_elapsed) => Err(ExecuteError::Timeout),
137		Ok(Ok((result, compute))) => result.map(|f| (f, compute)).map_err(ExecuteError::from),
138		Ok(Err(_join_error)) => Err(ExecuteError::Cancelled),
139	}
140}
141
142async fn raw_admin(
143	system: ActorSystem,
144	engine: StandardEngine,
145	statements: String,
146	identity: IdentityId,
147	params: Params,
148	timeout: Duration,
149	clock: Clock,
150) -> ExecuteResult<(Vec<Frame>, Duration)> {
151	let task = system.execute(move || {
152		let t = clock.instant();
153		let r = retry_on_conflict(|| engine.admin_as(identity, &statements, params.clone()));
154		(r, t.elapsed())
155	});
156	match time::timeout(timeout, task).await {
157		Err(_elapsed) => Err(ExecuteError::Timeout),
158		Ok(Ok((result, compute))) => result.map(|f| (f, compute)).map_err(ExecuteError::from),
159		Ok(Err(_join_error)) => Err(ExecuteError::Cancelled),
160	}
161}
162
163async fn raw_subscription(
164	system: ActorSystem,
165	engine: StandardEngine,
166	statement: String,
167	identity: IdentityId,
168	params: Params,
169	timeout: Duration,
170	clock: Clock,
171) -> ExecuteResult<(Vec<Frame>, Duration)> {
172	let task = system.execute(move || {
173		let t = clock.instant();
174		let r = retry_on_conflict(|| engine.subscription_as(identity, &statement, params.clone()));
175		(r, t.elapsed())
176	});
177	match time::timeout(timeout, task).await {
178		Err(_elapsed) => Err(ExecuteError::Timeout),
179		Ok(Ok((result, compute))) => result.map(|f| (f, compute)).map_err(ExecuteError::from),
180		Ok(Err(_join_error)) => Err(ExecuteError::Cancelled),
181	}
182}
183
184/// Execute a database operation with interceptor support.
185///
186/// This is the single entry point for all protocol handlers.
187/// Interceptors run before and after the engine dispatch:
188///
189/// 1. `pre_execute` — may reject the request or mutate identity/metadata
190/// 2. Engine dispatch (query / command / admin / subscribe)
191/// 3. `post_execute` — observes result and duration (fire-and-forget)
192///
193/// When the interceptor chain is empty, steps 1 and 3 are skipped.
194pub async fn execute(
195	chain: &RequestInterceptorChain,
196	system: ActorSystem,
197	engine: StandardEngine,
198	mut ctx: RequestContext,
199	timeout: Duration,
200	clock: &Clock,
201) -> ExecuteResult<(Vec<Frame>, Duration)> {
202	// Pre-execute interceptors (may reject, may mutate identity)
203	if !chain.is_empty() {
204		chain.pre_execute(&mut ctx).await?;
205	}
206
207	let start = clock.instant();
208
209	let operation = ctx.operation;
210	let combined = ctx.statements.join("; ");
211
212	// Clone params for response context only when interceptors need it
213	let response_parts = if !chain.is_empty() {
214		Some((ctx.identity, ctx.statements, ctx.params.clone(), ctx.metadata))
215	} else {
216		None
217	};
218
219	let result = match operation {
220		Operation::Query => {
221			raw_query(system, engine, combined, ctx.identity, ctx.params, timeout, clock.clone()).await
222		}
223		Operation::Command => {
224			raw_command(system, engine, combined, ctx.identity, ctx.params, timeout, clock.clone()).await
225		}
226		Operation::Admin => {
227			raw_admin(system, engine, combined, ctx.identity, ctx.params, timeout, clock.clone()).await
228		}
229		Operation::Subscribe => {
230			raw_subscription(system, engine, combined, ctx.identity, ctx.params, timeout, clock.clone())
231				.await
232		}
233	};
234
235	let duration = start.elapsed();
236
237	// Separate frames from compute_duration
238	let (result, compute_duration) = match result {
239		Ok((frames, cd)) => (Ok(frames), cd),
240		Err(e) => (Err(e), duration),
241	};
242
243	// Post-execute interceptors
244	if let Some((identity, statements, params, metadata)) = response_parts {
245		let response_ctx = ResponseContext {
246			identity,
247			operation,
248			statements,
249			params,
250			metadata,
251			result: match &result {
252				Ok(frames) => Ok(frames.len()),
253				Err(e) => Err(e.to_string()),
254			},
255			duration,
256			compute_duration,
257		};
258		chain.post_execute(&response_ctx).await;
259	}
260
261	result.map(|frames| (frames, duration))
262}