reifydb_sub_server/
dispatch.rs1#[cfg(not(reifydb_single_threaded))]
12pub use native::{dispatch, dispatch_subscribe};
13
14#[cfg(not(reifydb_single_threaded))]
15mod native {
16 use std::{sync::Arc, time::Duration};
17
18 use reifydb_core::{
19 actors::server::{ServerMessage, ServerResponse, ServerSubscribeResponse, build_server_message},
20 metric::ExecutionMetrics,
21 };
22 use reifydb_runtime::actor::reply::reply_channel;
23 use reifydb_type::value::{duration::Duration as ReifyDuration, frame::frame::Frame};
24 use tokio::time::timeout;
25
26 use crate::{
27 execute::ExecuteError,
28 interceptor::{RequestContext, ResponseContext},
29 state::AppState,
30 };
31
32 pub async fn dispatch(
41 state: &AppState,
42 mut ctx: RequestContext,
43 ) -> Result<(Vec<Frame>, Duration, ExecutionMetrics), ExecuteError> {
44 if !state.request_interceptors().is_empty() {
46 state.request_interceptors().pre_execute(&mut ctx).await?;
47 }
48
49 let start = state.clock().instant();
50
51 let (reply, receiver) = reply_channel();
53 let msg = build_server_message(ctx.operation, ctx.identity, ctx.rql.clone(), ctx.params.clone(), reply);
54
55 let (actor_ref, _handle) = state.spawn_server_actor();
56 actor_ref.send(msg).ok().ok_or(ExecuteError::Disconnected)?;
57
58 let server_response = timeout(state.query_timeout(), receiver.recv())
60 .await
61 .map_err(|_| ExecuteError::Timeout)?
62 .map_err(|_| ExecuteError::Disconnected)?;
63
64 let wall_duration = start.elapsed();
65 let (frames, compute_duration, metrics) = match server_response {
66 ServerResponse::Success {
67 frames,
68 duration,
69 metrics,
70 } => Ok((frames, duration, metrics)),
71 ServerResponse::EngineError {
72 diagnostic,
73 rql,
74 } => Err(ExecuteError::Engine {
75 diagnostic: Arc::from(diagnostic),
76 rql,
77 }),
78 }?;
79
80 if !state.request_interceptors().is_empty() {
82 let response_ctx = ResponseContext {
83 identity: ctx.identity,
84 operation: ctx.operation,
85 rql: ctx.rql,
86 params: ctx.params,
87 metadata: ctx.metadata,
88 metrics: metrics.clone(),
89 result: Ok(frames.len()),
90 total: ReifyDuration::from_nanoseconds(wall_duration.as_nanos() as i64)
91 .unwrap_or_default(),
92 compute: ReifyDuration::from_nanoseconds(compute_duration.as_nanos() as i64)
93 .unwrap_or_default(),
94 };
95 state.request_interceptors().post_execute(&response_ctx).await;
96 }
97
98 Ok((frames, wall_duration, metrics))
99 }
100
101 pub async fn dispatch_subscribe(
106 state: &AppState,
107 mut ctx: RequestContext,
108 ) -> Result<(Vec<Frame>, Duration, ExecutionMetrics), ExecuteError> {
109 if !state.request_interceptors().is_empty() {
111 state.request_interceptors().pre_execute(&mut ctx).await?;
112 }
113
114 let start = state.clock().instant();
115
116 let (reply, receiver) = reply_channel();
117 let msg = ServerMessage::Subscribe {
118 identity: ctx.identity,
119 rql: ctx.rql.clone(),
120 reply,
121 };
122
123 let (actor_ref, _handle) = state.spawn_server_actor();
124 actor_ref.send(msg).ok().ok_or(ExecuteError::Disconnected)?;
125
126 let response = timeout(state.query_timeout(), receiver.recv())
127 .await
128 .map_err(|_| ExecuteError::Timeout)?
129 .map_err(|_| ExecuteError::Disconnected)?;
130
131 let wall_duration = start.elapsed();
132
133 let (frames, compute_duration, metrics) = match response {
134 ServerSubscribeResponse::Subscribed {
135 frames,
136 duration,
137 metrics,
138 } => (frames, duration, metrics),
139 ServerSubscribeResponse::EngineError {
140 diagnostic,
141 rql,
142 } => {
143 return Err(ExecuteError::Engine {
144 diagnostic: Arc::from(diagnostic),
145 rql,
146 });
147 }
148 };
149
150 if !state.request_interceptors().is_empty() {
152 let response_ctx = ResponseContext {
153 identity: ctx.identity,
154 operation: ctx.operation,
155 rql: ctx.rql,
156 params: ctx.params,
157 metadata: ctx.metadata,
158 metrics: metrics.clone(),
159 result: Ok(frames.len()),
160 total: ReifyDuration::from_nanoseconds(wall_duration.as_nanos() as i64)
161 .unwrap_or_default(),
162 compute: ReifyDuration::from_nanoseconds(compute_duration.as_nanos() as i64)
163 .unwrap_or_default(),
164 };
165 state.request_interceptors().post_execute(&response_ctx).await;
166 }
167
168 Ok((frames, wall_duration, metrics))
169 }
170}