// reifydb_sub_server — state.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
//! Application state shared across request handlers.
//!
//! This module provides the shared state that is passed to all HTTP and WebSocket
//! handlers, including the database engine and query configuration.
8
9use std::time::Duration;
10
11use reifydb_auth::service::AuthService;
12use reifydb_core::actors::server::ServerMessage;
13use reifydb_engine::engine::StandardEngine;
14use reifydb_runtime::{
15	actor::{
16		mailbox::ActorRef,
17		system::{ActorHandle, ActorSystem},
18	},
19	context::{clock::Clock, rng::Rng},
20};
21
22use crate::{actor::ServerActor, interceptor::RequestInterceptorChain};
23
/// Configuration for query execution and connection handling.
///
/// Start from [`StateConfig::new`] (equivalent to [`Default::default`]) and
/// override individual settings with the chainable setters. Each setter takes
/// the configuration by value and returns the updated value:
///
/// ```ignore
/// let config = StateConfig::new()
///     .query_timeout(Duration::from_secs(10))
///     .admin_enabled(true);
/// ```
#[derive(Debug, Clone)]
pub struct StateConfig {
	/// Timeout for individual query execution.
	/// If a query takes longer than this, it will be cancelled.
	pub query_timeout: Duration,
	/// Timeout for entire HTTP request lifecycle.
	/// This includes reading the request, executing the query, and writing the response.
	pub request_timeout: Duration,
	/// Maximum concurrent connections allowed.
	/// New connections beyond this limit will be rejected.
	pub max_connections: usize,
	/// Whether admin (DDL) operations are enabled on this listener.
	pub admin_enabled: bool,
}

impl Default for StateConfig {
	/// Defaults: 30 s query timeout, 60 s request timeout, 10 000 connections,
	/// admin operations disabled.
	fn default() -> Self {
		Self {
			query_timeout: Duration::from_secs(30),
			request_timeout: Duration::from_secs(60),
			max_connections: 10_000,
			admin_enabled: false,
		}
	}
}

impl StateConfig {
	/// Create a new `StateConfig` with default values.
	#[must_use]
	pub fn new() -> Self {
		Self::default()
	}

	/// Set the query timeout.
	///
	/// Consumes `self` and returns the updated configuration.
	#[must_use = "builder setters return the updated config; the receiver is consumed"]
	pub fn query_timeout(mut self, timeout: Duration) -> Self {
		self.query_timeout = timeout;
		self
	}

	/// Set the request timeout.
	///
	/// Consumes `self` and returns the updated configuration.
	#[must_use = "builder setters return the updated config; the receiver is consumed"]
	pub fn request_timeout(mut self, timeout: Duration) -> Self {
		self.request_timeout = timeout;
		self
	}

	/// Set the maximum connections.
	///
	/// Consumes `self` and returns the updated configuration.
	#[must_use = "builder setters return the updated config; the receiver is consumed"]
	pub fn max_connections(mut self, max: usize) -> Self {
		self.max_connections = max;
		self
	}

