reifydb_sub_server_admin/
subsystem.rs1use std::{
7 any::Any,
8 net::SocketAddr,
9 sync::{
10 Arc, RwLock,
11 atomic::{AtomicBool, Ordering},
12 },
13};
14
15use reifydb_core::interface::version::{ComponentType, HasVersion, SystemVersion};
16use reifydb_sub_api::{HealthStatus, Subsystem};
17use reifydb_sub_server::SharedRuntime;
18use tokio::{runtime::Handle, sync::oneshot};
19
20use crate::state::AdminState;
21
22pub struct AdminSubsystem {
29 bind_addr: String,
31 actual_addr: RwLock<Option<SocketAddr>>,
33 state: AdminState,
35 _runtime: Option<Arc<SharedRuntime>>,
37 handle: Handle,
39 running: Arc<AtomicBool>,
41 shutdown_tx: Option<oneshot::Sender<()>>,
43 shutdown_complete_rx: Option<oneshot::Receiver<()>>,
45}
46
47impl AdminSubsystem {
48 pub fn new(bind_addr: String, state: AdminState, handle: Handle) -> Self {
56 Self {
57 bind_addr,
58 actual_addr: RwLock::new(None),
59 state,
60 _runtime: None,
61 handle,
62 running: Arc::new(AtomicBool::new(false)),
63 shutdown_tx: None,
64 shutdown_complete_rx: None,
65 }
66 }
67
68 pub fn with_runtime(bind_addr: String, state: AdminState, runtime: Arc<SharedRuntime>) -> Self {
78 let handle = runtime.handle();
79 Self {
80 bind_addr,
81 actual_addr: RwLock::new(None),
82 state,
83 _runtime: Some(runtime),
84 handle,
85 running: Arc::new(AtomicBool::new(false)),
86 shutdown_tx: None,
87 shutdown_complete_rx: None,
88 }
89 }
90
91 pub fn bind_addr(&self) -> &str {
93 &self.bind_addr
94 }
95
96 pub fn local_addr(&self) -> Option<SocketAddr> {
98 *self.actual_addr.read().unwrap()
99 }
100
101 pub fn port(&self) -> Option<u16> {
103 self.local_addr().map(|a| a.port())
104 }
105}
106
107impl HasVersion for AdminSubsystem {
108 fn version(&self) -> SystemVersion {
109 SystemVersion {
110 name: "admin".to_string(),
111 version: env!("CARGO_PKG_VERSION").to_string(),
112 description: "Admin server subsystem for web-based administration".to_string(),
113 r#type: ComponentType::Subsystem,
114 }
115 }
116}
117
118impl Subsystem for AdminSubsystem {
119 fn name(&self) -> &'static str {
120 "Admin"
121 }
122
123 fn start(&mut self) -> reifydb_core::Result<()> {
124 if self.running.load(Ordering::SeqCst) {
126 return Ok(());
127 }
128
129 let addr = self.bind_addr.clone();
131 let std_listener = std::net::TcpListener::bind(&addr).map_err(|e| {
132 reifydb_core::error!(reifydb_core::diagnostic::internal::internal(format!(
133 "Failed to bind admin server to {}: {}",
134 &addr, e
135 )))
136 })?;
137 std_listener.set_nonblocking(true).map_err(|e| {
138 reifydb_core::error!(reifydb_core::diagnostic::internal::internal(format!(
139 "Failed to set non-blocking on {}: {}",
140 &addr, e
141 )))
142 })?;
143
144 let actual_addr = std_listener.local_addr().map_err(|e| {
145 reifydb_core::error!(reifydb_core::diagnostic::internal::internal(format!(
146 "Failed to get local address: {}",
147 e
148 )))
149 })?;
150
151 let _guard = self.handle.enter();
153 let listener = tokio::net::TcpListener::from_std(std_listener).map_err(|e| {
154 reifydb_core::error!(reifydb_core::diagnostic::internal::internal(format!(
155 "Failed to convert listener: {}",
156 e
157 )))
158 })?;
159 *self.actual_addr.write().unwrap() = Some(actual_addr);
160 tracing::info!("Admin server bound to {}", actual_addr);
161
162 let (shutdown_tx, shutdown_rx) = oneshot::channel();
163 let (complete_tx, complete_rx) = oneshot::channel();
164
165 let state = self.state.clone();
166 let running = self.running.clone();
167
168 self.handle.spawn(async move {
169 running.store(true, Ordering::SeqCst);
171
172 let app = crate::routes::router(state);
174 let server = axum::serve(listener, app).with_graceful_shutdown(async {
175 shutdown_rx.await.ok();
176 tracing::info!("Admin server received shutdown signal");
177 });
178
179 if let Err(e) = server.await {
181 tracing::error!("Admin server error: {}", e);
182 }
183
184 running.store(false, Ordering::SeqCst);
186 let _ = complete_tx.send(());
187 tracing::info!("Admin server stopped");
188 });
189
190 self.shutdown_tx = Some(shutdown_tx);
191 self.shutdown_complete_rx = Some(complete_rx);
192 Ok(())
193 }
194
195 fn shutdown(&mut self) -> reifydb_core::Result<()> {
196 if let Some(tx) = self.shutdown_tx.take() {
198 let _ = tx.send(());
199 }
200
201 if let Some(rx) = self.shutdown_complete_rx.take() {
203 let handle = self.handle.clone();
204 handle.block_on(async {
205 match tokio::time::timeout(std::time::Duration::from_secs(30), rx).await {
206 Ok(_) => {
207 tracing::debug!("Admin server shutdown completed");
208 }
209 Err(_) => {
210 tracing::warn!("Admin server shutdown timed out");
211 }
212 }
213 });
214 }
215
216 Ok(())
217 }
218
219 fn is_running(&self) -> bool {
220 self.running.load(Ordering::SeqCst)
221 }
222
223 fn health_status(&self) -> HealthStatus {
224 if self.running.load(Ordering::SeqCst) {
225 HealthStatus::Healthy
226 } else if self.shutdown_tx.is_some() {
227 HealthStatus::Warning {
229 description: "Starting up".to_string(),
230 }
231 } else {
232 HealthStatus::Failed {
233 description: "Not running".to_string(),
234 }
235 }
236 }
237
238 fn as_any(&self) -> &dyn Any {
239 self
240 }
241
242 fn as_any_mut(&mut self) -> &mut dyn Any {
243 self
244 }
245}