reifydb_sub_server_http/
subsystem.rs1use std::{
10 any::Any,
11 net::SocketAddr,
12 sync::{
13 Arc, RwLock,
14 atomic::{AtomicBool, Ordering},
15 },
16 time::Duration,
17};
18
19use async_trait::async_trait;
20use reifydb_core::{
21 diagnostic::subsystem::{address_unavailable, bind_failed},
22 error,
23 interface::version::{ComponentType, HasVersion, SystemVersion},
24};
25use reifydb_sub_api::{HealthStatus, Subsystem};
26use reifydb_sub_server::{AppState, SharedRuntime};
27use tokio::{net::TcpListener, runtime::Handle, sync::oneshot, time::timeout};
28
29use crate::routes::router;
30
31pub struct HttpSubsystem {
57 bind_addr: String,
59 actual_addr: RwLock<Option<SocketAddr>>,
61 state: AppState,
63 _runtime: Option<SharedRuntime>,
65 handle: Handle,
67 running: Arc<AtomicBool>,
69 shutdown_tx: Option<oneshot::Sender<()>>,
71 shutdown_complete_rx: Option<oneshot::Receiver<()>>,
73}
74
75impl HttpSubsystem {
76 pub fn new(bind_addr: String, state: AppState, runtime: SharedRuntime) -> Self {
86 let handle = runtime.handle();
87 Self {
88 bind_addr,
89 actual_addr: RwLock::new(None),
90 state,
91 _runtime: Some(runtime),
92 handle,
93 running: Arc::new(AtomicBool::new(false)),
94 shutdown_tx: None,
95 shutdown_complete_rx: None,
96 }
97 }
98
99 pub fn bind_addr(&self) -> &str {
101 &self.bind_addr
102 }
103
104 pub fn local_addr(&self) -> Option<SocketAddr> {
106 *self.actual_addr.read().unwrap()
107 }
108
109 pub fn port(&self) -> Option<u16> {
111 self.local_addr().map(|a| a.port())
112 }
113}
114
115impl HasVersion for HttpSubsystem {
116 fn version(&self) -> SystemVersion {
117 SystemVersion {
118 name: "http".to_string(),
119 version: env!("CARGO_PKG_VERSION").to_string(),
120 description: "HTTP server subsystem for query and command handling".to_string(),
121 r#type: ComponentType::Subsystem,
122 }
123 }
124}
125
126#[async_trait]
127impl Subsystem for HttpSubsystem {
128 fn name(&self) -> &'static str {
129 "Http"
130 }
131
132 async fn start(&mut self) -> reifydb_core::Result<()> {
133 if self.running.load(Ordering::SeqCst) {
135 return Ok(());
136 }
137
138 let addr = self.bind_addr.clone();
139 let listener = TcpListener::bind(&addr).await.map_err(|e| error!(bind_failed(&addr, e)))?;
140
141 let actual_addr = listener.local_addr().map_err(|e| error!(address_unavailable(e)))?;
142 *self.actual_addr.write().unwrap() = Some(actual_addr);
143 tracing::info!("HTTP server bound to {}", actual_addr);
144
145 let (shutdown_tx, shutdown_rx) = oneshot::channel();
146 let (complete_tx, complete_rx) = oneshot::channel();
147
148 let state = self.state.clone();
149 let running = self.running.clone();
150
151 self.handle.spawn(async move {
152 running.store(true, Ordering::SeqCst);
154
155 let app = router(state);
157 let server = axum::serve(listener, app).with_graceful_shutdown(async {
158 shutdown_rx.await.ok();
159 tracing::info!("HTTP server received shutdown signal");
160 });
161
162 if let Err(e) = server.await {
164 tracing::error!("HTTP server error: {}", e);
165 }
166
167 running.store(false, Ordering::SeqCst);
169 let _ = complete_tx.send(());
170 tracing::info!("HTTP server stopped");
171 });
172
173 self.shutdown_tx = Some(shutdown_tx);
174 self.shutdown_complete_rx = Some(complete_rx);
175 Ok(())
176 }
177
178 async fn shutdown(&mut self) -> reifydb_core::Result<()> {
179 if let Some(tx) = self.shutdown_tx.take() {
181 let _ = tx.send(());
182 }
183
184 if let Some(rx) = self.shutdown_complete_rx.take() {
186 match timeout(Duration::from_secs(30), rx).await {
187 Ok(_) => {
188 tracing::debug!("HTTP server shutdown completed");
189 }
190 Err(_) => {
191 tracing::warn!("HTTP server shutdown timed out");
192 }
193 }
194 }
195
196 Ok(())
197 }
198
199 fn is_running(&self) -> bool {
200 self.running.load(Ordering::SeqCst)
201 }
202
203 fn health_status(&self) -> HealthStatus {
204 if self.running.load(Ordering::SeqCst) {
205 HealthStatus::Healthy
206 } else if self.shutdown_tx.is_some() {
207 HealthStatus::Warning {
209 description: "Starting up".to_string(),
210 }
211 } else {
212 HealthStatus::Failed {
213 description: "Not running".to_string(),
214 }
215 }
216 }
217
218 fn as_any(&self) -> &dyn Any {
219 self
220 }
221
222 fn as_any_mut(&mut self) -> &mut dyn Any {
223 self
224 }
225}