reifydb_sub_server_http/
subsystem.rs1use std::{
10 any::Any,
11 net::SocketAddr,
12 sync::{
13 Arc, RwLock,
14 atomic::{AtomicBool, Ordering},
15 },
16};
17
18use reifydb_core::interface::version::{ComponentType, HasVersion, SystemVersion};
19use reifydb_sub_api::{HealthStatus, Subsystem};
20use reifydb_sub_server::{AppState, SharedRuntime};
21use tokio::{runtime::Handle, sync::oneshot};
22
23pub struct HttpSubsystem {
49 bind_addr: String,
51 actual_addr: RwLock<Option<SocketAddr>>,
53 state: AppState,
55 _runtime: Option<Arc<SharedRuntime>>,
57 handle: Handle,
59 running: Arc<AtomicBool>,
61 shutdown_tx: Option<oneshot::Sender<()>>,
63 shutdown_complete_rx: Option<oneshot::Receiver<()>>,
65}
66
67impl HttpSubsystem {
68 pub fn new(bind_addr: String, state: AppState, handle: Handle) -> Self {
76 Self {
77 bind_addr,
78 actual_addr: RwLock::new(None),
79 state,
80 _runtime: None,
81 handle,
82 running: Arc::new(AtomicBool::new(false)),
83 shutdown_tx: None,
84 shutdown_complete_rx: None,
85 }
86 }
87
88 pub fn with_runtime(bind_addr: String, state: AppState, runtime: Arc<SharedRuntime>) -> Self {
98 let handle = runtime.handle();
99 Self {
100 bind_addr,
101 actual_addr: RwLock::new(None),
102 state,
103 _runtime: Some(runtime),
104 handle,
105 running: Arc::new(AtomicBool::new(false)),
106 shutdown_tx: None,
107 shutdown_complete_rx: None,
108 }
109 }
110
111 pub fn bind_addr(&self) -> &str {
113 &self.bind_addr
114 }
115
116 pub fn local_addr(&self) -> Option<SocketAddr> {
118 *self.actual_addr.read().unwrap()
119 }
120
121 pub fn port(&self) -> Option<u16> {
123 self.local_addr().map(|a| a.port())
124 }
125}
126
127impl HasVersion for HttpSubsystem {
128 fn version(&self) -> SystemVersion {
129 SystemVersion {
130 name: "http".to_string(),
131 version: env!("CARGO_PKG_VERSION").to_string(),
132 description: "HTTP server subsystem for query and command handling".to_string(),
133 r#type: ComponentType::Subsystem,
134 }
135 }
136}
137
138impl Subsystem for HttpSubsystem {
139 fn name(&self) -> &'static str {
140 "Http"
141 }
142
143 fn start(&mut self) -> reifydb_core::Result<()> {
144 if self.running.load(Ordering::SeqCst) {
146 return Ok(());
147 }
148
149 let addr = self.bind_addr.clone();
151 let std_listener = std::net::TcpListener::bind(&addr).map_err(|e| {
152 reifydb_core::error!(reifydb_core::diagnostic::internal::internal(format!(
153 "Failed to bind HTTP server to {}: {}",
154 &addr, e
155 )))
156 })?;
157 std_listener.set_nonblocking(true).map_err(|e| {
158 reifydb_core::error!(reifydb_core::diagnostic::internal::internal(format!(
159 "Failed to set non-blocking on {}: {}",
160 &addr, e
161 )))
162 })?;
163
164 let actual_addr = std_listener.local_addr().map_err(|e| {
165 reifydb_core::error!(reifydb_core::diagnostic::internal::internal(format!(
166 "Failed to get local address: {}",
167 e
168 )))
169 })?;
170
171 let _guard = self.handle.enter();
173 let listener = tokio::net::TcpListener::from_std(std_listener).map_err(|e| {
174 reifydb_core::error!(reifydb_core::diagnostic::internal::internal(format!(
175 "Failed to convert listener: {}",
176 e
177 )))
178 })?;
179 *self.actual_addr.write().unwrap() = Some(actual_addr);
180 tracing::info!("HTTP server bound to {}", actual_addr);
181
182 let (shutdown_tx, shutdown_rx) = oneshot::channel();
183 let (complete_tx, complete_rx) = oneshot::channel();
184
185 let state = self.state.clone();
186 let running = self.running.clone();
187
188 self.handle.spawn(async move {
189 running.store(true, Ordering::SeqCst);
191
192 let app = crate::routes::router(state);
194 let server = axum::serve(listener, app).with_graceful_shutdown(async {
195 shutdown_rx.await.ok();
196 tracing::info!("HTTP server received shutdown signal");
197 });
198
199 if let Err(e) = server.await {
201 tracing::error!("HTTP server error: {}", e);
202 }
203
204 running.store(false, Ordering::SeqCst);
206 let _ = complete_tx.send(());
207 tracing::info!("HTTP server stopped");
208 });
209
210 self.shutdown_tx = Some(shutdown_tx);
211 self.shutdown_complete_rx = Some(complete_rx);
212 Ok(())
213 }
214
215 fn shutdown(&mut self) -> reifydb_core::Result<()> {
216 if let Some(tx) = self.shutdown_tx.take() {
218 let _ = tx.send(());
219 }
220
221 if let Some(rx) = self.shutdown_complete_rx.take() {
223 let handle = self.handle.clone();
224 handle.block_on(async {
225 match tokio::time::timeout(std::time::Duration::from_secs(30), rx).await {
226 Ok(_) => {
227 tracing::debug!("HTTP server shutdown completed");
228 }
229 Err(_) => {
230 tracing::warn!("HTTP server shutdown timed out");
231 }
232 }
233 });
234 }
235
236 Ok(())
237 }
238
239 fn is_running(&self) -> bool {
240 self.running.load(Ordering::SeqCst)
241 }
242
243 fn health_status(&self) -> HealthStatus {
244 if self.running.load(Ordering::SeqCst) {
245 HealthStatus::Healthy
246 } else if self.shutdown_tx.is_some() {
247 HealthStatus::Warning {
249 description: "Starting up".to_string(),
250 }
251 } else {
252 HealthStatus::Failed {
253 description: "Not running".to_string(),
254 }
255 }
256 }
257
258 fn as_any(&self) -> &dyn Any {
259 self
260 }
261
262 fn as_any_mut(&mut self) -> &mut dyn Any {
263 self
264 }
265}