reifydb_sub_server_http/
subsystem.rs1use std::{
10 any::Any,
11 net::SocketAddr,
12 sync::{
13 Arc, RwLock,
14 atomic::{AtomicBool, Ordering},
15 },
16};
17
18use reifydb_core::{
19 error::diagnostic::subsystem::{address_unavailable, bind_failed},
20 interface::version::{ComponentType, HasVersion, SystemVersion},
21};
22use reifydb_runtime::SharedRuntime;
23use reifydb_sub_api::subsystem::{HealthStatus, Subsystem};
24use reifydb_sub_server::state::AppState;
25use reifydb_type::error;
26use tokio::{net::TcpListener, sync::oneshot};
27
28use crate::routes::router;
29
30pub struct HttpSubsystem {
53 bind_addr: String,
55 actual_addr: RwLock<Option<SocketAddr>>,
57 state: AppState,
59 running: Arc<AtomicBool>,
61 shutdown_tx: Option<oneshot::Sender<()>>,
63 shutdown_complete_rx: Option<oneshot::Receiver<()>>,
65 runtime: SharedRuntime,
67}
68
69impl HttpSubsystem {
70 pub fn new(bind_addr: String, state: AppState, runtime: SharedRuntime) -> Self {
78 Self {
79 bind_addr,
80 actual_addr: RwLock::new(None),
81 state,
82 running: Arc::new(AtomicBool::new(false)),
83 shutdown_tx: None,
84 shutdown_complete_rx: None,
85 runtime,
86 }
87 }
88
89 pub fn bind_addr(&self) -> &str {
91 &self.bind_addr
92 }
93
94 pub fn local_addr(&self) -> Option<SocketAddr> {
96 *self.actual_addr.read().unwrap()
97 }
98
99 pub fn port(&self) -> Option<u16> {
101 self.local_addr().map(|a| a.port())
102 }
103}
104
105impl HasVersion for HttpSubsystem {
106 fn version(&self) -> SystemVersion {
107 SystemVersion {
108 name: env!("CARGO_PKG_NAME")
109 .strip_prefix("reifydb-")
110 .unwrap_or(env!("CARGO_PKG_NAME"))
111 .to_string(),
112 version: env!("CARGO_PKG_VERSION").to_string(),
113 description: "HTTP server subsystem for query and command handling".to_string(),
114 r#type: ComponentType::Subsystem,
115 }
116 }
117}
118
119impl Subsystem for HttpSubsystem {
120 fn name(&self) -> &'static str {
121 "Http"
122 }
123
124 fn start(&mut self) -> reifydb_type::Result<()> {
125 if self.running.load(Ordering::SeqCst) {
127 return Ok(());
128 }
129
130 let addr = self.bind_addr.clone();
131 let runtime = self.runtime.clone();
132 let listener = runtime.block_on(TcpListener::bind(&addr)).map_err(|e| error!(bind_failed(&addr, e)))?;
133
134 let actual_addr = listener.local_addr().map_err(|e| error!(address_unavailable(e)))?;
135 *self.actual_addr.write().unwrap() = Some(actual_addr);
136 tracing::info!("HTTP server bound to {}", actual_addr);
137
138 let (shutdown_tx, shutdown_rx) = oneshot::channel();
139 let (complete_tx, complete_rx) = oneshot::channel();
140
141 let state = self.state.clone();
142 let running = self.running.clone();
143 let runtime = self.runtime.clone();
144
145 runtime.spawn(async move {
146 running.store(true, Ordering::SeqCst);
148
149 let app = router(state);
151 let server = axum::serve(listener, app).with_graceful_shutdown(async {
152 shutdown_rx.await.ok();
153 tracing::info!("HTTP server received shutdown signal");
154 });
155
156 if let Err(e) = server.await {
158 tracing::error!("HTTP server error: {}", e);
159 }
160
161 running.store(false, Ordering::SeqCst);
163 let _ = complete_tx.send(());
164 tracing::info!("HTTP server stopped");
165 });
166
167 self.shutdown_tx = Some(shutdown_tx);
168 self.shutdown_complete_rx = Some(complete_rx);
169 Ok(())
170 }
171
172 fn shutdown(&mut self) -> reifydb_type::Result<()> {
173 if let Some(tx) = self.shutdown_tx.take() {
174 let _ = tx.send(());
175 }
176 if let Some(rx) = self.shutdown_complete_rx.take() {
177 let _ = self.runtime.block_on(rx);
178 }
179 Ok(())
180 }
181
182 fn is_running(&self) -> bool {
183 self.running.load(Ordering::SeqCst)
184 }
185
186 fn health_status(&self) -> HealthStatus {
187 if self.running.load(Ordering::SeqCst) {
188 HealthStatus::Healthy
189 } else if self.shutdown_tx.is_some() {
190 HealthStatus::Warning {
192 description: "Starting up".to_string(),
193 }
194 } else {
195 HealthStatus::Failed {
196 description: "Not running".to_string(),
197 }
198 }
199 }
200
201 fn as_any(&self) -> &dyn Any {
202 self
203 }
204
205 fn as_any_mut(&mut self) -> &mut dyn Any {
206 self
207 }
208}