	/// Set whether admin operations are enabled.
	///
	/// Consumes `self` and returns the updated configuration.
	#[must_use = "builder setters return the updated config; the receiver is consumed"]
	pub fn admin_enabled(mut self, enabled: bool) -> Self {
		self.admin_enabled = enabled;
		self
	}
}
81
82/// Shared application state passed to all request handler.
83///
84/// This struct is cloneable and cheap to clone since `StandardEngine` uses
85/// `Arc` internally. Each handler receives a clone of this state.
86///
87/// # Example
88///
89/// ```ignore
90/// let state = AppState::new(actor_system, engine, QueryConfig::default(), interceptors);
91///
92/// // In an axum handler:
93/// async fn handle_query(State(state): State<AppState>, ...) {
94///     let system = state.actor_system();
95///     let engine = state.engine();
96///     // ...
97/// }
98/// ```
99#[derive(Clone)]
100pub struct AppState {
101	actor_system: ActorSystem,
102	engine: StandardEngine,
103	auth_service: AuthService,
104	config: StateConfig,
105	request_interceptors: RequestInterceptorChain,
106	clock: Clock,
107	rng: Rng,
108}
109
110impl AppState {
111	/// Create a new AppState with the given actor system, engine, configuration,
112	/// and request interceptor chain.
113	pub fn new(
114		actor_system: ActorSystem,
115		engine: StandardEngine,
116		auth_service: AuthService,
117		config: StateConfig,
118		request_interceptors: RequestInterceptorChain,
119		clock: Clock,
120		rng: Rng,
121	) -> Self {
122		Self {
123			actor_system,
124			engine,
125			auth_service,
126			config,
127			request_interceptors,
128			clock,
129			rng,
130		}
131	}
132
133	/// Clone this state with a different configuration, preserving the
134	/// interceptor chain and other shared resources.
135	pub fn clone_with_config(&self, config: StateConfig) -> Self {
136		Self {
137			actor_system: self.actor_system.clone(),
138			engine: self.engine.clone(),
139			auth_service: self.auth_service.clone(),
140			config,
141			request_interceptors: self.request_interceptors.clone(),
142			clock: self.clock.clone(),
143			rng: self.rng.clone(),
144		}
145	}
146
147	/// Get a clone of the actor system.
148	///
149	/// This is cheap since `ActorSystem` uses `Arc` internally.
150	#[inline]
151	pub fn actor_system(&self) -> ActorSystem {
152		self.actor_system.clone()
153	}
154
155	/// Get a reference to the database engine.
156	#[inline]
157	pub fn engine(&self) -> &StandardEngine {
158		&self.engine
159	}
160
161	/// Get a clone of the database engine.
162	///
163	/// This is cheap since `StandardEngine` uses `Arc` internally.
164	#[inline]
165	pub fn engine_clone(&self) -> StandardEngine {
166		self.engine.clone()
167	}
168
169	/// Get a reference to the query configuration.
170	#[inline]
171	pub fn config(&self) -> &StateConfig {
172		&self.config
173	}
174
175	/// Get the query timeout from configuration.
176	#[inline]
177	pub fn query_timeout(&self) -> Duration {
178		self.config.query_timeout
179	}
180
181	/// Get the request timeout from configuration.
182	#[inline]
183	pub fn request_timeout(&self) -> Duration {
184		self.config.request_timeout
185	}
186
187	/// Get the maximum connections from configuration.
188	#[inline]
189	pub fn max_connections(&self) -> usize {
190		self.config.max_connections
191	}
192
193	/// Get whether admin operations are enabled.
194	#[inline]
195	pub fn admin_enabled(&self) -> bool {
196		self.config.admin_enabled
197	}
198
199	/// Get a reference to the request interceptor chain.
200	#[inline]
201	pub fn request_interceptors(&self) -> &RequestInterceptorChain {
202		&self.request_interceptors
203	}
204
205	/// Get a reference to the clock.
206	#[inline]
207	pub fn clock(&self) -> &Clock {
208		&self.clock
209	}
210
211	/// Get a reference to the RNG.
212	#[inline]
213	pub fn rng(&self) -> &Rng {
214		&self.rng
215	}
216
217	/// Get a reference to the authentication service.
218	#[inline]
219	pub fn auth_service(&self) -> &AuthService {
220		&self.auth_service
221	}
222
223	/// Spawn a short-lived server actor for one request and return its ref + handle.
224	///
225	/// The caller must keep the `ActorHandle` alive until the reply is received;
226	/// dropping it shuts down the actor.
227	pub fn spawn_server_actor(&self) -> (ActorRef<ServerMessage>, ActorHandle<ServerMessage>) {
228		let actor = ServerActor::new(self.engine.clone(), self.auth_service.clone(), self.clock.clone());
229		let handle = self.actor_system.spawn("server-req", actor);
230		let actor_ref = handle.actor_ref().clone();
231		(actor_ref, handle)
232	}
233}
234
#[cfg(test)]
pub mod tests {
	use super::*;

	/// `StateConfig::default()` must yield the documented defaults for every field.
	#[test]
	fn test_query_defaults() {
		let config = StateConfig::default();
		assert_eq!(config.query_timeout, Duration::from_secs(30));
		assert_eq!(config.request_timeout, Duration::from_secs(60));
		assert_eq!(config.max_connections, 10_000);
		// Admin operations are opt-in.
		assert!(!config.admin_enabled);
	}

	/// Each builder setter must override its own field.
	#[test]
	fn test_query_config_builder() {
		let config = StateConfig::new()
			.query_timeout(Duration::from_secs(60))
			.request_timeout(Duration::from_secs(120))
			.max_connections(5_000)
			.admin_enabled(true);

		assert_eq!(config.query_timeout, Duration::from_secs(60));
		assert_eq!(config.request_timeout, Duration::from_secs(120));
		assert_eq!(config.max_connections, 5_000);
		assert!(config.admin_enabled);
	}
}