reifydb_sub_server_http/
subsystem.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later
3
4//! HTTP server subsystem implementing the ReifyDB Subsystem trait.
5//!
6//! This module provides `HttpSubsystem` which manages the lifecycle of the
7//! HTTP server, including startup, health monitoring, and graceful shutdown.
8
9use std::{
10	any::Any,
11	net::SocketAddr,
12	sync::{
13		Arc, RwLock,
14		atomic::{AtomicBool, Ordering},
15	},
16};
17
18use reifydb_core::interface::version::{ComponentType, HasVersion, SystemVersion};
19use reifydb_sub_api::{HealthStatus, Subsystem};
20use reifydb_sub_server::{AppState, SharedRuntime};
21use tokio::{runtime::Handle, sync::oneshot};
22
/// HTTP server subsystem.
///
/// Holds the configuration and lifecycle state of an Axum-based HTTP server
/// with support for:
/// - Graceful startup and shutdown
/// - Health monitoring
/// - Integration with shared tokio runtime
///
/// The server itself runs in a task spawned onto the tokio runtime; this struct
/// keeps the pieces needed to observe that task (`running`) and to stop it
/// (`shutdown_tx` / `shutdown_complete_rx`).
///
/// # Example
///
/// ```ignore
/// let runtime = SharedRuntime::new(4);
/// let state = AppState::new(engine, QueryConfig::default());
///
/// let mut http = HttpSubsystem::new(
///     "0.0.0.0:8090".to_string(),
///     state,
///     runtime.handle(),
/// );
///
/// http.start()?;
/// // Server is now accepting connections
///
/// http.shutdown()?;
/// // Server has gracefully stopped
/// ```
pub struct HttpSubsystem {
	/// Address to bind the server to, exactly as supplied by the caller
	/// (e.g. "0.0.0.0:8090").
	bind_addr: String,
	/// Actual bound address. `None` until `start` has bound the listener;
	/// exposed through `local_addr()` and `port()`.
	actual_addr: RwLock<Option<SocketAddr>>,
	/// Shared application state; a clone is handed to the router when the
	/// server task is spawned.
	state: AppState,
	/// The shared runtime (kept alive to prevent premature shutdown).
	/// `Some` only when constructed via `with_runtime`; `new` stores `None`.
	_runtime: Option<Arc<SharedRuntime>>,
	/// Handle to the tokio runtime, used to spawn the server task and to
	/// block on shutdown completion.
	handle: Handle,
	/// Flag indicating if the server is running. Shared with the spawned
	/// server task, which sets it when it begins serving and clears it when
	/// it exits.
	running: Arc<AtomicBool>,
	/// Channel to send shutdown signal. Stored by `start` and taken by
	/// `shutdown`.
	shutdown_tx: Option<oneshot::Sender<()>>,
	/// Channel to receive shutdown completion. The server task sends on the
	/// paired sender right before it finishes; `shutdown` waits on this.
	shutdown_complete_rx: Option<oneshot::Receiver<()>>,
}
66
67impl HttpSubsystem {
68	/// Create a new HTTP subsystem.
69	///
70	/// # Arguments
71	///
72	/// * `bind_addr` - Address and port to bind to (e.g., "0.0.0.0:8090")
73	/// * `state` - Shared application state with engine and config
74	/// * `handle` - Handle to the tokio runtime
75	pub fn new(bind_addr: String, state: AppState, handle: Handle) -> Self {
76		Self {
77			bind_addr,
78			actual_addr: RwLock::new(None),
79			state,
80			_runtime: None,
81			handle,
82			running: Arc::new(AtomicBool::new(false)),
83			shutdown_tx: None,
84			shutdown_complete_rx: None,
85		}
86	}
87
88	/// Create a new HTTP subsystem with an owned runtime.
89	///
90	/// This variant keeps the runtime alive for the lifetime of the subsystem.
91	///
92	/// # Arguments
93	///
94	/// * `bind_addr` - Address and port to bind to (e.g., "0.0.0.0:8090")
95	/// * `state` - Shared application state with engine and config
96	/// * `runtime` - Shared runtime (will be kept alive)
97	pub fn with_runtime(bind_addr: String, state: AppState, runtime: Arc<SharedRuntime>) -> Self {
98		let handle = runtime.handle();
99		Self {
100			bind_addr,
101			actual_addr: RwLock::new(None),
102			state,
103			_runtime: Some(runtime),
104			handle,
105			running: Arc::new(AtomicBool::new(false)),
106			shutdown_tx: None,
107			shutdown_complete_rx: None,
108		}
109	}
110
111	/// Get the bind address.
112	pub fn bind_addr(&self) -> &str {
113		&self.bind_addr
114	}
115
116	/// Get the actual bound address (available after start).
117	pub fn local_addr(&self) -> Option<SocketAddr> {
118		*self.actual_addr.read().unwrap()
119	}
120
121	/// Get the actual bound port (available after start).
122	pub fn port(&self) -> Option<u16> {
123		self.local_addr().map(|a| a.port())
124	}
125}
126
127impl HasVersion for HttpSubsystem {
128	fn version(&self) -> SystemVersion {
129		SystemVersion {
130			name: "http".to_string(),
131			version: env!("CARGO_PKG_VERSION").to_string(),
132			description: "HTTP server subsystem for query and command handling".to_string(),
133			r#type: ComponentType::Subsystem,
134		}
135	}
136}
137
138impl Subsystem for HttpSubsystem {
139	fn name(&self) -> &'static str {
140		"Http"
141	}
142
143	fn start(&mut self) -> reifydb_core::Result<()> {
144		// Idempotent: if already running, return success
145		if self.running.load(Ordering::SeqCst) {
146			return Ok(());
147		}
148
149		// Bind synchronously using std::net, then convert to tokio
150		let addr = self.bind_addr.clone();
151		let std_listener = std::net::TcpListener::bind(&addr).map_err(|e| {
152			reifydb_core::error!(reifydb_core::diagnostic::internal::internal(format!(
153				"Failed to bind HTTP server to {}: {}",
154				&addr, e
155			)))
156		})?;
157		std_listener.set_nonblocking(true).map_err(|e| {
158			reifydb_core::error!(reifydb_core::diagnostic::internal::internal(format!(
159				"Failed to set non-blocking on {}: {}",
160				&addr, e
161			)))
162		})?;
163
164		let actual_addr = std_listener.local_addr().map_err(|e| {
165			reifydb_core::error!(reifydb_core::diagnostic::internal::internal(format!(
166				"Failed to get local address: {}",
167				e
168			)))
169		})?;
170
171		// Enter the runtime context to convert std listener to tokio
172		let _guard = self.handle.enter();
173		let listener = tokio::net::TcpListener::from_std(std_listener).map_err(|e| {
174			reifydb_core::error!(reifydb_core::diagnostic::internal::internal(format!(
175				"Failed to convert listener: {}",
176				e
177			)))
178		})?;
179		*self.actual_addr.write().unwrap() = Some(actual_addr);
180		tracing::info!("HTTP server bound to {}", actual_addr);
181
182		let (shutdown_tx, shutdown_rx) = oneshot::channel();
183		let (complete_tx, complete_rx) = oneshot::channel();
184
185		let state = self.state.clone();
186		let running = self.running.clone();
187
188		self.handle.spawn(async move {
189			// Mark as running
190			running.store(true, Ordering::SeqCst);
191
192			// Create router and serve
193			let app = crate::routes::router(state);
194			let server = axum::serve(listener, app).with_graceful_shutdown(async {
195				shutdown_rx.await.ok();
196				tracing::info!("HTTP server received shutdown signal");
197			});
198
199			// Run until shutdown
200			if let Err(e) = server.await {
201				tracing::error!("HTTP server error: {}", e);
202			}
203
204			// Mark as stopped
205			running.store(false, Ordering::SeqCst);
206			let _ = complete_tx.send(());
207			tracing::info!("HTTP server stopped");
208		});
209
210		self.shutdown_tx = Some(shutdown_tx);
211		self.shutdown_complete_rx = Some(complete_rx);
212		Ok(())
213	}
214
215	fn shutdown(&mut self) -> reifydb_core::Result<()> {
216		// Send shutdown signal
217		if let Some(tx) = self.shutdown_tx.take() {
218			let _ = tx.send(());
219		}
220
221		// Wait for graceful shutdown with timeout
222		if let Some(rx) = self.shutdown_complete_rx.take() {
223			let handle = self.handle.clone();
224			handle.block_on(async {
225				match tokio::time::timeout(std::time::Duration::from_secs(30), rx).await {
226					Ok(_) => {
227						tracing::debug!("HTTP server shutdown completed");
228					}
229					Err(_) => {
230						tracing::warn!("HTTP server shutdown timed out");
231					}
232				}
233			});
234		}
235
236		Ok(())
237	}
238
239	fn is_running(&self) -> bool {
240		self.running.load(Ordering::SeqCst)
241	}
242
243	fn health_status(&self) -> HealthStatus {
244		if self.running.load(Ordering::SeqCst) {
245			HealthStatus::Healthy
246		} else if self.shutdown_tx.is_some() {
247			// Started but not yet running (startup in progress)
248			HealthStatus::Warning {
249				description: "Starting up".to_string(),
250			}
251		} else {
252			HealthStatus::Failed {
253				description: "Not running".to_string(),
254			}
255		}
256	}
257
258	fn as_any(&self) -> &dyn Any {
259		self
260	}
261
262	fn as_any_mut(&mut self) -> &mut dyn Any {
263		self
264	}
265}