reifydb_sub_server_http/
subsystem.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later
3
4//! HTTP server subsystem implementing the ReifyDB Subsystem trait.
5//!
6//! This module provides `HttpSubsystem` which manages the lifecycle of the
7//! HTTP server, including startup, health monitoring, and graceful shutdown.
8
9use std::{
10	any::Any,
11	net::SocketAddr,
12	sync::{
13		Arc, RwLock,
14		atomic::{AtomicBool, Ordering},
15	},
16};
17
18use reifydb_core::interface::version::{ComponentType, HasVersion, SystemVersion};
19use reifydb_sub_api::{HealthStatus, Subsystem};
20use reifydb_sub_server::{AppState, SharedRuntime};
21use tokio::{runtime::Handle, sync::oneshot};
22
23/// HTTP server subsystem.
24///
25/// Manages an Axum-based HTTP server with support for:
26/// - Graceful startup and shutdown
27/// - Health monitoring
28/// - Integration with shared tokio runtime
29///
30/// # Example
31///
32/// ```ignore
33/// let runtime = SharedRuntime::new(4);
34/// let state = AppState::new(engine, QueryConfig::default());
35///
36/// let mut http = HttpSubsystem::new(
37///     "0.0.0.0:8090".to_string(),
38///     state,
39///     runtime.handle(),
40/// );
41///
42/// http.start()?;
43/// // Server is now accepting connections
44///
45/// http.shutdown()?;
46/// // Server has gracefully stopped
47/// ```
48pub struct HttpSubsystem {
49	/// Address to bind the server to.
50	bind_addr: String,
51	/// Actual bound address (available after start).
52	actual_addr: RwLock<Option<SocketAddr>>,
53	/// Shared application state.
54	state: AppState,
55	/// The shared runtime (kept alive to prevent premature shutdown).
56	_runtime: Option<SharedRuntime>,
57	/// Handle to the tokio runtime.
58	handle: Handle,
59	/// Flag indicating if the server is running.
60	running: Arc<AtomicBool>,
61	/// Channel to send shutdown signal.
62	shutdown_tx: Option<oneshot::Sender<()>>,
63	/// Channel to receive shutdown completion.
64	shutdown_complete_rx: Option<oneshot::Receiver<()>>,
65}
66
67impl HttpSubsystem {
68	/// Create a new HTTP subsystem with an owned runtime.
69	///
70	/// This variant keeps the runtime alive for the lifetime of the subsystem.
71	///
72	/// # Arguments
73	///
74	/// * `bind_addr` - Address and port to bind to (e.g., "0.0.0.0:8090")
75	/// * `state` - Shared application state with engine and config
76	/// * `runtime` - Shared runtime (will be kept alive)
77	pub fn new(bind_addr: String, state: AppState, runtime: SharedRuntime) -> Self {
78		let handle = runtime.handle();
79		Self {
80			bind_addr,
81			actual_addr: RwLock::new(None),
82			state,
83			_runtime: Some(runtime),
84			handle,
85			running: Arc::new(AtomicBool::new(false)),
86			shutdown_tx: None,
87			shutdown_complete_rx: None,
88		}
89	}
90
91	/// Get the bind address.
92	pub fn bind_addr(&self) -> &str {
93		&self.bind_addr
94	}
95
96	/// Get the actual bound address (available after start).
97	pub fn local_addr(&self) -> Option<SocketAddr> {
98		*self.actual_addr.read().unwrap()
99	}
100
101	/// Get the actual bound port (available after start).
102	pub fn port(&self) -> Option<u16> {
103		self.local_addr().map(|a| a.port())
104	}
105}
106
107impl HasVersion for HttpSubsystem {
108	fn version(&self) -> SystemVersion {
109		SystemVersion {
110			name: "http".to_string(),
111			version: env!("CARGO_PKG_VERSION").to_string(),
112			description: "HTTP server subsystem for query and command handling".to_string(),
113			r#type: ComponentType::Subsystem,
114		}
115	}
116}
117
118impl Subsystem for HttpSubsystem {
119	fn name(&self) -> &'static str {
120		"Http"
121	}
122
123	fn start(&mut self) -> reifydb_core::Result<()> {
124		// Idempotent: if already running, return success
125		if self.running.load(Ordering::SeqCst) {
126			return Ok(());
127		}
128
129		// Bind synchronously using std::net, then convert to tokio
130		let addr = self.bind_addr.clone();
131		let std_listener = std::net::TcpListener::bind(&addr).map_err(|e| {
132			reifydb_core::error!(reifydb_core::diagnostic::internal::internal(format!(
133				"Failed to bind HTTP server to {}: {}",
134				&addr, e
135			)))
136		})?;
137		std_listener.set_nonblocking(true).map_err(|e| {
138			reifydb_core::error!(reifydb_core::diagnostic::internal::internal(format!(
139				"Failed to set non-blocking on {}: {}",
140				&addr, e
141			)))
142		})?;
143
144		let actual_addr = std_listener.local_addr().map_err(|e| {
145			reifydb_core::error!(reifydb_core::diagnostic::internal::internal(format!(
146				"Failed to get local address: {}",
147				e
148			)))
149		})?;
150
151		// Enter the runtime context to convert std listener to tokio
152		let _guard = self.handle.enter();
153		let listener = tokio::net::TcpListener::from_std(std_listener).map_err(|e| {
154			reifydb_core::error!(reifydb_core::diagnostic::internal::internal(format!(
155				"Failed to convert listener: {}",
156				e
157			)))
158		})?;
159		*self.actual_addr.write().unwrap() = Some(actual_addr);
160		tracing::info!("HTTP server bound to {}", actual_addr);
161
162		let (shutdown_tx, shutdown_rx) = oneshot::channel();
163		let (complete_tx, complete_rx) = oneshot::channel();
164
165		let state = self.state.clone();
166		let running = self.running.clone();
167
168		self.handle.spawn(async move {
169			// Mark as running
170			running.store(true, Ordering::SeqCst);
171
172			// Create router and serve
173			let app = crate::routes::router(state);
174			let server = axum::serve(listener, app).with_graceful_shutdown(async {
175				shutdown_rx.await.ok();
176				tracing::info!("HTTP server received shutdown signal");
177			});
178
179			// Run until shutdown
180			if let Err(e) = server.await {
181				tracing::error!("HTTP server error: {}", e);
182			}
183
184			// Mark as stopped
185			running.store(false, Ordering::SeqCst);
186			let _ = complete_tx.send(());
187			tracing::info!("HTTP server stopped");
188		});
189
190		self.shutdown_tx = Some(shutdown_tx);
191		self.shutdown_complete_rx = Some(complete_rx);
192		Ok(())
193	}
194
195	fn shutdown(&mut self) -> reifydb_core::Result<()> {
196		// Send shutdown signal
197		if let Some(tx) = self.shutdown_tx.take() {
198			let _ = tx.send(());
199		}
200
201		// Wait for graceful shutdown with timeout
202		if let Some(rx) = self.shutdown_complete_rx.take() {
203			let handle = self.handle.clone();
204			handle.block_on(async {
205				match tokio::time::timeout(std::time::Duration::from_secs(30), rx).await {
206					Ok(_) => {
207						tracing::debug!("HTTP server shutdown completed");
208					}
209					Err(_) => {
210						tracing::warn!("HTTP server shutdown timed out");
211					}
212				}
213			});
214		}
215
216		Ok(())
217	}
218
219	fn is_running(&self) -> bool {
220		self.running.load(Ordering::SeqCst)
221	}
222
223	fn health_status(&self) -> HealthStatus {
224		if self.running.load(Ordering::SeqCst) {
225			HealthStatus::Healthy
226		} else if self.shutdown_tx.is_some() {
227			// Started but not yet running (startup in progress)
228			HealthStatus::Warning {
229				description: "Starting up".to_string(),
230			}
231		} else {
232			HealthStatus::Failed {
233				description: "Not running".to_string(),
234			}
235		}
236	}
237
238	fn as_any(&self) -> &dyn Any {
239		self
240	}
241
242	fn as_any_mut(&mut self) -> &mut dyn Any {
243		self
244	}
245}