reifydb_sub_server_http/
subsystem.rs1use std::{
10 any::Any,
11 net::SocketAddr,
12 sync::{
13 Arc, RwLock,
14 atomic::{AtomicBool, Ordering},
15 },
16};
17
18use axum::{serve, serve::ListenerExt};
19use reifydb_core::{
20 error::CoreError,
21 interface::version::{ComponentType, HasVersion, SystemVersion},
22};
23use reifydb_runtime::SharedRuntime;
24use reifydb_sub_api::subsystem::{HealthStatus, Subsystem};
25use reifydb_sub_server::state::AppState;
26use reifydb_type::{Result, error::Error};
27use tokio::{net::TcpListener, sync::oneshot};
28use tracing::{error, info, warn};
29
30use crate::{routes::router, state::HttpServerState};
31
32pub struct HttpSubsystem {
55 bind_addr: Option<String>,
57 admin_bind_addr: Option<String>,
59 actual_addr: RwLock<Option<SocketAddr>>,
61 admin_actual_addr: RwLock<Option<SocketAddr>>,
63 state: AppState,
65 running: Arc<AtomicBool>,
67 shutdown_tx: Option<oneshot::Sender<()>>,
69 shutdown_complete_rx: Option<oneshot::Receiver<()>>,
71 admin_shutdown_tx: Option<oneshot::Sender<()>>,
73 admin_shutdown_complete_rx: Option<oneshot::Receiver<()>>,
75 runtime: SharedRuntime,
77}
78
79impl HttpSubsystem {
80 pub fn new(
88 bind_addr: Option<String>,
89 admin_bind_addr: Option<String>,
90 state: AppState,
91 runtime: SharedRuntime,
92 ) -> Self {
93 Self {
94 bind_addr,
95 admin_bind_addr,
96 actual_addr: RwLock::new(None),
97 admin_actual_addr: RwLock::new(None),
98 state,
99 running: Arc::new(AtomicBool::new(false)),
100 shutdown_tx: None,
101 shutdown_complete_rx: None,
102 admin_shutdown_tx: None,
103 admin_shutdown_complete_rx: None,
104 runtime,
105 }
106 }
107
108 pub fn bind_addr(&self) -> Option<&str> {
110 self.bind_addr.as_deref()
111 }
112
113 pub fn local_addr(&self) -> Option<SocketAddr> {
115 *self.actual_addr.read().unwrap()
116 }
117
118 pub fn port(&self) -> Option<u16> {
120 self.local_addr().map(|a| a.port())
121 }
122
123 pub fn admin_local_addr(&self) -> Option<SocketAddr> {
125 *self.admin_actual_addr.read().unwrap()
126 }
127
128 pub fn admin_port(&self) -> Option<u16> {
130 self.admin_local_addr().map(|a| a.port())
131 }
132}
133
134impl HasVersion for HttpSubsystem {
135 fn version(&self) -> SystemVersion {
136 SystemVersion {
137 name: env!("CARGO_PKG_NAME")
138 .strip_prefix("reifydb-")
139 .unwrap_or(env!("CARGO_PKG_NAME"))
140 .to_string(),
141 version: env!("CARGO_PKG_VERSION").to_string(),
142 description: "HTTP server subsystem for query and command handling".to_string(),
143 r#type: ComponentType::Subsystem,
144 }
145 }
146}
147
148impl Subsystem for HttpSubsystem {
149 fn name(&self) -> &'static str {
150 "Http"
151 }
152
153 fn start(&mut self) -> Result<()> {
154 if self.running.load(Ordering::SeqCst) {
156 return Ok(());
157 }
158
159 let runtime = self.runtime.clone();
160
161 if let Some(addr) = &self.bind_addr {
163 let addr = addr.clone();
164 let listener = runtime.block_on(TcpListener::bind(&addr)).map_err(|e| {
165 let err: Error = CoreError::SubsystemBindFailed {
166 addr: addr.clone(),
167 reason: e.to_string(),
168 }
169 .into();
170 err
171 })?;
172
173 let actual_addr = listener.local_addr().map_err(|e| {
174 let err: Error = CoreError::SubsystemAddressUnavailable {
175 reason: e.to_string(),
176 }
177 .into();
178 err
179 })?;
180 *self.actual_addr.write().unwrap() = Some(actual_addr);
181 info!("HTTP server bound to {}", actual_addr);
182
183 let (shutdown_tx, shutdown_rx) = oneshot::channel();
184 let (complete_tx, complete_rx) = oneshot::channel();
185
186 let server_state = HttpServerState::new(self.state.clone());
187 let running = self.running.clone();
188
189 runtime.spawn(async move {
190 running.store(true, Ordering::SeqCst);
192
193 let app = router(server_state);
195 let listener = listener.tap_io(|tcp_stream| {
196 if let Err(e) = tcp_stream.set_nodelay(true) {
197 warn!("Failed to set TCP_NODELAY: {e}");
198 }
199 });
200 let server = serve(listener, app).with_graceful_shutdown(async {
201 shutdown_rx.await.ok();
202 info!("HTTP server received shutdown signal");
203 });
204
205 if let Err(e) = server.await {
207 error!("HTTP server error: {}", e);
208 }
209
210 running.store(false, Ordering::SeqCst);
212 let _ = complete_tx.send(());
213 info!("HTTP server stopped");
214 });
215
216 self.shutdown_tx = Some(shutdown_tx);
217 self.shutdown_complete_rx = Some(complete_rx);
218 } else {
219 self.running.store(true, Ordering::SeqCst);
221 }
222
223 if let Some(admin_addr) = &self.admin_bind_addr {
225 let admin_addr = admin_addr.clone();
226 let runtime = self.runtime.clone();
227 let admin_listener = runtime.block_on(TcpListener::bind(&admin_addr)).map_err(|e| {
228 let err: Error = CoreError::SubsystemBindFailed {
229 addr: admin_addr.clone(),
230 reason: e.to_string(),
231 }
232 .into();
233 err
234 })?;
235
236 let admin_actual_addr = admin_listener.local_addr().map_err(|e| {
237 let err: Error = CoreError::SubsystemAddressUnavailable {
238 reason: e.to_string(),
239 }
240 .into();
241 err
242 })?;
243 *self.admin_actual_addr.write().unwrap() = Some(admin_actual_addr);
244 info!("HTTP admin server bound to {}", admin_actual_addr);
245
246 let (admin_shutdown_tx, admin_shutdown_rx) = oneshot::channel();
247 let (admin_complete_tx, admin_complete_rx) = oneshot::channel();
248
249 let admin_config = self.state.config().clone().admin_enabled(true);
251 let admin_app_state = self.state.clone_with_config(admin_config);
252 let admin_server_state = HttpServerState::new(admin_app_state);
253
254 runtime.spawn(async move {
255 let app = router(admin_server_state);
256 let admin_listener = admin_listener.tap_io(|tcp_stream| {
257 if let Err(e) = tcp_stream.set_nodelay(true) {
258 warn!("Failed to set TCP_NODELAY: {e}");
259 }
260 });
261 let server = serve(admin_listener, app).with_graceful_shutdown(async {
262 admin_shutdown_rx.await.ok();
263 info!("HTTP admin server received shutdown signal");
264 });
265
266 if let Err(e) = server.await {
267 error!("HTTP admin server error: {}", e);
268 }
269
270 let _ = admin_complete_tx.send(());
271 info!("HTTP admin server stopped");
272 });
273
274 self.admin_shutdown_tx = Some(admin_shutdown_tx);
275 self.admin_shutdown_complete_rx = Some(admin_complete_rx);
276 }
277
278 Ok(())
279 }
280
281 fn shutdown(&mut self) -> Result<()> {
282 if let Some(tx) = self.admin_shutdown_tx.take() {
284 let _ = tx.send(());
285 }
286 if let Some(rx) = self.admin_shutdown_complete_rx.take() {
287 let _ = self.runtime.block_on(rx);
288 }
289 if let Some(tx) = self.shutdown_tx.take() {
291 let _ = tx.send(());
292 }
293 if let Some(rx) = self.shutdown_complete_rx.take() {
294 let _ = self.runtime.block_on(rx);
295 }
296 self.running.store(false, Ordering::SeqCst);
297 Ok(())
298 }
299
300 fn is_running(&self) -> bool {
301 self.running.load(Ordering::SeqCst)
302 }
303
304 fn health_status(&self) -> HealthStatus {
305 if self.running.load(Ordering::SeqCst) {
306 HealthStatus::Healthy
307 } else if self.shutdown_tx.is_some() {
308 HealthStatus::Warning {
310 description: "Starting up".to_string(),
311 }
312 } else {
313 HealthStatus::Failed {
314 description: "Not running".to_string(),
315 }
316 }
317 }
318
319 fn as_any(&self) -> &dyn Any {
320 self
321 }
322
323 fn as_any_mut(&mut self) -> &mut dyn Any {
324 self
325 }
326}