reifydb_sub_server/
dispatch.rs1#[cfg(not(reifydb_single_threaded))]
12pub use native::{dispatch, dispatch_subscribe};
13
14#[cfg(not(reifydb_single_threaded))]
15mod native {
16 use std::{sync::Arc, time::Duration};
17
18 use reifydb_core::{
19 actors::server::{ServerMessage, ServerResponse, ServerSubscribeResponse, build_server_message},
20 metric::ExecutionMetrics,
21 };
22 use reifydb_runtime::actor::reply::reply_channel;
23 use reifydb_type::value::{duration::Duration as ReifyDuration, frame::frame::Frame};
24 use tokio::time::timeout;
25
26 use crate::{
27 execute::ExecuteError,
28 interceptor::{RequestContext, ResponseContext},
29 state::AppState,
30 };
31
32 pub async fn dispatch(
41 state: &AppState,
42 mut ctx: RequestContext,
43 ) -> Result<(Vec<Frame>, Duration), ExecuteError> {
44 if !state.request_interceptors().is_empty() {
46 state.request_interceptors().pre_execute(&mut ctx).await?;
47 }
48
49 let start = state.clock().instant();
50
51 let (reply, receiver) = reply_channel();
53 let msg = build_server_message(
54 ctx.operation,
55 ctx.identity,
56 ctx.statements.clone(),
57 ctx.params.clone(),
58 reply,
59 );
60
61 let (actor_ref, _handle) = state.spawn_server_actor();
62 actor_ref.send(msg).ok().ok_or(ExecuteError::Disconnected)?;
63
64 let server_response = timeout(state.query_timeout(), receiver.recv())
66 .await
67 .map_err(|_| ExecuteError::Timeout)?
68 .map_err(|_| ExecuteError::Disconnected)?;
69
70 let wall_duration = start.elapsed();
71 let (frames, compute_duration) = match server_response {
72 ServerResponse::Success {
73 frames,
74 duration,
75 } => Ok((frames, duration)),
76 ServerResponse::EngineError {
77 diagnostic,
78 statement,
79 } => Err(ExecuteError::Engine {
80 diagnostic: Arc::from(diagnostic),
81 statement,
82 }),
83 }?;
84
85 if !state.request_interceptors().is_empty() {
87 let response_ctx = ResponseContext {
88 identity: ctx.identity,
89 operation: ctx.operation,
90 statements: ctx.statements,
91 params: ctx.params,
92 metadata: ctx.metadata,
93 metrics: ExecutionMetrics::default(),
94 result: Ok(frames.len()),
95 total: ReifyDuration::from_nanoseconds(wall_duration.as_nanos() as i64)
96 .unwrap_or_default(),
97 compute: ReifyDuration::from_nanoseconds(compute_duration.as_nanos() as i64)
98 .unwrap_or_default(),
99 };
100 state.request_interceptors().post_execute(&response_ctx).await;
101 }
102
103 Ok((frames, wall_duration))
104 }
105
106 pub async fn dispatch_subscribe(
111 state: &AppState,
112 mut ctx: RequestContext,
113 ) -> Result<(Vec<Frame>, Duration), ExecuteError> {
114 if !state.request_interceptors().is_empty() {
116 state.request_interceptors().pre_execute(&mut ctx).await?;
117 }
118
119 let start = state.clock().instant();
120
121 let (reply, receiver) = reply_channel();
122 let msg = ServerMessage::Subscribe {
123 identity: ctx.identity,
124 query: ctx.statements.join("; "),
125 reply,
126 };
127
128 let (actor_ref, _handle) = state.spawn_server_actor();
129 actor_ref.send(msg).ok().ok_or(ExecuteError::Disconnected)?;
130
131 let response = timeout(state.query_timeout(), receiver.recv())
132 .await
133 .map_err(|_| ExecuteError::Timeout)?
134 .map_err(|_| ExecuteError::Disconnected)?;
135
136 let wall_duration = start.elapsed();
137
138 let (frames, compute_duration) = match response {
139 ServerSubscribeResponse::Subscribed {
140 frames,
141 duration,
142 } => (frames, duration),
143 ServerSubscribeResponse::EngineError {
144 diagnostic,
145 statement,
146 } => {
147 return Err(ExecuteError::Engine {
148 diagnostic: Arc::from(diagnostic),
149 statement,
150 });
151 }
152 };
153
154 if !state.request_interceptors().is_empty() {
156 let response_ctx = ResponseContext {
157 identity: ctx.identity,
158 operation: ctx.operation,
159 statements: ctx.statements,
160 params: ctx.params,
161 metadata: ctx.metadata,
162 metrics: ExecutionMetrics::default(),
163 result: Ok(frames.len()),
164 total: ReifyDuration::from_nanoseconds(wall_duration.as_nanos() as i64)
165 .unwrap_or_default(),
166 compute: ReifyDuration::from_nanoseconds(compute_duration.as_nanos() as i64)
167 .unwrap_or_default(),
168 };
169 state.request_interceptors().post_execute(&response_ctx).await;
170 }
171
172 Ok((frames, wall_duration))
173 }
174}