1use std::{error, fmt, sync::Arc, time::Duration};
11
12use reifydb_engine::engine::StandardEngine;
13use reifydb_runtime::context::clock::Clock;
14use reifydb_type::{
15 error::{Diagnostic, Error},
16 params::Params,
17 value::{frame::frame::Frame, identity::IdentityId},
18};
19use tokio::{task::spawn_blocking, time};
20use tracing::warn;
21
22use crate::interceptor::{Operation, RequestContext, RequestInterceptorChain, ResponseContext};
23
24#[derive(Debug)]
26pub enum ExecuteError {
27 Timeout,
29 Cancelled,
31 Disconnected,
33 Engine {
35 diagnostic: Arc<Diagnostic>,
37 statement: String,
39 },
40 Rejected {
42 code: String,
44 message: String,
46 },
47}
48
49impl fmt::Display for ExecuteError {
50 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
51 match self {
52 ExecuteError::Timeout => write!(f, "Query execution timed out"),
53 ExecuteError::Cancelled => write!(f, "Query was cancelled"),
54 ExecuteError::Disconnected => write!(f, "Query stream disconnected unexpectedly"),
55 ExecuteError::Engine {
56 diagnostic,
57 ..
58 } => write!(f, "Engine error: {}", diagnostic.message),
59 ExecuteError::Rejected {
60 code,
61 message,
62 } => write!(f, "Rejected [{}]: {}", code, message),
63 }
64 }
65}
66
67impl error::Error for ExecuteError {}
68
69impl From<Error> for ExecuteError {
70 fn from(err: Error) -> Self {
71 ExecuteError::Engine {
72 diagnostic: Arc::new(err.diagnostic()),
73 statement: String::new(),
74 }
75 }
76}
77
78pub type ExecuteResult<T> = Result<T, ExecuteError>;
80
81fn retry_on_conflict<F>(mut f: F) -> Result<Vec<Frame>, Error>
83where
84 F: FnMut() -> Result<Vec<Frame>, Error>,
85{
86 let mut last_err = None;
87 for attempt in 0..3u32 {
88 match f() {
89 Ok(frames) => return Ok(frames),
90 Err(err) if err.code == "TXN_001" => {
91 warn!(attempt = attempt + 1, "Transaction conflict detected, retrying");
92 last_err = Some(err);
93 }
94 Err(err) => return Err(err),
95 }
96 }
97 Err(last_err.unwrap())
98}
99
100async fn raw_query(
101 engine: StandardEngine,
102 query: String,
103 identity: IdentityId,
104 params: Params,
105 timeout: Duration,
106 clock: Clock,
107) -> ExecuteResult<(Vec<Frame>, Duration)> {
108 let task = spawn_blocking(move || -> (Result<Vec<Frame>, Error>, Duration) {
109 let t = clock.instant();
110 let r = engine.query_as(identity, &query, params);
111 (r, t.elapsed())
112 });
113 match time::timeout(timeout, task).await {
114 Err(_elapsed) => Err(ExecuteError::Timeout),
115 Ok(Ok((result, compute))) => result.map(|f| (f, compute)).map_err(ExecuteError::from),
116 Ok(Err(_join_error)) => Err(ExecuteError::Cancelled),
117 }
118}
119
120async fn raw_command(
121 engine: StandardEngine,
122 statements: String,
123 identity: IdentityId,
124 params: Params,
125 timeout: Duration,
126 clock: Clock,
127) -> ExecuteResult<(Vec<Frame>, Duration)> {
128 let task = spawn_blocking(move || -> (Result<Vec<Frame>, Error>, Duration) {
129 let t = clock.instant();
130 let r = retry_on_conflict(|| engine.command_as(identity, &statements, params.clone()));
131 (r, t.elapsed())
132 });
133 match time::timeout(timeout, task).await {
134 Err(_elapsed) => Err(ExecuteError::Timeout),
135 Ok(Ok((result, compute))) => result.map(|f| (f, compute)).map_err(ExecuteError::from),
136 Ok(Err(_join_error)) => Err(ExecuteError::Cancelled),
137 }
138}
139
140async fn raw_admin(
141 engine: StandardEngine,
142 statements: String,
143 identity: IdentityId,
144 params: Params,
145 timeout: Duration,
146 clock: Clock,
147) -> ExecuteResult<(Vec<Frame>, Duration)> {
148 let task = spawn_blocking(move || -> (Result<Vec<Frame>, Error>, Duration) {
149 let t = clock.instant();
150 let r = retry_on_conflict(|| engine.admin_as(identity, &statements, params.clone()));
151 (r, t.elapsed())
152 });
153 match time::timeout(timeout, task).await {
154 Err(_elapsed) => Err(ExecuteError::Timeout),
155 Ok(Ok((result, compute))) => result.map(|f| (f, compute)).map_err(ExecuteError::from),
156 Ok(Err(_join_error)) => Err(ExecuteError::Cancelled),
157 }
158}
159
160async fn raw_subscription(
161 engine: StandardEngine,
162 statement: String,
163 identity: IdentityId,
164 params: Params,
165 timeout: Duration,
166 clock: Clock,
167) -> ExecuteResult<(Vec<Frame>, Duration)> {
168 let task = spawn_blocking(move || -> (Result<Vec<Frame>, Error>, Duration) {
169 let t = clock.instant();
170 let r = retry_on_conflict(|| engine.subscribe_as(identity, &statement, params.clone()));
171 (r, t.elapsed())
172 });
173 match time::timeout(timeout, task).await {
174 Err(_elapsed) => Err(ExecuteError::Timeout),
175 Ok(Ok((result, compute))) => result.map(|f| (f, compute)).map_err(ExecuteError::from),
176 Ok(Err(_join_error)) => Err(ExecuteError::Cancelled),
177 }
178}
179
180pub async fn execute(
191 chain: &RequestInterceptorChain,
192 engine: StandardEngine,
193 mut ctx: RequestContext,
194 timeout: Duration,
195 clock: &Clock,
196) -> ExecuteResult<(Vec<Frame>, Duration)> {
197 if !chain.is_empty() {
199 chain.pre_execute(&mut ctx).await?;
200 }
201
202 let start = clock.instant();
203
204 let operation = ctx.operation;
205 let combined = ctx.statements.join("; ");
206
207 let response_parts = if !chain.is_empty() {
209 Some((ctx.identity, ctx.statements, ctx.params.clone(), ctx.metadata))
210 } else {
211 None
212 };
213
214 let result = match operation {
215 Operation::Query => raw_query(engine, combined, ctx.identity, ctx.params, timeout, clock.clone()).await,
216 Operation::Command => {
217 raw_command(engine, combined, ctx.identity, ctx.params, timeout, clock.clone()).await
218 }
219 Operation::Admin => raw_admin(engine, combined, ctx.identity, ctx.params, timeout, clock.clone()).await,
220 Operation::Subscribe => {
221 raw_subscription(engine, combined, ctx.identity, ctx.params, timeout, clock.clone()).await
222 }
223 };
224
225 let duration = start.elapsed();
226
227 let (result, compute_duration) = match result {
229 Ok((frames, cd)) => (Ok(frames), cd),
230 Err(e) => (Err(e), duration),
231 };
232
233 if let Some((identity, statements, params, metadata)) = response_parts {
235 let response_ctx = ResponseContext {
236 identity,
237 operation,
238 statements,
239 params,
240 metadata,
241 result: match &result {
242 Ok(frames) => Ok(frames.len()),
243 Err(e) => Err(e.to_string()),
244 },
245 duration,
246 compute_duration,
247 };
248 chain.post_execute(&response_ctx).await;
249 }
250
251 result.map(|frames| (frames, duration))
252}