Skip to main content

reifydb_sub_server_http/
subsystem.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4//! HTTP server subsystem implementing the ReifyDB Subsystem trait.
5//!
6//! This module provides `HttpSubsystem` which manages the lifecycle of the
7//! HTTP server, including startup, health monitoring, and graceful shutdown.
8
9use std::{
10	any::Any,
11	net::SocketAddr,
12	sync::{
13		Arc, RwLock,
14		atomic::{AtomicBool, Ordering},
15	},
16};
17
18use axum::serve;
19use reifydb_core::{
20	error::CoreError,
21	interface::version::{ComponentType, HasVersion, SystemVersion},
22};
23use reifydb_runtime::SharedRuntime;
24use reifydb_sub_api::subsystem::{HealthStatus, Subsystem};
25use reifydb_sub_server::state::AppState;
26use reifydb_type::{Result, error::Error};
27use tokio::{net::TcpListener, sync::oneshot};
28use tracing::{error, info};
29
30use crate::routes::router;
31
/// HTTP server subsystem.
///
/// Manages an Axum-based HTTP server with support for:
/// - Graceful startup and shutdown
/// - Health monitoring
///
/// The server itself runs as a task spawned on the shared runtime; this
/// struct keeps the handles needed to observe and stop that task.
///
/// # Example
///
/// ```ignore
/// let state = AppState::new(pool, engine, QueryConfig::default());
///
/// // `runtime` is the `SharedRuntime` the server task is spawned on.
/// let mut http = HttpSubsystem::new(
///     "0.0.0.0:8090".to_string(),
///     state,
///     runtime,
/// );
///
/// http.start()?;
/// // Server is now accepting connections
///
/// http.shutdown()?;
/// // Server has gracefully stopped
/// ```
pub struct HttpSubsystem {
	/// Address to bind the server to, exactly as passed to `new`.
	bind_addr: String,
	/// Actual bound address (available after start).
	///
	/// `None` until `start` has bound the listener. Kept behind an `RwLock`
	/// so it can be read through `&self`.
	actual_addr: RwLock<Option<SocketAddr>>,
	/// Shared application state; a clone is handed to the router on start.
	state: AppState,
	/// Flag indicating if the server is running.
	///
	/// Shared with the server task, which sets it to `true` when it begins
	/// serving and back to `false` when it exits.
	running: Arc<AtomicBool>,
	/// Channel to send shutdown signal.
	///
	/// `Some` between a successful `start` and the next `shutdown`.
	shutdown_tx: Option<oneshot::Sender<()>>,
	/// Channel to receive shutdown completion.
	///
	/// The server task sends on the other end after it has stopped serving.
	shutdown_complete_rx: Option<oneshot::Receiver<()>>,
	/// Shared tokio runtime used to bind the listener, spawn the server
	/// task, and block on shutdown completion.
	runtime: SharedRuntime,
}
70
71impl HttpSubsystem {
72	/// Create a new HTTP subsystem.
73	///
74	/// # Arguments
75	///
76	/// * `bind_addr` - Address and port to bind to (e.g., "0.0.0.0:8090")
77	/// * `state` - Shared application state with engine and config
78	/// * `runtime` - Shared runtime
79	pub fn new(bind_addr: String, state: AppState, runtime: SharedRuntime) -> Self {
80		Self {
81			bind_addr,
82			actual_addr: RwLock::new(None),
83			state,
84			running: Arc::new(AtomicBool::new(false)),
85			shutdown_tx: None,
86			shutdown_complete_rx: None,
87			runtime,
88		}
89	}
90
91	/// Get the bind address.
92	pub fn bind_addr(&self) -> &str {
93		&self.bind_addr
94	}
95
96	/// Get the actual bound address (available after start).
97	pub fn local_addr(&self) -> Option<SocketAddr> {
98		*self.actual_addr.read().unwrap()
99	}
100
101	/// Get the actual bound port (available after start).
102	pub fn port(&self) -> Option<u16> {
103		self.local_addr().map(|a| a.port())
104	}
105}
106
107impl HasVersion for HttpSubsystem {
108	fn version(&self) -> SystemVersion {
109		SystemVersion {
110			name: env!("CARGO_PKG_NAME")
111				.strip_prefix("reifydb-")
112				.unwrap_or(env!("CARGO_PKG_NAME"))
113				.to_string(),
114			version: env!("CARGO_PKG_VERSION").to_string(),
115			description: "HTTP server subsystem for query and command handling".to_string(),
116			r#type: ComponentType::Subsystem,
117		}
118	}
119}
120
121impl Subsystem for HttpSubsystem {
122	fn name(&self) -> &'static str {
123		"Http"
124	}
125
126	fn start(&mut self) -> Result<()> {
127		// Idempotent: if already running, return success
128		if self.running.load(Ordering::SeqCst) {
129			return Ok(());
130		}
131
132		let addr = self.bind_addr.clone();
133		let runtime = self.runtime.clone();
134		let listener = runtime.block_on(TcpListener::bind(&addr)).map_err(|e| {
135			let err: Error = CoreError::SubsystemBindFailed {
136				addr: addr.clone(),
137				reason: e.to_string(),
138			}
139			.into();
140			err
141		})?;
142
143		let actual_addr = listener.local_addr().map_err(|e| {
144			let err: Error = CoreError::SubsystemAddressUnavailable {
145				reason: e.to_string(),
146			}
147			.into();
148			err
149		})?;
150		*self.actual_addr.write().unwrap() = Some(actual_addr);
151		info!("HTTP server bound to {}", actual_addr);
152
153		let (shutdown_tx, shutdown_rx) = oneshot::channel();
154		let (complete_tx, complete_rx) = oneshot::channel();
155
156		let state = self.state.clone();
157		let running = self.running.clone();
158		let runtime = self.runtime.clone();
159
160		runtime.spawn(async move {
161			// Mark as running
162			running.store(true, Ordering::SeqCst);
163
164			// Create router and serve
165			let app = router(state);
166			let server = serve(listener, app).with_graceful_shutdown(async {
167				shutdown_rx.await.ok();
168				info!("HTTP server received shutdown signal");
169			});
170
171			// Run until shutdown
172			if let Err(e) = server.await {
173				error!("HTTP server error: {}", e);
174			}
175
176			// Mark as stopped
177			running.store(false, Ordering::SeqCst);
178			let _ = complete_tx.send(());
179			info!("HTTP server stopped");
180		});
181
182		self.shutdown_tx = Some(shutdown_tx);
183		self.shutdown_complete_rx = Some(complete_rx);
184		Ok(())
185	}
186
187	fn shutdown(&mut self) -> Result<()> {
188		if let Some(tx) = self.shutdown_tx.take() {
189			let _ = tx.send(());
190		}
191		if let Some(rx) = self.shutdown_complete_rx.take() {
192			let _ = self.runtime.block_on(rx);
193		}
194		Ok(())
195	}
196
197	fn is_running(&self) -> bool {
198		self.running.load(Ordering::SeqCst)
199	}
200
201	fn health_status(&self) -> HealthStatus {
202		if self.running.load(Ordering::SeqCst) {
203			HealthStatus::Healthy
204		} else if self.shutdown_tx.is_some() {
205			// Started but not yet running (startup in progress)
206			HealthStatus::Warning {
207				description: "Starting up".to_string(),
208			}
209		} else {
210			HealthStatus::Failed {
211				description: "Not running".to_string(),
212			}
213		}
214	}
215
216	fn as_any(&self) -> &dyn Any {
217		self
218	}
219
220	fn as_any_mut(&mut self) -> &mut dyn Any {
221		self
222	}
223}