Skip to main content

reifydb_sub_server_http/
subsystem.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// Copyright (c) 2025 ReifyDB
3
4//! HTTP server subsystem implementing the ReifyDB Subsystem trait.
5//!
6//! This module provides `HttpSubsystem` which manages the lifecycle of the
7//! HTTP server, including startup, health monitoring, and graceful shutdown.
8
9use std::{
10	any::Any,
11	net::SocketAddr,
12	sync::{
13		Arc, RwLock,
14		atomic::{AtomicBool, Ordering},
15	},
16};
17
18use reifydb_core::{
19	error::diagnostic::subsystem::{address_unavailable, bind_failed},
20	interface::version::{ComponentType, HasVersion, SystemVersion},
21};
22use reifydb_runtime::SharedRuntime;
23use reifydb_sub_api::subsystem::{HealthStatus, Subsystem};
24use reifydb_sub_server::state::AppState;
25use reifydb_type::error;
26use tokio::{net::TcpListener, sync::oneshot};
27
28use crate::routes::router;
29
30/// HTTP server subsystem.
31///
32/// Manages an Axum-based HTTP server with support for:
33/// - Graceful startup and shutdown
34/// - Health monitoring
35///
36/// # Example
37///
38/// ```ignore
39/// let state = AppState::new(pool, engine, QueryConfig::default());
40///
41/// let mut http = HttpSubsystem::new(
42///     "0.0.0.0:8090".to_string(),
43///     state,
44/// );
45///
46/// http.start()?;
47/// // Server is now accepting connections
48///
49/// http.shutdown()?;
50/// // Server has gracefully stopped
51/// ```
52pub struct HttpSubsystem {
53	/// Address to bind the server to.
54	bind_addr: String,
55	/// Actual bound address (available after start).
56	actual_addr: RwLock<Option<SocketAddr>>,
57	/// Shared application state.
58	state: AppState,
59	/// Flag indicating if the server is running.
60	running: Arc<AtomicBool>,
61	/// Channel to send shutdown signal.
62	shutdown_tx: Option<oneshot::Sender<()>>,
63	/// Channel to receive shutdown completion.
64	shutdown_complete_rx: Option<oneshot::Receiver<()>>,
65	/// Shared tokio runtime.
66	runtime: SharedRuntime,
67}
68
69impl HttpSubsystem {
70	/// Create a new HTTP subsystem.
71	///
72	/// # Arguments
73	///
74	/// * `bind_addr` - Address and port to bind to (e.g., "0.0.0.0:8090")
75	/// * `state` - Shared application state with engine and config
76	/// * `runtime` - Shared runtime
77	pub fn new(bind_addr: String, state: AppState, runtime: SharedRuntime) -> Self {
78		Self {
79			bind_addr,
80			actual_addr: RwLock::new(None),
81			state,
82			running: Arc::new(AtomicBool::new(false)),
83			shutdown_tx: None,
84			shutdown_complete_rx: None,
85			runtime,
86		}
87	}
88
89	/// Get the bind address.
90	pub fn bind_addr(&self) -> &str {
91		&self.bind_addr
92	}
93
94	/// Get the actual bound address (available after start).
95	pub fn local_addr(&self) -> Option<SocketAddr> {
96		*self.actual_addr.read().unwrap()
97	}
98
99	/// Get the actual bound port (available after start).
100	pub fn port(&self) -> Option<u16> {
101		self.local_addr().map(|a| a.port())
102	}
103}
104
105impl HasVersion for HttpSubsystem {
106	fn version(&self) -> SystemVersion {
107		SystemVersion {
108			name: env!("CARGO_PKG_NAME")
109				.strip_prefix("reifydb-")
110				.unwrap_or(env!("CARGO_PKG_NAME"))
111				.to_string(),
112			version: env!("CARGO_PKG_VERSION").to_string(),
113			description: "HTTP server subsystem for query and command handling".to_string(),
114			r#type: ComponentType::Subsystem,
115		}
116	}
117}
118
119impl Subsystem for HttpSubsystem {
120	fn name(&self) -> &'static str {
121		"Http"
122	}
123
124	fn start(&mut self) -> reifydb_type::Result<()> {
125		// Idempotent: if already running, return success
126		if self.running.load(Ordering::SeqCst) {
127			return Ok(());
128		}
129
130		let addr = self.bind_addr.clone();
131		let runtime = self.runtime.clone();
132		let listener = runtime.block_on(TcpListener::bind(&addr)).map_err(|e| error!(bind_failed(&addr, e)))?;
133
134		let actual_addr = listener.local_addr().map_err(|e| error!(address_unavailable(e)))?;
135		*self.actual_addr.write().unwrap() = Some(actual_addr);
136		tracing::info!("HTTP server bound to {}", actual_addr);
137
138		let (shutdown_tx, shutdown_rx) = oneshot::channel();
139		let (complete_tx, complete_rx) = oneshot::channel();
140
141		let state = self.state.clone();
142		let running = self.running.clone();
143		let runtime = self.runtime.clone();
144
145		runtime.spawn(async move {
146			// Mark as running
147			running.store(true, Ordering::SeqCst);
148
149			// Create router and serve
150			let app = router(state);
151			let server = axum::serve(listener, app).with_graceful_shutdown(async {
152				shutdown_rx.await.ok();
153				tracing::info!("HTTP server received shutdown signal");
154			});
155
156			// Run until shutdown
157			if let Err(e) = server.await {
158				tracing::error!("HTTP server error: {}", e);
159			}
160
161			// Mark as stopped
162			running.store(false, Ordering::SeqCst);
163			let _ = complete_tx.send(());
164			tracing::info!("HTTP server stopped");
165		});
166
167		self.shutdown_tx = Some(shutdown_tx);
168		self.shutdown_complete_rx = Some(complete_rx);
169		Ok(())
170	}
171
172	fn shutdown(&mut self) -> reifydb_type::Result<()> {
173		if let Some(tx) = self.shutdown_tx.take() {
174			let _ = tx.send(());
175		}
176		if let Some(rx) = self.shutdown_complete_rx.take() {
177			let _ = self.runtime.block_on(rx);
178		}
179		Ok(())
180	}
181
182	fn is_running(&self) -> bool {
183		self.running.load(Ordering::SeqCst)
184	}
185
186	fn health_status(&self) -> HealthStatus {
187		if self.running.load(Ordering::SeqCst) {
188			HealthStatus::Healthy
189		} else if self.shutdown_tx.is_some() {
190			// Started but not yet running (startup in progress)
191			HealthStatus::Warning {
192				description: "Starting up".to_string(),
193			}
194		} else {
195			HealthStatus::Failed {
196				description: "Not running".to_string(),
197			}
198		}
199	}
200
201	fn as_any(&self) -> &dyn Any {
202		self
203	}
204
205	fn as_any_mut(&mut self) -> &mut dyn Any {
206		self
207	}
208}