1use std::{error, fmt, sync::Arc, time::Duration};
11
12use reifydb_engine::engine::StandardEngine;
13use reifydb_runtime::{actor::system::ActorSystem, context::clock::Clock};
14use reifydb_type::{
15 error::{Diagnostic, Error},
16 params::Params,
17 value::{frame::frame::Frame, identity::IdentityId},
18};
19use tokio::time;
20use tracing::warn;
21
22use crate::interceptor::{Operation, RequestContext, RequestInterceptorChain, ResponseContext};
23
24#[derive(Debug)]
26pub enum ExecuteError {
27 Timeout,
29 Cancelled,
31 Disconnected,
33 Engine {
35 diagnostic: Arc<Diagnostic>,
37 statement: String,
39 },
40 Rejected {
42 code: String,
44 message: String,
46 },
47}
48
49impl fmt::Display for ExecuteError {
50 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
51 match self {
52 ExecuteError::Timeout => write!(f, "Query execution timed out"),
53 ExecuteError::Cancelled => write!(f, "Query was cancelled"),
54 ExecuteError::Disconnected => write!(f, "Query stream disconnected unexpectedly"),
55 ExecuteError::Engine {
56 diagnostic,
57 ..
58 } => write!(f, "Engine error: {}", diagnostic.message),
59 ExecuteError::Rejected {
60 code,
61 message,
62 } => write!(f, "Rejected [{}]: {}", code, message),
63 }
64 }
65}
66
67impl error::Error for ExecuteError {}
68
69impl From<Error> for ExecuteError {
70 fn from(err: Error) -> Self {
71 ExecuteError::Engine {
72 diagnostic: Arc::new(err.diagnostic()),
73 statement: String::new(),
74 }
75 }
76}
77
78pub type ExecuteResult<T> = Result<T, ExecuteError>;
80
81fn retry_on_conflict<F>(mut f: F) -> Result<Vec<Frame>, Error>
83where
84 F: FnMut() -> Result<Vec<Frame>, Error>,
85{
86 let mut last_err = None;
87 for attempt in 0..3u32 {
88 match f() {
89 Ok(frames) => return Ok(frames),
90 Err(err) if err.code == "TXN_001" => {
91 warn!(attempt = attempt + 1, "Transaction conflict detected, retrying");
92 last_err = Some(err);
93 }
94 Err(err) => return Err(err),
95 }
96 }
97 Err(last_err.unwrap())
98}
99
100async fn raw_query(
101 system: ActorSystem,
102 engine: StandardEngine,
103 query: String,
104 identity: IdentityId,
105 params: Params,
106 timeout: Duration,
107 clock: Clock,
108) -> ExecuteResult<(Vec<Frame>, Duration)> {
109 let task = system.execute(move || {
110 let t = clock.instant();
111 let r = engine.query_as(identity, &query, params);
112 (r, t.elapsed())
113 });
114 match time::timeout(timeout, task).await {
115 Err(_elapsed) => Err(ExecuteError::Timeout),
116 Ok(Ok((result, compute))) => result.map(|f| (f, compute)).map_err(ExecuteError::from),
117 Ok(Err(_join_error)) => Err(ExecuteError::Cancelled),
118 }
119}
120
121async fn raw_command(
122 system: ActorSystem,
123 engine: StandardEngine,
124 statements: String,
125 identity: IdentityId,
126 params: Params,
127 timeout: Duration,
128 clock: Clock,
129) -> ExecuteResult<(Vec<Frame>, Duration)> {
130 let task = system.execute(move || {
131 let t = clock.instant();
132 let r = retry_on_conflict(|| engine.command_as(identity, &statements, params.clone()));
133 (r, t.elapsed())
134 });
135 match time::timeout(timeout, task).await {
136 Err(_elapsed) => Err(ExecuteError::Timeout),
137 Ok(Ok((result, compute))) => result.map(|f| (f, compute)).map_err(ExecuteError::from),
138 Ok(Err(_join_error)) => Err(ExecuteError::Cancelled),
139 }
140}
141
142async fn raw_admin(
143 system: ActorSystem,
144 engine: StandardEngine,
145 statements: String,
146 identity: IdentityId,
147 params: Params,
148 timeout: Duration,
149 clock: Clock,
150) -> ExecuteResult<(Vec<Frame>, Duration)> {
151 let task = system.execute(move || {
152 let t = clock.instant();
153 let r = retry_on_conflict(|| engine.admin_as(identity, &statements, params.clone()));
154 (r, t.elapsed())
155 });
156 match time::timeout(timeout, task).await {
157 Err(_elapsed) => Err(ExecuteError::Timeout),
158 Ok(Ok((result, compute))) => result.map(|f| (f, compute)).map_err(ExecuteError::from),
159 Ok(Err(_join_error)) => Err(ExecuteError::Cancelled),
160 }
161}
162
163async fn raw_subscription(
164 system: ActorSystem,
165 engine: StandardEngine,
166 statement: String,
167 identity: IdentityId,
168 params: Params,
169 timeout: Duration,
170 clock: Clock,
171) -> ExecuteResult<(Vec<Frame>, Duration)> {
172 let task = system.execute(move || {
173 let t = clock.instant();
174 let r = retry_on_conflict(|| engine.subscription_as(identity, &statement, params.clone()));
175 (r, t.elapsed())
176 });
177 match time::timeout(timeout, task).await {
178 Err(_elapsed) => Err(ExecuteError::Timeout),
179 Ok(Ok((result, compute))) => result.map(|f| (f, compute)).map_err(ExecuteError::from),
180 Ok(Err(_join_error)) => Err(ExecuteError::Cancelled),
181 }
182}
183
184pub async fn execute(
195 chain: &RequestInterceptorChain,
196 system: ActorSystem,
197 engine: StandardEngine,
198 mut ctx: RequestContext,
199 timeout: Duration,
200 clock: &Clock,
201) -> ExecuteResult<(Vec<Frame>, Duration)> {
202 if !chain.is_empty() {
204 chain.pre_execute(&mut ctx).await?;
205 }
206
207 let start = clock.instant();
208
209 let operation = ctx.operation;
210 let combined = ctx.statements.join("; ");
211
212 let response_parts = if !chain.is_empty() {
214 Some((ctx.identity, ctx.statements, ctx.params.clone(), ctx.metadata))
215 } else {
216 None
217 };
218
219 let result = match operation {
220 Operation::Query => {
221 raw_query(system, engine, combined, ctx.identity, ctx.params, timeout, clock.clone()).await
222 }
223 Operation::Command => {
224 raw_command(system, engine, combined, ctx.identity, ctx.params, timeout, clock.clone()).await
225 }
226 Operation::Admin => {
227 raw_admin(system, engine, combined, ctx.identity, ctx.params, timeout, clock.clone()).await
228 }
229 Operation::Subscribe => {
230 raw_subscription(system, engine, combined, ctx.identity, ctx.params, timeout, clock.clone())
231 .await
232 }
233 };
234
235 let duration = start.elapsed();
236
237 let (result, compute_duration) = match result {
239 Ok((frames, cd)) => (Ok(frames), cd),
240 Err(e) => (Err(e), duration),
241 };
242
243 if let Some((identity, statements, params, metadata)) = response_parts {
245 let response_ctx = ResponseContext {
246 identity,
247 operation,
248 statements,
249 params,
250 metadata,
251 result: match &result {
252 Ok(frames) => Ok(frames.len()),
253 Err(e) => Err(e.to_string()),
254 },
255 duration,
256 compute_duration,
257 };
258 chain.post_execute(&response_ctx).await;
259 }
260
261 result.map(|frames| (frames, duration))
262}