Skip to main content

reifydb_sub_server_http/
subsystem.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4//! HTTP server subsystem implementing the ReifyDB Subsystem trait.
5//!
6//! This module provides `HttpSubsystem` which manages the lifecycle of the
7//! HTTP server, including startup, health monitoring, and graceful shutdown.
8
9use std::{
10	any::Any,
11	net::SocketAddr,
12	sync::{
13		Arc, RwLock,
14		atomic::{AtomicBool, Ordering},
15	},
16};
17
18use axum::serve;
19use reifydb_core::{
20	error::CoreError,
21	interface::version::{ComponentType, HasVersion, SystemVersion},
22};
23use reifydb_runtime::SharedRuntime;
24use reifydb_sub_api::subsystem::{HealthStatus, Subsystem};
25use reifydb_sub_server::state::AppState;
26use reifydb_type::{Result, error::Error};
27use tokio::{net::TcpListener, sync::oneshot};
28use tracing::{error, info};
29
30use crate::routes::router;
31
/// HTTP server subsystem.
///
/// Manages an Axum-based HTTP server, plus an optional second admin
/// listener, with support for:
/// - Graceful startup and shutdown
/// - Health monitoring
///
/// # Example
///
/// ```ignore
/// let state = AppState::new(pool, engine, QueryConfig::default(), RequestInterceptorChain::empty());
///
/// let mut http = HttpSubsystem::new(
///     Some("0.0.0.0:8090".to_string()), // main listener
///     None,                             // no admin listener
///     state,
///     runtime,
/// );
///
/// http.start()?;
/// // Server is now accepting connections
///
/// http.shutdown()?;
/// // Server has gracefully stopped
/// ```
pub struct HttpSubsystem {
	/// Address to bind the server to. `None` means no main listener is started.
	bind_addr: Option<String>,
	/// Address to bind the admin server to. `None` means no admin listener is started.
	admin_bind_addr: Option<String>,
	/// Actual bound address (available after start).
	actual_addr: RwLock<Option<SocketAddr>>,
	/// Actual bound address for admin server (available after start).
	admin_actual_addr: RwLock<Option<SocketAddr>>,
	/// Shared application state.
	state: AppState,
	/// Flag indicating if the server is running. Shared with the spawned
	/// main server task, which sets it when it begins serving and clears it
	/// when it exits.
	running: Arc<AtomicBool>,
	/// Channel to send shutdown signal. `Some` between a start that spawned
	/// the main server and the next shutdown.
	shutdown_tx: Option<oneshot::Sender<()>>,
	/// Channel to receive shutdown completion.
	shutdown_complete_rx: Option<oneshot::Receiver<()>>,
	/// Channel to send admin shutdown signal.
	admin_shutdown_tx: Option<oneshot::Sender<()>>,
	/// Channel to receive admin shutdown completion.
	admin_shutdown_complete_rx: Option<oneshot::Receiver<()>>,
	/// Shared tokio runtime.
	runtime: SharedRuntime,
}
78
79impl HttpSubsystem {
80	/// Create a new HTTP subsystem.
81	///
82	/// # Arguments
83	///
84	/// * `bind_addr` - Address and port to bind to (e.g., "0.0.0.0:8090")
85	/// * `state` - Shared application state with engine and config
86	/// * `runtime` - Shared runtime
87	pub fn new(
88		bind_addr: Option<String>,
89		admin_bind_addr: Option<String>,
90		state: AppState,
91		runtime: SharedRuntime,
92	) -> Self {
93		Self {
94			bind_addr,
95			admin_bind_addr,
96			actual_addr: RwLock::new(None),
97			admin_actual_addr: RwLock::new(None),
98			state,
99			running: Arc::new(AtomicBool::new(false)),
100			shutdown_tx: None,
101			shutdown_complete_rx: None,
102			admin_shutdown_tx: None,
103			admin_shutdown_complete_rx: None,
104			runtime,
105		}
106	}
107
108	/// Get the bind address.
109	pub fn bind_addr(&self) -> Option<&str> {
110		self.bind_addr.as_deref()
111	}
112
113	/// Get the actual bound address (available after start).
114	pub fn local_addr(&self) -> Option<SocketAddr> {
115		*self.actual_addr.read().unwrap()
116	}
117
118	/// Get the actual bound port (available after start).
119	pub fn port(&self) -> Option<u16> {
120		self.local_addr().map(|a| a.port())
121	}
122
123	/// Get the actual bound address for the admin server (available after start).
124	pub fn admin_local_addr(&self) -> Option<SocketAddr> {
125		*self.admin_actual_addr.read().unwrap()
126	}
127
128	/// Get the actual bound port for the admin server (available after start).
129	pub fn admin_port(&self) -> Option<u16> {
130		self.admin_local_addr().map(|a| a.port())
131	}
132}
133
134impl HasVersion for HttpSubsystem {
135	fn version(&self) -> SystemVersion {
136		SystemVersion {
137			name: env!("CARGO_PKG_NAME")
138				.strip_prefix("reifydb-")
139				.unwrap_or(env!("CARGO_PKG_NAME"))
140				.to_string(),
141			version: env!("CARGO_PKG_VERSION").to_string(),
142			description: "HTTP server subsystem for query and command handling".to_string(),
143			r#type: ComponentType::Subsystem,
144		}
145	}
146}
147
148impl Subsystem for HttpSubsystem {
149	fn name(&self) -> &'static str {
150		"Http"
151	}
152
153	fn start(&mut self) -> Result<()> {
154		// Idempotent: if already running, return success
155		if self.running.load(Ordering::SeqCst) {
156			return Ok(());
157		}
158
159		let runtime = self.runtime.clone();
160
161		// Bind main listener if configured
162		if let Some(addr) = &self.bind_addr {
163			let addr = addr.clone();
164			let listener = runtime.block_on(TcpListener::bind(&addr)).map_err(|e| {
165				let err: Error = CoreError::SubsystemBindFailed {
166					addr: addr.clone(),
167					reason: e.to_string(),
168				}
169				.into();
170				err
171			})?;
172
173			let actual_addr = listener.local_addr().map_err(|e| {
174				let err: Error = CoreError::SubsystemAddressUnavailable {
175					reason: e.to_string(),
176				}
177				.into();
178				err
179			})?;
180			*self.actual_addr.write().unwrap() = Some(actual_addr);
181			info!("HTTP server bound to {}", actual_addr);
182
183			let (shutdown_tx, shutdown_rx) = oneshot::channel();
184			let (complete_tx, complete_rx) = oneshot::channel();
185
186			let state = self.state.clone();
187			let running = self.running.clone();
188
189			runtime.spawn(async move {
190				// Mark as running
191				running.store(true, Ordering::SeqCst);
192
193				// Create router and serve (admin_enabled is false by default in state)
194				let app = router(state);
195				let server = serve(listener, app).with_graceful_shutdown(async {
196					shutdown_rx.await.ok();
197					info!("HTTP server received shutdown signal");
198				});
199
200				// Run until shutdown
201				if let Err(e) = server.await {
202					error!("HTTP server error: {}", e);
203				}
204
205				// Mark as stopped
206				running.store(false, Ordering::SeqCst);
207				let _ = complete_tx.send(());
208				info!("HTTP server stopped");
209			});
210
211			self.shutdown_tx = Some(shutdown_tx);
212			self.shutdown_complete_rx = Some(complete_rx);
213		} else {
214			// No main listener — mark running synchronously
215			self.running.store(true, Ordering::SeqCst);
216		}
217
218		// Start admin listener if configured
219		if let Some(admin_addr) = &self.admin_bind_addr {
220			let admin_addr = admin_addr.clone();
221			let runtime = self.runtime.clone();
222			let admin_listener = runtime.block_on(TcpListener::bind(&admin_addr)).map_err(|e| {
223				let err: Error = CoreError::SubsystemBindFailed {
224					addr: admin_addr.clone(),
225					reason: e.to_string(),
226				}
227				.into();
228				err
229			})?;
230
231			let admin_actual_addr = admin_listener.local_addr().map_err(|e| {
232				let err: Error = CoreError::SubsystemAddressUnavailable {
233					reason: e.to_string(),
234				}
235				.into();
236				err
237			})?;
238			*self.admin_actual_addr.write().unwrap() = Some(admin_actual_addr);
239			info!("HTTP admin server bound to {}", admin_actual_addr);
240
241			let (admin_shutdown_tx, admin_shutdown_rx) = oneshot::channel();
242			let (admin_complete_tx, admin_complete_rx) = oneshot::channel();
243
244			// Create admin state with admin_enabled = true, preserving interceptors
245			let admin_config = self.state.config().clone().admin_enabled(true);
246			let admin_state = self.state.clone_with_config(admin_config);
247
248			runtime.spawn(async move {
249				let app = router(admin_state);
250				let server = serve(admin_listener, app).with_graceful_shutdown(async {
251					admin_shutdown_rx.await.ok();
252					info!("HTTP admin server received shutdown signal");
253				});
254
255				if let Err(e) = server.await {
256					error!("HTTP admin server error: {}", e);
257				}
258
259				let _ = admin_complete_tx.send(());
260				info!("HTTP admin server stopped");
261			});
262
263			self.admin_shutdown_tx = Some(admin_shutdown_tx);
264			self.admin_shutdown_complete_rx = Some(admin_complete_rx);
265		}
266
267		Ok(())
268	}
269
270	fn shutdown(&mut self) -> Result<()> {
271		// Shutdown admin server first
272		if let Some(tx) = self.admin_shutdown_tx.take() {
273			let _ = tx.send(());
274		}
275		if let Some(rx) = self.admin_shutdown_complete_rx.take() {
276			let _ = self.runtime.block_on(rx);
277		}
278		// Then shutdown main server
279		if let Some(tx) = self.shutdown_tx.take() {
280			let _ = tx.send(());
281		}
282		if let Some(rx) = self.shutdown_complete_rx.take() {
283			let _ = self.runtime.block_on(rx);
284		}
285		self.running.store(false, Ordering::SeqCst);
286		Ok(())
287	}
288
289	fn is_running(&self) -> bool {
290		self.running.load(Ordering::SeqCst)
291	}
292
293	fn health_status(&self) -> HealthStatus {
294		if self.running.load(Ordering::SeqCst) {
295			HealthStatus::Healthy
296		} else if self.shutdown_tx.is_some() {
297			// Started but not yet running (startup in progress)
298			HealthStatus::Warning {
299				description: "Starting up".to_string(),
300			}
301		} else {
302			HealthStatus::Failed {
303				description: "Not running".to_string(),
304			}
305		}
306	}
307
308	fn as_any(&self) -> &dyn Any {
309		self
310	}
311
312	fn as_any_mut(&mut self) -> &mut dyn Any {
313		self
314	}
315}