reifydb_sub_server_http/
subsystem.rs1use std::{
10 any::Any,
11 net::SocketAddr,
12 sync::{
13 Arc, RwLock,
14 atomic::{AtomicBool, Ordering},
15 },
16};
17
18use axum::serve;
19use reifydb_core::{
20 error::CoreError,
21 interface::version::{ComponentType, HasVersion, SystemVersion},
22};
23use reifydb_runtime::SharedRuntime;
24use reifydb_sub_api::subsystem::{HealthStatus, Subsystem};
25use reifydb_sub_server::state::AppState;
26use reifydb_type::{Result, error::Error};
27use tokio::{net::TcpListener, sync::oneshot};
28use tracing::{error, info};
29
30use crate::routes::router;
31
32pub struct HttpSubsystem {
55 bind_addr: Option<String>,
57 admin_bind_addr: Option<String>,
59 actual_addr: RwLock<Option<SocketAddr>>,
61 admin_actual_addr: RwLock<Option<SocketAddr>>,
63 state: AppState,
65 running: Arc<AtomicBool>,
67 shutdown_tx: Option<oneshot::Sender<()>>,
69 shutdown_complete_rx: Option<oneshot::Receiver<()>>,
71 admin_shutdown_tx: Option<oneshot::Sender<()>>,
73 admin_shutdown_complete_rx: Option<oneshot::Receiver<()>>,
75 runtime: SharedRuntime,
77}
78
79impl HttpSubsystem {
80 pub fn new(
88 bind_addr: Option<String>,
89 admin_bind_addr: Option<String>,
90 state: AppState,
91 runtime: SharedRuntime,
92 ) -> Self {
93 Self {
94 bind_addr,
95 admin_bind_addr,
96 actual_addr: RwLock::new(None),
97 admin_actual_addr: RwLock::new(None),
98 state,
99 running: Arc::new(AtomicBool::new(false)),
100 shutdown_tx: None,
101 shutdown_complete_rx: None,
102 admin_shutdown_tx: None,
103 admin_shutdown_complete_rx: None,
104 runtime,
105 }
106 }
107
108 pub fn bind_addr(&self) -> Option<&str> {
110 self.bind_addr.as_deref()
111 }
112
113 pub fn local_addr(&self) -> Option<SocketAddr> {
115 *self.actual_addr.read().unwrap()
116 }
117
118 pub fn port(&self) -> Option<u16> {
120 self.local_addr().map(|a| a.port())
121 }
122
123 pub fn admin_local_addr(&self) -> Option<SocketAddr> {
125 *self.admin_actual_addr.read().unwrap()
126 }
127
128 pub fn admin_port(&self) -> Option<u16> {
130 self.admin_local_addr().map(|a| a.port())
131 }
132}
133
134impl HasVersion for HttpSubsystem {
135 fn version(&self) -> SystemVersion {
136 SystemVersion {
137 name: env!("CARGO_PKG_NAME")
138 .strip_prefix("reifydb-")
139 .unwrap_or(env!("CARGO_PKG_NAME"))
140 .to_string(),
141 version: env!("CARGO_PKG_VERSION").to_string(),
142 description: "HTTP server subsystem for query and command handling".to_string(),
143 r#type: ComponentType::Subsystem,
144 }
145 }
146}
147
148impl Subsystem for HttpSubsystem {
149 fn name(&self) -> &'static str {
150 "Http"
151 }
152
153 fn start(&mut self) -> Result<()> {
154 if self.running.load(Ordering::SeqCst) {
156 return Ok(());
157 }
158
159 let runtime = self.runtime.clone();
160
161 if let Some(addr) = &self.bind_addr {
163 let addr = addr.clone();
164 let listener = runtime.block_on(TcpListener::bind(&addr)).map_err(|e| {
165 let err: Error = CoreError::SubsystemBindFailed {
166 addr: addr.clone(),
167 reason: e.to_string(),
168 }
169 .into();
170 err
171 })?;
172
173 let actual_addr = listener.local_addr().map_err(|e| {
174 let err: Error = CoreError::SubsystemAddressUnavailable {
175 reason: e.to_string(),
176 }
177 .into();
178 err
179 })?;
180 *self.actual_addr.write().unwrap() = Some(actual_addr);
181 info!("HTTP server bound to {}", actual_addr);
182
183 let (shutdown_tx, shutdown_rx) = oneshot::channel();
184 let (complete_tx, complete_rx) = oneshot::channel();
185
186 let state = self.state.clone();
187 let running = self.running.clone();
188
189 runtime.spawn(async move {
190 running.store(true, Ordering::SeqCst);
192
193 let app = router(state);
195 let server = serve(listener, app).with_graceful_shutdown(async {
196 shutdown_rx.await.ok();
197 info!("HTTP server received shutdown signal");
198 });
199
200 if let Err(e) = server.await {
202 error!("HTTP server error: {}", e);
203 }
204
205 running.store(false, Ordering::SeqCst);
207 let _ = complete_tx.send(());
208 info!("HTTP server stopped");
209 });
210
211 self.shutdown_tx = Some(shutdown_tx);
212 self.shutdown_complete_rx = Some(complete_rx);
213 } else {
214 self.running.store(true, Ordering::SeqCst);
216 }
217
218 if let Some(admin_addr) = &self.admin_bind_addr {
220 let admin_addr = admin_addr.clone();
221 let runtime = self.runtime.clone();
222 let admin_listener = runtime.block_on(TcpListener::bind(&admin_addr)).map_err(|e| {
223 let err: Error = CoreError::SubsystemBindFailed {
224 addr: admin_addr.clone(),
225 reason: e.to_string(),
226 }
227 .into();
228 err
229 })?;
230
231 let admin_actual_addr = admin_listener.local_addr().map_err(|e| {
232 let err: Error = CoreError::SubsystemAddressUnavailable {
233 reason: e.to_string(),
234 }
235 .into();
236 err
237 })?;
238 *self.admin_actual_addr.write().unwrap() = Some(admin_actual_addr);
239 info!("HTTP admin server bound to {}", admin_actual_addr);
240
241 let (admin_shutdown_tx, admin_shutdown_rx) = oneshot::channel();
242 let (admin_complete_tx, admin_complete_rx) = oneshot::channel();
243
244 let admin_config = self.state.config().clone().admin_enabled(true);
246 let admin_state = self.state.clone_with_config(admin_config);
247
248 runtime.spawn(async move {
249 let app = router(admin_state);
250 let server = serve(admin_listener, app).with_graceful_shutdown(async {
251 admin_shutdown_rx.await.ok();
252 info!("HTTP admin server received shutdown signal");
253 });
254
255 if let Err(e) = server.await {
256 error!("HTTP admin server error: {}", e);
257 }
258
259 let _ = admin_complete_tx.send(());
260 info!("HTTP admin server stopped");
261 });
262
263 self.admin_shutdown_tx = Some(admin_shutdown_tx);
264 self.admin_shutdown_complete_rx = Some(admin_complete_rx);
265 }
266
267 Ok(())
268 }
269
270 fn shutdown(&mut self) -> Result<()> {
271 if let Some(tx) = self.admin_shutdown_tx.take() {
273 let _ = tx.send(());
274 }
275 if let Some(rx) = self.admin_shutdown_complete_rx.take() {
276 let _ = self.runtime.block_on(rx);
277 }
278 if let Some(tx) = self.shutdown_tx.take() {
280 let _ = tx.send(());
281 }
282 if let Some(rx) = self.shutdown_complete_rx.take() {
283 let _ = self.runtime.block_on(rx);
284 }
285 self.running.store(false, Ordering::SeqCst);
286 Ok(())
287 }
288
289 fn is_running(&self) -> bool {
290 self.running.load(Ordering::SeqCst)
291 }
292
293 fn health_status(&self) -> HealthStatus {
294 if self.running.load(Ordering::SeqCst) {
295 HealthStatus::Healthy
296 } else if self.shutdown_tx.is_some() {
297 HealthStatus::Warning {
299 description: "Starting up".to_string(),
300 }
301 } else {
302 HealthStatus::Failed {
303 description: "Not running".to_string(),
304 }
305 }
306 }
307
308 fn as_any(&self) -> &dyn Any {
309 self
310 }
311
312 fn as_any_mut(&mut self) -> &mut dyn Any {
313 self
314 }
315}