Skip to main content

reifydb_sub_server/
dispatch.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4//! Shared dispatch layer for all network transports.
5//!
6//! Pure helpers (`build_server_message`) live in `reifydb_core::actors::server`
7//! so both native transports and DST clients can share them.
8//! The async `dispatch()` / `dispatch_subscribe()` functions are the single
9//! entry points for native transport handlers.
10
11#[cfg(not(reifydb_single_threaded))]
12pub use native::{dispatch, dispatch_subscribe};
13
14#[cfg(not(reifydb_single_threaded))]
15mod native {
16	use std::sync::Arc;
17
18	use reifydb_core::{
19		actors::server::{ServerMessage, ServerResponse, ServerSubscribeResponse, build_server_message},
20		metric::ExecutionMetrics,
21	};
22	use reifydb_runtime::actor::reply::reply_channel;
23	use reifydb_type::value::{duration::Duration as ReifyDuration, frame::frame::Frame};
24	use tokio::time::timeout;
25
26	use crate::{
27		execute::ExecuteError,
28		interceptor::{RequestContext, ResponseContext},
29		state::AppState,
30	};
31
32	/// Dispatch a query/command/admin operation through the actor with interceptors.
33	///
34	/// This is the single entry point for all native transport handlers.
35	/// The caller is responsible only for:
36	/// 1. Extracting identity from transport-specific auth
37	/// 2. Building `RequestMetadata` from transport-specific headers
38	/// 3. Parsing params from transport-specific wire format
39	/// 4. Converting the result into transport-specific response format
40	pub async fn dispatch(
41		state: &AppState,
42		mut ctx: RequestContext,
43	) -> Result<(Vec<Frame>, ExecutionMetrics), ExecuteError> {
44		// Pre-execute interceptors
45		if !state.request_interceptors().is_empty() {
46			state.request_interceptors().pre_execute(&mut ctx).await?;
47		}
48
49		let start = state.clock().instant();
50
51		// Build message and send to per-request actor
52		let (reply, receiver) = reply_channel();
53		let msg = build_server_message(ctx.operation, ctx.identity, ctx.rql.clone(), ctx.params.clone(), reply);
54
55		let (actor_ref, _handle) = state.spawn_server_actor();
56		actor_ref.send(msg).ok().ok_or(ExecuteError::Disconnected)?;
57
58		// Await reply with timeout
59		let server_response = timeout(state.query_timeout(), receiver.recv())
60			.await
61			.map_err(|_| ExecuteError::Timeout)?
62			.map_err(|_| ExecuteError::Disconnected)?;
63
64		let wall_duration = start.elapsed();
65		let (frames, compute_duration, mut metrics) = match server_response {
66			ServerResponse::Success {
67				frames,
68				duration,
69				metrics,
70			} => Ok((frames, duration, metrics)),
71			ServerResponse::EngineError {
72				diagnostic,
73				rql,
74			} => Err(ExecuteError::Engine {
75				diagnostic: Arc::from(diagnostic),
76				rql,
77			}),
78		}?;
79
80		metrics.total = ReifyDuration::from_nanoseconds(wall_duration.as_nanos() as i64).unwrap_or_default();
81		metrics.compute =
82			ReifyDuration::from_nanoseconds(compute_duration.as_nanos() as i64).unwrap_or_default();
83
84		// Post-execute interceptors
85		if !state.request_interceptors().is_empty() {
86			let response_ctx = ResponseContext {
87				identity: ctx.identity,
88				operation: ctx.operation,
89				rql: ctx.rql,
90				params: ctx.params,
91				metadata: ctx.metadata,
92				metrics: metrics.clone(),
93				result: Ok(frames.len()),
94			};
95			state.request_interceptors().post_execute(&response_ctx).await;
96		}
97
98		Ok((frames, metrics))
99	}
100
101	/// Dispatch a subscribe operation through the actor with interceptors.
102	///
103	/// Separate from `dispatch()` because Subscribe uses `Reply<ServerSubscribeResponse>`
104	/// rather than `Reply<ServerResponse>`.
105	pub async fn dispatch_subscribe(
106		state: &AppState,
107		mut ctx: RequestContext,
108	) -> Result<(Vec<Frame>, ExecutionMetrics), ExecuteError> {
109		// Pre-execute interceptors
110		if !state.request_interceptors().is_empty() {
111			state.request_interceptors().pre_execute(&mut ctx).await?;
112		}
113
114		let start = state.clock().instant();
115
116		let (reply, receiver) = reply_channel();
117		let msg = ServerMessage::Subscribe {
118			identity: ctx.identity,
119			rql: ctx.rql.clone(),
120			reply,
121		};
122
123		let (actor_ref, _handle) = state.spawn_server_actor();
124		actor_ref.send(msg).ok().ok_or(ExecuteError::Disconnected)?;
125
126		let response = timeout(state.query_timeout(), receiver.recv())
127			.await
128			.map_err(|_| ExecuteError::Timeout)?
129			.map_err(|_| ExecuteError::Disconnected)?;
130
131		let wall_duration = start.elapsed();
132
133		let (frames, compute_duration, mut metrics) = match response {
134			ServerSubscribeResponse::Subscribed {
135				frames,
136				duration,
137				metrics,
138			} => (frames, duration, metrics),
139			ServerSubscribeResponse::EngineError {
140				diagnostic,
141				rql,
142			} => {
143				return Err(ExecuteError::Engine {
144					diagnostic: Arc::from(diagnostic),
145					rql,
146				});
147			}
148		};
149
150		metrics.total = ReifyDuration::from_nanoseconds(wall_duration.as_nanos() as i64).unwrap_or_default();
151		metrics.compute =
152			ReifyDuration::from_nanoseconds(compute_duration.as_nanos() as i64).unwrap_or_default();
153
154		// Post-execute interceptors
155		if !state.request_interceptors().is_empty() {
156			let response_ctx = ResponseContext {
157				identity: ctx.identity,
158				operation: ctx.operation,
159				rql: ctx.rql,
160				params: ctx.params,
161				metadata: ctx.metadata,
162				metrics: metrics.clone(),
163				result: Ok(frames.len()),
164			};
165			state.request_interceptors().post_execute(&response_ctx).await;
166		}
167
168		Ok((frames, metrics))
169	}
170}