reifydb_sub_server_http/
subsystem.rs1use std::{
10 any::Any,
11 net::SocketAddr,
12 sync::{
13 Arc, RwLock,
14 atomic::{AtomicBool, Ordering},
15 },
16};
17
18use axum::serve;
19use reifydb_core::{
20 error::CoreError,
21 interface::version::{ComponentType, HasVersion, SystemVersion},
22};
23use reifydb_runtime::SharedRuntime;
24use reifydb_sub_api::subsystem::{HealthStatus, Subsystem};
25use reifydb_sub_server::state::AppState;
26use reifydb_type::{Result, error::Error};
27use tokio::{net::TcpListener, sync::oneshot};
28use tracing::{error, info};
29
30use crate::routes::router;
31
32pub struct HttpSubsystem {
55 bind_addr: String,
57 actual_addr: RwLock<Option<SocketAddr>>,
59 state: AppState,
61 running: Arc<AtomicBool>,
63 shutdown_tx: Option<oneshot::Sender<()>>,
65 shutdown_complete_rx: Option<oneshot::Receiver<()>>,
67 runtime: SharedRuntime,
69}
70
71impl HttpSubsystem {
72 pub fn new(bind_addr: String, state: AppState, runtime: SharedRuntime) -> Self {
80 Self {
81 bind_addr,
82 actual_addr: RwLock::new(None),
83 state,
84 running: Arc::new(AtomicBool::new(false)),
85 shutdown_tx: None,
86 shutdown_complete_rx: None,
87 runtime,
88 }
89 }
90
91 pub fn bind_addr(&self) -> &str {
93 &self.bind_addr
94 }
95
96 pub fn local_addr(&self) -> Option<SocketAddr> {
98 *self.actual_addr.read().unwrap()
99 }
100
101 pub fn port(&self) -> Option<u16> {
103 self.local_addr().map(|a| a.port())
104 }
105}
106
107impl HasVersion for HttpSubsystem {
108 fn version(&self) -> SystemVersion {
109 SystemVersion {
110 name: env!("CARGO_PKG_NAME")
111 .strip_prefix("reifydb-")
112 .unwrap_or(env!("CARGO_PKG_NAME"))
113 .to_string(),
114 version: env!("CARGO_PKG_VERSION").to_string(),
115 description: "HTTP server subsystem for query and command handling".to_string(),
116 r#type: ComponentType::Subsystem,
117 }
118 }
119}
120
121impl Subsystem for HttpSubsystem {
122 fn name(&self) -> &'static str {
123 "Http"
124 }
125
126 fn start(&mut self) -> Result<()> {
127 if self.running.load(Ordering::SeqCst) {
129 return Ok(());
130 }
131
132 let addr = self.bind_addr.clone();
133 let runtime = self.runtime.clone();
134 let listener = runtime.block_on(TcpListener::bind(&addr)).map_err(|e| {
135 let err: Error = CoreError::SubsystemBindFailed {
136 addr: addr.clone(),
137 reason: e.to_string(),
138 }
139 .into();
140 err
141 })?;
142
143 let actual_addr = listener.local_addr().map_err(|e| {
144 let err: Error = CoreError::SubsystemAddressUnavailable {
145 reason: e.to_string(),
146 }
147 .into();
148 err
149 })?;
150 *self.actual_addr.write().unwrap() = Some(actual_addr);
151 info!("HTTP server bound to {}", actual_addr);
152
153 let (shutdown_tx, shutdown_rx) = oneshot::channel();
154 let (complete_tx, complete_rx) = oneshot::channel();
155
156 let state = self.state.clone();
157 let running = self.running.clone();
158 let runtime = self.runtime.clone();
159
160 runtime.spawn(async move {
161 running.store(true, Ordering::SeqCst);
163
164 let app = router(state);
166 let server = serve(listener, app).with_graceful_shutdown(async {
167 shutdown_rx.await.ok();
168 info!("HTTP server received shutdown signal");
169 });
170
171 if let Err(e) = server.await {
173 error!("HTTP server error: {}", e);
174 }
175
176 running.store(false, Ordering::SeqCst);
178 let _ = complete_tx.send(());
179 info!("HTTP server stopped");
180 });
181
182 self.shutdown_tx = Some(shutdown_tx);
183 self.shutdown_complete_rx = Some(complete_rx);
184 Ok(())
185 }
186
187 fn shutdown(&mut self) -> Result<()> {
188 if let Some(tx) = self.shutdown_tx.take() {
189 let _ = tx.send(());
190 }
191 if let Some(rx) = self.shutdown_complete_rx.take() {
192 let _ = self.runtime.block_on(rx);
193 }
194 Ok(())
195 }
196
197 fn is_running(&self) -> bool {
198 self.running.load(Ordering::SeqCst)
199 }
200
201 fn health_status(&self) -> HealthStatus {
202 if self.running.load(Ordering::SeqCst) {
203 HealthStatus::Healthy
204 } else if self.shutdown_tx.is_some() {
205 HealthStatus::Warning {
207 description: "Starting up".to_string(),
208 }
209 } else {
210 HealthStatus::Failed {
211 description: "Not running".to_string(),
212 }
213 }
214 }
215
216 fn as_any(&self) -> &dyn Any {
217 self
218 }
219
220 fn as_any_mut(&mut self) -> &mut dyn Any {
221 self
222 }
223}