reifydb_sub_server_admin/
subsystem.rs1use std::{
7 any::Any,
8 net::SocketAddr,
9 sync::{
10 Arc, RwLock,
11 atomic::{AtomicBool, Ordering},
12 },
13 time::Duration,
14};
15
16use async_trait::async_trait;
17use reifydb_core::{
18 diagnostic::subsystem::{address_unavailable, bind_failed, socket_config_failed},
19 error,
20 interface::version::{ComponentType, HasVersion, SystemVersion},
21};
22use reifydb_sub_api::{HealthStatus, Subsystem};
23use reifydb_sub_server::SharedRuntime;
24use tokio::{net::TcpListener, runtime::Handle, sync::oneshot, time::timeout};
25
26use crate::state::AdminState;
27
28pub struct AdminSubsystem {
35 bind_addr: String,
37 actual_addr: RwLock<Option<SocketAddr>>,
39 state: AdminState,
41 _runtime: Option<SharedRuntime>,
43 handle: Handle,
45 running: Arc<AtomicBool>,
47 shutdown_tx: Option<oneshot::Sender<()>>,
49 shutdown_complete_rx: Option<oneshot::Receiver<()>>,
51}
52
53impl AdminSubsystem {
54 pub fn new(bind_addr: String, state: AdminState, runtime: SharedRuntime) -> Self {
64 let handle = runtime.handle();
65 Self {
66 bind_addr,
67 actual_addr: RwLock::new(None),
68 state,
69 _runtime: Some(runtime),
70 handle,
71 running: Arc::new(AtomicBool::new(false)),
72 shutdown_tx: None,
73 shutdown_complete_rx: None,
74 }
75 }
76
77 pub fn bind_addr(&self) -> &str {
79 &self.bind_addr
80 }
81
82 pub fn local_addr(&self) -> Option<SocketAddr> {
84 *self.actual_addr.read().unwrap()
85 }
86
87 pub fn port(&self) -> Option<u16> {
89 self.local_addr().map(|a| a.port())
90 }
91}
92
93impl HasVersion for AdminSubsystem {
94 fn version(&self) -> SystemVersion {
95 SystemVersion {
96 name: "admin".to_string(),
97 version: env!("CARGO_PKG_VERSION").to_string(),
98 description: "Admin server subsystem for web-based administration".to_string(),
99 r#type: ComponentType::Subsystem,
100 }
101 }
102}
103
104#[async_trait]
105impl Subsystem for AdminSubsystem {
106 fn name(&self) -> &'static str {
107 "Admin"
108 }
109
110 async fn start(&mut self) -> reifydb_core::Result<()> {
111 if self.running.load(Ordering::SeqCst) {
113 return Ok(());
114 }
115
116 let addr = self.bind_addr.clone();
118 let std_listener = std::net::TcpListener::bind(&addr).map_err(|e| error!(bind_failed(&addr, e)))?;
119 std_listener.set_nonblocking(true).map_err(|e| error!(socket_config_failed(e)))?;
120
121 let actual_addr = std_listener.local_addr().map_err(|e| error!(address_unavailable(e)))?;
122
123 let _guard = self.handle.enter();
125 let listener = TcpListener::from_std(std_listener).map_err(|e| error!(socket_config_failed(e)))?;
126 *self.actual_addr.write().unwrap() = Some(actual_addr);
127 tracing::info!("Admin server bound to {}", actual_addr);
128
129 let (shutdown_tx, shutdown_rx) = oneshot::channel();
130 let (complete_tx, complete_rx) = oneshot::channel();
131
132 let state = self.state.clone();
133 let running = self.running.clone();
134
135 self.handle.spawn(async move {
136 running.store(true, Ordering::SeqCst);
138
139 let app = crate::routes::router(state);
141 let server = axum::serve(listener, app).with_graceful_shutdown(async {
142 shutdown_rx.await.ok();
143 tracing::info!("Admin server received shutdown signal");
144 });
145
146 if let Err(e) = server.await {
148 tracing::error!("Admin server error: {}", e);
149 }
150
151 running.store(false, Ordering::SeqCst);
153 let _ = complete_tx.send(());
154 tracing::info!("Admin server stopped");
155 });
156
157 self.shutdown_tx = Some(shutdown_tx);
158 self.shutdown_complete_rx = Some(complete_rx);
159 Ok(())
160 }
161
162 async fn shutdown(&mut self) -> reifydb_core::Result<()> {
163 if let Some(tx) = self.shutdown_tx.take() {
165 let _ = tx.send(());
166 }
167
168 if let Some(rx) = self.shutdown_complete_rx.take() {
170 match timeout(Duration::from_secs(30), rx).await {
171 Ok(_) => {
172 tracing::debug!("Admin server shutdown completed");
173 }
174 Err(_) => {
175 tracing::warn!("Admin server shutdown timed out");
176 }
177 }
178 }
179
180 Ok(())
181 }
182
183 fn is_running(&self) -> bool {
184 self.running.load(Ordering::SeqCst)
185 }
186
187 fn health_status(&self) -> HealthStatus {
188 if self.running.load(Ordering::SeqCst) {
189 HealthStatus::Healthy
190 } else if self.shutdown_tx.is_some() {
191 HealthStatus::Warning {
193 description: "Starting up".to_string(),
194 }
195 } else {
196 HealthStatus::Failed {
197 description: "Not running".to_string(),
198 }
199 }
200 }
201
202 fn as_any(&self) -> &dyn Any {
203 self
204 }
205
206 fn as_any_mut(&mut self) -> &mut dyn Any {
207 self
208 }
209}