reifydb_sub_server_admin/
subsystem.rs1use std::{
7 any::Any,
8 net::SocketAddr,
9 sync::{
10 Arc, RwLock,
11 atomic::{AtomicBool, Ordering},
12 },
13};
14
15use reifydb_core::{
16 error::diagnostic::subsystem::{address_unavailable, bind_failed},
17 interface::version::{ComponentType, HasVersion, SystemVersion},
18};
19use reifydb_runtime::SharedRuntime;
20use reifydb_sub_api::subsystem::{HealthStatus, Subsystem};
21use reifydb_type::error;
22use tokio::{net::TcpListener, sync::oneshot};
23
24use crate::state::AdminState;
25
26pub struct AdminSubsystem {
32 bind_addr: String,
34 actual_addr: RwLock<Option<SocketAddr>>,
36 state: AdminState,
38 running: Arc<AtomicBool>,
40 shutdown_tx: Option<oneshot::Sender<()>>,
42 shutdown_complete_rx: Option<oneshot::Receiver<()>>,
44 runtime: SharedRuntime,
46}
47
48impl AdminSubsystem {
49 pub fn new(bind_addr: String, state: AdminState, runtime: SharedRuntime) -> Self {
57 Self {
58 bind_addr,
59 actual_addr: RwLock::new(None),
60 state,
61 running: Arc::new(AtomicBool::new(false)),
62 shutdown_tx: None,
63 shutdown_complete_rx: None,
64 runtime,
65 }
66 }
67
68 pub fn bind_addr(&self) -> &str {
70 &self.bind_addr
71 }
72
73 pub fn local_addr(&self) -> Option<SocketAddr> {
75 *self.actual_addr.read().unwrap()
76 }
77
78 pub fn port(&self) -> Option<u16> {
80 self.local_addr().map(|a| a.port())
81 }
82}
83
84impl HasVersion for AdminSubsystem {
85 fn version(&self) -> SystemVersion {
86 SystemVersion {
87 name: env!("CARGO_PKG_NAME")
88 .strip_prefix("reifydb-")
89 .unwrap_or(env!("CARGO_PKG_NAME"))
90 .to_string(),
91 version: env!("CARGO_PKG_VERSION").to_string(),
92 description: "Admin server subsystem for web-based administration".to_string(),
93 r#type: ComponentType::Subsystem,
94 }
95 }
96}
97
98impl Subsystem for AdminSubsystem {
99 fn name(&self) -> &'static str {
100 "Admin"
101 }
102
103 fn start(&mut self) -> reifydb_type::Result<()> {
104 if self.running.load(Ordering::SeqCst) {
106 return Ok(());
107 }
108
109 let addr = self.bind_addr.clone();
110 let runtime = self.runtime.clone();
111 let listener = runtime.block_on(TcpListener::bind(&addr)).map_err(|e| error!(bind_failed(&addr, e)))?;
112
113 let actual_addr = listener.local_addr().map_err(|e| error!(address_unavailable(e)))?;
114 *self.actual_addr.write().unwrap() = Some(actual_addr);
115 tracing::info!("Admin server bound to {}", actual_addr);
116
117 let (shutdown_tx, shutdown_rx) = oneshot::channel();
118 let (complete_tx, complete_rx) = oneshot::channel();
119
120 let state = self.state.clone();
121 let running = self.running.clone();
122 let runtime = self.runtime.clone();
123
124 runtime.spawn(async move {
125 running.store(true, Ordering::SeqCst);
127
128 let app = crate::routes::router(state);
130 let server = axum::serve(listener, app).with_graceful_shutdown(async {
131 shutdown_rx.await.ok();
132 tracing::info!("Admin server received shutdown signal");
133 });
134
135 if let Err(e) = server.await {
137 tracing::error!("Admin server error: {}", e);
138 }
139
140 running.store(false, Ordering::SeqCst);
142 let _ = complete_tx.send(());
143 tracing::info!("Admin server stopped");
144 });
145
146 self.shutdown_tx = Some(shutdown_tx);
147 self.shutdown_complete_rx = Some(complete_rx);
148 Ok(())
149 }
150
151 fn shutdown(&mut self) -> reifydb_type::Result<()> {
152 if let Some(tx) = self.shutdown_tx.take() {
153 let _ = tx.send(());
154 }
155 if let Some(rx) = self.shutdown_complete_rx.take() {
156 let _ = self.runtime.block_on(rx);
157 }
158 Ok(())
159 }
160
161 fn is_running(&self) -> bool {
162 self.running.load(Ordering::SeqCst)
163 }
164
165 fn health_status(&self) -> HealthStatus {
166 if self.running.load(Ordering::SeqCst) {
167 HealthStatus::Healthy
168 } else if self.shutdown_tx.is_some() {
169 HealthStatus::Warning {
171 description: "Starting up".to_string(),
172 }
173 } else {
174 HealthStatus::Failed {
175 description: "Not running".to_string(),
176 }
177 }
178 }
179
180 fn as_any(&self) -> &dyn Any {
181 self
182 }
183
184 fn as_any_mut(&mut self) -> &mut dyn Any {
185 self
186 }
187}