reifydb_sub_server_http/
subsystem.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later
3
4//! HTTP server subsystem implementing the ReifyDB Subsystem trait.
5//!
6//! This module provides `HttpSubsystem` which manages the lifecycle of the
7//! HTTP server, including startup, health monitoring, and graceful shutdown.
8
9use std::{
10	any::Any,
11	net::SocketAddr,
12	sync::{
13		Arc, RwLock,
14		atomic::{AtomicBool, Ordering},
15	},
16	time::Duration,
17};
18
19use async_trait::async_trait;
20use reifydb_core::{
21	diagnostic::subsystem::{address_unavailable, bind_failed},
22	error,
23	interface::version::{ComponentType, HasVersion, SystemVersion},
24};
25use reifydb_sub_api::{HealthStatus, Subsystem};
26use reifydb_sub_server::{AppState, SharedRuntime};
27use tokio::{net::TcpListener, runtime::Handle, sync::oneshot, time::timeout};
28
29use crate::routes::router;
30
31/// HTTP server subsystem.
32///
33/// Manages an Axum-based HTTP server with support for:
34/// - Graceful startup and shutdown
35/// - Health monitoring
36/// - Integration with shared tokio runtime
37///
38/// # Example
39///
40/// ```ignore
41/// let runtime = SharedRuntime::new(4);
42/// let state = AppState::new(engine, QueryConfig::default());
43///
44/// let mut http = HttpSubsystem::new(
45///     "0.0.0.0:8090".to_string(),
46///     state,
47///     runtime.handle(),
48/// );
49///
50/// http.start()?;
51/// // Server is now accepting connections
52///
53/// http.shutdown()?;
54/// // Server has gracefully stopped
55/// ```
56pub struct HttpSubsystem {
57	/// Address to bind the server to.
58	bind_addr: String,
59	/// Actual bound address (available after start).
60	actual_addr: RwLock<Option<SocketAddr>>,
61	/// Shared application state.
62	state: AppState,
63	/// The shared runtime (kept alive to prevent premature shutdown).
64	_runtime: Option<SharedRuntime>,
65	/// Handle to the tokio runtime.
66	handle: Handle,
67	/// Flag indicating if the server is running.
68	running: Arc<AtomicBool>,
69	/// Channel to send shutdown signal.
70	shutdown_tx: Option<oneshot::Sender<()>>,
71	/// Channel to receive shutdown completion.
72	shutdown_complete_rx: Option<oneshot::Receiver<()>>,
73}
74
75impl HttpSubsystem {
76	/// Create a new HTTP subsystem with an owned runtime.
77	///
78	/// This variant keeps the runtime alive for the lifetime of the subsystem.
79	///
80	/// # Arguments
81	///
82	/// * `bind_addr` - Address and port to bind to (e.g., "0.0.0.0:8090")
83	/// * `state` - Shared application state with engine and config
84	/// * `runtime` - Shared runtime (will be kept alive)
85	pub fn new(bind_addr: String, state: AppState, runtime: SharedRuntime) -> Self {
86		let handle = runtime.handle();
87		Self {
88			bind_addr,
89			actual_addr: RwLock::new(None),
90			state,
91			_runtime: Some(runtime),
92			handle,
93			running: Arc::new(AtomicBool::new(false)),
94			shutdown_tx: None,
95			shutdown_complete_rx: None,
96		}
97	}
98
99	/// Get the bind address.
100	pub fn bind_addr(&self) -> &str {
101		&self.bind_addr
102	}
103
104	/// Get the actual bound address (available after start).
105	pub fn local_addr(&self) -> Option<SocketAddr> {
106		*self.actual_addr.read().unwrap()
107	}
108
109	/// Get the actual bound port (available after start).
110	pub fn port(&self) -> Option<u16> {
111		self.local_addr().map(|a| a.port())
112	}
113}
114
115impl HasVersion for HttpSubsystem {
116	fn version(&self) -> SystemVersion {
117		SystemVersion {
118			name: "http".to_string(),
119			version: env!("CARGO_PKG_VERSION").to_string(),
120			description: "HTTP server subsystem for query and command handling".to_string(),
121			r#type: ComponentType::Subsystem,
122		}
123	}
124}
125
126#[async_trait]
127impl Subsystem for HttpSubsystem {
128	fn name(&self) -> &'static str {
129		"Http"
130	}
131
132	async fn start(&mut self) -> reifydb_core::Result<()> {
133		// Idempotent: if already running, return success
134		if self.running.load(Ordering::SeqCst) {
135			return Ok(());
136		}
137
138		let addr = self.bind_addr.clone();
139		let listener = TcpListener::bind(&addr).await.map_err(|e| error!(bind_failed(&addr, e)))?;
140
141		let actual_addr = listener.local_addr().map_err(|e| error!(address_unavailable(e)))?;
142		*self.actual_addr.write().unwrap() = Some(actual_addr);
143		tracing::info!("HTTP server bound to {}", actual_addr);
144
145		let (shutdown_tx, shutdown_rx) = oneshot::channel();
146		let (complete_tx, complete_rx) = oneshot::channel();
147
148		let state = self.state.clone();
149		let running = self.running.clone();
150
151		self.handle.spawn(async move {
152			// Mark as running
153			running.store(true, Ordering::SeqCst);
154
155			// Create router and serve
156			let app = router(state);
157			let server = axum::serve(listener, app).with_graceful_shutdown(async {
158				shutdown_rx.await.ok();
159				tracing::info!("HTTP server received shutdown signal");
160			});
161
162			// Run until shutdown
163			if let Err(e) = server.await {
164				tracing::error!("HTTP server error: {}", e);
165			}
166
167			// Mark as stopped
168			running.store(false, Ordering::SeqCst);
169			let _ = complete_tx.send(());
170			tracing::info!("HTTP server stopped");
171		});
172
173		self.shutdown_tx = Some(shutdown_tx);
174		self.shutdown_complete_rx = Some(complete_rx);
175		Ok(())
176	}
177
178	async fn shutdown(&mut self) -> reifydb_core::Result<()> {
179		// Send shutdown signal
180		if let Some(tx) = self.shutdown_tx.take() {
181			let _ = tx.send(());
182		}
183
184		// Wait for graceful shutdown with timeout
185		if let Some(rx) = self.shutdown_complete_rx.take() {
186			match timeout(Duration::from_secs(30), rx).await {
187				Ok(_) => {
188					tracing::debug!("HTTP server shutdown completed");
189				}
190				Err(_) => {
191					tracing::warn!("HTTP server shutdown timed out");
192				}
193			}
194		}
195
196		Ok(())
197	}
198
199	fn is_running(&self) -> bool {
200		self.running.load(Ordering::SeqCst)
201	}
202
203	fn health_status(&self) -> HealthStatus {
204		if self.running.load(Ordering::SeqCst) {
205			HealthStatus::Healthy
206		} else if self.shutdown_tx.is_some() {
207			// Started but not yet running (startup in progress)
208			HealthStatus::Warning {
209				description: "Starting up".to_string(),
210			}
211		} else {
212			HealthStatus::Failed {
213				description: "Not running".to_string(),
214			}
215		}
216	}
217
218	fn as_any(&self) -> &dyn Any {
219		self
220	}
221
222	fn as_any_mut(&mut self) -> &mut dyn Any {
223		self
224	}
225}