Skip to main content

reifydb_sub_server/
dispatch.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4//! Shared dispatch layer for all network transports.
5//!
6//! Pure helpers (`build_server_message`) live in `reifydb_core::actors::server`
7//! so both native transports and DST clients can share them.
8//! The async `dispatch()` / `dispatch_subscribe()` functions are the single
9//! entry points for native transport handlers.
10
11#[cfg(not(reifydb_single_threaded))]
12pub use native::{dispatch, dispatch_subscribe};
13
14#[cfg(not(reifydb_single_threaded))]
15mod native {
16	use std::{sync::Arc, time::Duration};
17
18	use reifydb_core::{
19		actors::server::{ServerMessage, ServerResponse, ServerSubscribeResponse, build_server_message},
20		metric::ExecutionMetrics,
21	};
22	use reifydb_runtime::actor::reply::reply_channel;
23	use reifydb_type::value::{duration::Duration as ReifyDuration, frame::frame::Frame};
24	use tokio::time::timeout;
25
26	use crate::{
27		execute::ExecuteError,
28		interceptor::{RequestContext, ResponseContext},
29		state::AppState,
30	};
31
32	/// Dispatch a query/command/admin operation through the actor with interceptors.
33	///
34	/// This is the single entry point for all native transport handlers.
35	/// The caller is responsible only for:
36	/// 1. Extracting identity from transport-specific auth
37	/// 2. Building `RequestMetadata` from transport-specific headers
38	/// 3. Parsing params from transport-specific wire format
39	/// 4. Converting the result into transport-specific response format
40	pub async fn dispatch(
41		state: &AppState,
42		mut ctx: RequestContext,
43	) -> Result<(Vec<Frame>, Duration), ExecuteError> {
44		// Pre-execute interceptors
45		if !state.request_interceptors().is_empty() {
46			state.request_interceptors().pre_execute(&mut ctx).await?;
47		}
48
49		let start = state.clock().instant();
50
51		// Build message and send to per-request actor
52		let (reply, receiver) = reply_channel();
53		let msg = build_server_message(
54			ctx.operation,
55			ctx.identity,
56			ctx.statements.clone(),
57			ctx.params.clone(),
58			reply,
59		);
60
61		let (actor_ref, _handle) = state.spawn_server_actor();
62		actor_ref.send(msg).ok().ok_or(ExecuteError::Disconnected)?;
63
64		// Await reply with timeout
65		let server_response = timeout(state.query_timeout(), receiver.recv())
66			.await
67			.map_err(|_| ExecuteError::Timeout)?
68			.map_err(|_| ExecuteError::Disconnected)?;
69
70		let wall_duration = start.elapsed();
71		let (frames, compute_duration) = match server_response {
72			ServerResponse::Success {
73				frames,
74				duration,
75			} => Ok((frames, duration)),
76			ServerResponse::EngineError {
77				diagnostic,
78				statement,
79			} => Err(ExecuteError::Engine {
80				diagnostic: Arc::from(diagnostic),
81				statement,
82			}),
83		}?;
84
85		// Post-execute interceptors
86		if !state.request_interceptors().is_empty() {
87			let response_ctx = ResponseContext {
88				identity: ctx.identity,
89				operation: ctx.operation,
90				statements: ctx.statements,
91				params: ctx.params,
92				metadata: ctx.metadata,
93				metrics: ExecutionMetrics::default(),
94				result: Ok(frames.len()),
95				total: ReifyDuration::from_nanoseconds(wall_duration.as_nanos() as i64)
96					.unwrap_or_default(),
97				compute: ReifyDuration::from_nanoseconds(compute_duration.as_nanos() as i64)
98					.unwrap_or_default(),
99			};
100			state.request_interceptors().post_execute(&response_ctx).await;
101		}
102
103		Ok((frames, wall_duration))
104	}
105
106	/// Dispatch a subscribe operation through the actor with interceptors.
107	///
108	/// Separate from `dispatch()` because Subscribe uses `Reply<ServerSubscribeResponse>`
109	/// rather than `Reply<ServerResponse>`.
110	pub async fn dispatch_subscribe(
111		state: &AppState,
112		mut ctx: RequestContext,
113	) -> Result<(Vec<Frame>, Duration), ExecuteError> {
114		// Pre-execute interceptors
115		if !state.request_interceptors().is_empty() {
116			state.request_interceptors().pre_execute(&mut ctx).await?;
117		}
118
119		let start = state.clock().instant();
120
121		let (reply, receiver) = reply_channel();
122		let msg = ServerMessage::Subscribe {
123			identity: ctx.identity,
124			query: ctx.statements.join("; "),
125			reply,
126		};
127
128		let (actor_ref, _handle) = state.spawn_server_actor();
129		actor_ref.send(msg).ok().ok_or(ExecuteError::Disconnected)?;
130
131		let response = timeout(state.query_timeout(), receiver.recv())
132			.await
133			.map_err(|_| ExecuteError::Timeout)?
134			.map_err(|_| ExecuteError::Disconnected)?;
135
136		let wall_duration = start.elapsed();
137
138		let (frames, compute_duration) = match response {
139			ServerSubscribeResponse::Subscribed {
140				frames,
141				duration,
142			} => (frames, duration),
143			ServerSubscribeResponse::EngineError {
144				diagnostic,
145				statement,
146			} => {
147				return Err(ExecuteError::Engine {
148					diagnostic: Arc::from(diagnostic),
149					statement,
150				});
151			}
152		};
153
154		// Post-execute interceptors
155		if !state.request_interceptors().is_empty() {
156			let response_ctx = ResponseContext {
157				identity: ctx.identity,
158				operation: ctx.operation,
159				statements: ctx.statements,
160				params: ctx.params,
161				metadata: ctx.metadata,
162				metrics: ExecutionMetrics::default(),
163				result: Ok(frames.len()),
164				total: ReifyDuration::from_nanoseconds(wall_duration.as_nanos() as i64)
165					.unwrap_or_default(),
166				compute: ReifyDuration::from_nanoseconds(compute_duration.as_nanos() as i64)
167					.unwrap_or_default(),
168			};
169			state.request_interceptors().post_execute(&response_ctx).await;
170		}
171
172		Ok((frames, wall_duration))
173	}
174}