reifydb_sub_server_admin/
subsystem.rs1use std::{
7 any::Any,
8 net::SocketAddr,
9 sync::{
10 Arc, RwLock,
11 atomic::{AtomicBool, Ordering},
12 },
13};
14
15use reifydb_core::interface::version::{ComponentType, HasVersion, SystemVersion};
16use reifydb_sub_api::{HealthStatus, Subsystem};
17use reifydb_sub_server::SharedRuntime;
18use tokio::{runtime::Handle, sync::oneshot};
19
20use crate::state::AdminState;
21
22pub struct AdminSubsystem {
29 bind_addr: String,
31 actual_addr: RwLock<Option<SocketAddr>>,
33 state: AdminState,
35 _runtime: Option<SharedRuntime>,
37 handle: Handle,
39 running: Arc<AtomicBool>,
41 shutdown_tx: Option<oneshot::Sender<()>>,
43 shutdown_complete_rx: Option<oneshot::Receiver<()>>,
45}
46
47impl AdminSubsystem {
48 pub fn new(bind_addr: String, state: AdminState, runtime: SharedRuntime) -> Self {
58 let handle = runtime.handle();
59 Self {
60 bind_addr,
61 actual_addr: RwLock::new(None),
62 state,
63 _runtime: Some(runtime),
64 handle,
65 running: Arc::new(AtomicBool::new(false)),
66 shutdown_tx: None,
67 shutdown_complete_rx: None,
68 }
69 }
70
71 pub fn bind_addr(&self) -> &str {
73 &self.bind_addr
74 }
75
76 pub fn local_addr(&self) -> Option<SocketAddr> {
78 *self.actual_addr.read().unwrap()
79 }
80
81 pub fn port(&self) -> Option<u16> {
83 self.local_addr().map(|a| a.port())
84 }
85}
86
87impl HasVersion for AdminSubsystem {
88 fn version(&self) -> SystemVersion {
89 SystemVersion {
90 name: "admin".to_string(),
91 version: env!("CARGO_PKG_VERSION").to_string(),
92 description: "Admin server subsystem for web-based administration".to_string(),
93 r#type: ComponentType::Subsystem,
94 }
95 }
96}
97
98impl Subsystem for AdminSubsystem {
99 fn name(&self) -> &'static str {
100 "Admin"
101 }
102
103 fn start(&mut self) -> reifydb_core::Result<()> {
104 if self.running.load(Ordering::SeqCst) {
106 return Ok(());
107 }
108
109 let addr = self.bind_addr.clone();
111 let std_listener = std::net::TcpListener::bind(&addr).map_err(|e| {
112 reifydb_core::error!(reifydb_core::diagnostic::internal::internal(format!(
113 "Failed to bind admin server to {}: {}",
114 &addr, e
115 )))
116 })?;
117 std_listener.set_nonblocking(true).map_err(|e| {
118 reifydb_core::error!(reifydb_core::diagnostic::internal::internal(format!(
119 "Failed to set non-blocking on {}: {}",
120 &addr, e
121 )))
122 })?;
123
124 let actual_addr = std_listener.local_addr().map_err(|e| {
125 reifydb_core::error!(reifydb_core::diagnostic::internal::internal(format!(
126 "Failed to get local address: {}",
127 e
128 )))
129 })?;
130
131 let _guard = self.handle.enter();
133 let listener = tokio::net::TcpListener::from_std(std_listener).map_err(|e| {
134 reifydb_core::error!(reifydb_core::diagnostic::internal::internal(format!(
135 "Failed to convert listener: {}",
136 e
137 )))
138 })?;
139 *self.actual_addr.write().unwrap() = Some(actual_addr);
140 tracing::info!("Admin server bound to {}", actual_addr);
141
142 let (shutdown_tx, shutdown_rx) = oneshot::channel();
143 let (complete_tx, complete_rx) = oneshot::channel();
144
145 let state = self.state.clone();
146 let running = self.running.clone();
147
148 self.handle.spawn(async move {
149 running.store(true, Ordering::SeqCst);
151
152 let app = crate::routes::router(state);
154 let server = axum::serve(listener, app).with_graceful_shutdown(async {
155 shutdown_rx.await.ok();
156 tracing::info!("Admin server received shutdown signal");
157 });
158
159 if let Err(e) = server.await {
161 tracing::error!("Admin server error: {}", e);
162 }
163
164 running.store(false, Ordering::SeqCst);
166 let _ = complete_tx.send(());
167 tracing::info!("Admin server stopped");
168 });
169
170 self.shutdown_tx = Some(shutdown_tx);
171 self.shutdown_complete_rx = Some(complete_rx);
172 Ok(())
173 }
174
175 fn shutdown(&mut self) -> reifydb_core::Result<()> {
176 if let Some(tx) = self.shutdown_tx.take() {
178 let _ = tx.send(());
179 }
180
181 if let Some(rx) = self.shutdown_complete_rx.take() {
183 let handle = self.handle.clone();
184 handle.block_on(async {
185 match tokio::time::timeout(std::time::Duration::from_secs(30), rx).await {
186 Ok(_) => {
187 tracing::debug!("Admin server shutdown completed");
188 }
189 Err(_) => {
190 tracing::warn!("Admin server shutdown timed out");
191 }
192 }
193 });
194 }
195
196 Ok(())
197 }
198
199 fn is_running(&self) -> bool {
200 self.running.load(Ordering::SeqCst)
201 }
202
203 fn health_status(&self) -> HealthStatus {
204 if self.running.load(Ordering::SeqCst) {
205 HealthStatus::Healthy
206 } else if self.shutdown_tx.is_some() {
207 HealthStatus::Warning {
209 description: "Starting up".to_string(),
210 }
211 } else {
212 HealthStatus::Failed {
213 description: "Not running".to_string(),
214 }
215 }
216 }
217
218 fn as_any(&self) -> &dyn Any {
219 self
220 }
221
222 fn as_any_mut(&mut self) -> &mut dyn Any {
223 self
224 }
225}