reifydb_sub_server/
dispatch.rs1#[cfg(not(reifydb_single_threaded))]
12pub use native::{dispatch, dispatch_subscribe};
13
14#[cfg(not(reifydb_single_threaded))]
15mod native {
16 use std::sync::Arc;
17
18 use reifydb_core::{
19 actors::server::{ServerMessage, ServerResponse, ServerSubscribeResponse, build_server_message},
20 metric::ExecutionMetrics,
21 };
22 use reifydb_runtime::actor::reply::reply_channel;
23 use reifydb_type::value::{duration::Duration as ReifyDuration, frame::frame::Frame};
24 use tokio::time::timeout;
25
26 use crate::{
27 execute::ExecuteError,
28 interceptor::{RequestContext, ResponseContext},
29 state::AppState,
30 };
31
32 pub async fn dispatch(
41 state: &AppState,
42 mut ctx: RequestContext,
43 ) -> Result<(Vec<Frame>, ExecutionMetrics), ExecuteError> {
44 if !state.request_interceptors().is_empty() {
46 state.request_interceptors().pre_execute(&mut ctx).await?;
47 }
48
49 let start = state.clock().instant();
50
51 let (reply, receiver) = reply_channel();
53 let msg = build_server_message(ctx.operation, ctx.identity, ctx.rql.clone(), ctx.params.clone(), reply);
54
55 let (actor_ref, _handle) = state.spawn_server_actor();
56 actor_ref.send(msg).ok().ok_or(ExecuteError::Disconnected)?;
57
58 let server_response = timeout(state.query_timeout(), receiver.recv())
60 .await
61 .map_err(|_| ExecuteError::Timeout)?
62 .map_err(|_| ExecuteError::Disconnected)?;
63
64 let wall_duration = start.elapsed();
65 let (frames, compute_duration, mut metrics) = match server_response {
66 ServerResponse::Success {
67 frames,
68 duration,
69 metrics,
70 } => Ok((frames, duration, metrics)),
71 ServerResponse::EngineError {
72 diagnostic,
73 rql,
74 } => Err(ExecuteError::Engine {
75 diagnostic: Arc::from(diagnostic),
76 rql,
77 }),
78 }?;
79
80 metrics.total = ReifyDuration::from_nanoseconds(wall_duration.as_nanos() as i64).unwrap_or_default();
81 metrics.compute =
82 ReifyDuration::from_nanoseconds(compute_duration.as_nanos() as i64).unwrap_or_default();
83
84 if !state.request_interceptors().is_empty() {
86 let response_ctx = ResponseContext {
87 identity: ctx.identity,
88 operation: ctx.operation,
89 rql: ctx.rql,
90 params: ctx.params,
91 metadata: ctx.metadata,
92 metrics: metrics.clone(),
93 result: Ok(frames.len()),
94 };
95 state.request_interceptors().post_execute(&response_ctx).await;
96 }
97
98 Ok((frames, metrics))
99 }
100
101 pub async fn dispatch_subscribe(
106 state: &AppState,
107 mut ctx: RequestContext,
108 ) -> Result<(Vec<Frame>, ExecutionMetrics), ExecuteError> {
109 if !state.request_interceptors().is_empty() {
111 state.request_interceptors().pre_execute(&mut ctx).await?;
112 }
113
114 let start = state.clock().instant();
115
116 let (reply, receiver) = reply_channel();
117 let msg = ServerMessage::Subscribe {
118 identity: ctx.identity,
119 rql: ctx.rql.clone(),
120 reply,
121 };
122
123 let (actor_ref, _handle) = state.spawn_server_actor();
124 actor_ref.send(msg).ok().ok_or(ExecuteError::Disconnected)?;
125
126 let response = timeout(state.query_timeout(), receiver.recv())
127 .await
128 .map_err(|_| ExecuteError::Timeout)?
129 .map_err(|_| ExecuteError::Disconnected)?;
130
131 let wall_duration = start.elapsed();
132
133 let (frames, compute_duration, mut metrics) = match response {
134 ServerSubscribeResponse::Subscribed {
135 frames,
136 duration,
137 metrics,
138 } => (frames, duration, metrics),
139 ServerSubscribeResponse::EngineError {
140 diagnostic,
141 rql,
142 } => {
143 return Err(ExecuteError::Engine {
144 diagnostic: Arc::from(diagnostic),
145 rql,
146 });
147 }
148 };
149
150 metrics.total = ReifyDuration::from_nanoseconds(wall_duration.as_nanos() as i64).unwrap_or_default();
151 metrics.compute =
152 ReifyDuration::from_nanoseconds(compute_duration.as_nanos() as i64).unwrap_or_default();
153
154 if !state.request_interceptors().is_empty() {
156 let response_ctx = ResponseContext {
157 identity: ctx.identity,
158 operation: ctx.operation,
159 rql: ctx.rql,
160 params: ctx.params,
161 metadata: ctx.metadata,
162 metrics: metrics.clone(),
163 result: Ok(frames.len()),
164 };
165 state.request_interceptors().post_execute(&response_ctx).await;
166 }
167
168 Ok((frames, metrics))
169 }
170}