reifydb_sub_server_admin/
subsystem.rs1use std::{
7 any::Any,
8 net::SocketAddr,
9 sync::{
10 Arc, RwLock,
11 atomic::{AtomicBool, Ordering},
12 },
13};
14
15use axum::serve;
16use reifydb_core::{
17 error::CoreError,
18 interface::version::{ComponentType, HasVersion, SystemVersion},
19};
20use reifydb_runtime::SharedRuntime;
21use reifydb_sub_api::subsystem::{HealthStatus, Subsystem};
22use reifydb_type::{Result, error::Error};
23use tokio::{net::TcpListener, sync::oneshot};
24use tracing::{error, info};
25
26use crate::{routes::router, state::AdminState};
27
28pub struct AdminSubsystem {
34 bind_addr: String,
36 actual_addr: RwLock<Option<SocketAddr>>,
38 state: AdminState,
40 running: Arc<AtomicBool>,
42 shutdown_tx: Option<oneshot::Sender<()>>,
44 shutdown_complete_rx: Option<oneshot::Receiver<()>>,
46 runtime: SharedRuntime,
48}
49
50impl AdminSubsystem {
51 pub fn new(bind_addr: String, state: AdminState, runtime: SharedRuntime) -> Self {
59 Self {
60 bind_addr,
61 actual_addr: RwLock::new(None),
62 state,
63 running: Arc::new(AtomicBool::new(false)),
64 shutdown_tx: None,
65 shutdown_complete_rx: None,
66 runtime,
67 }
68 }
69
70 pub fn bind_addr(&self) -> &str {
72 &self.bind_addr
73 }
74
75 pub fn local_addr(&self) -> Option<SocketAddr> {
77 *self.actual_addr.read().unwrap()
78 }
79
80 pub fn port(&self) -> Option<u16> {
82 self.local_addr().map(|a| a.port())
83 }
84}
85
86impl HasVersion for AdminSubsystem {
87 fn version(&self) -> SystemVersion {
88 SystemVersion {
89 name: env!("CARGO_PKG_NAME")
90 .strip_prefix("reifydb-")
91 .unwrap_or(env!("CARGO_PKG_NAME"))
92 .to_string(),
93 version: env!("CARGO_PKG_VERSION").to_string(),
94 description: "Admin server subsystem for web-based administration".to_string(),
95 r#type: ComponentType::Subsystem,
96 }
97 }
98}
99
100impl Subsystem for AdminSubsystem {
101 fn name(&self) -> &'static str {
102 "Admin"
103 }
104
105 fn start(&mut self) -> Result<()> {
106 if self.running.load(Ordering::SeqCst) {
108 return Ok(());
109 }
110
111 let addr = self.bind_addr.clone();
112 let runtime = self.runtime.clone();
113 let listener = runtime.block_on(TcpListener::bind(&addr)).map_err(|e| {
114 let err: Error = CoreError::SubsystemBindFailed {
115 addr: addr.clone(),
116 reason: e.to_string(),
117 }
118 .into();
119 err
120 })?;
121
122 let actual_addr = listener.local_addr().map_err(|e| {
123 let err: Error = CoreError::SubsystemAddressUnavailable {
124 reason: e.to_string(),
125 }
126 .into();
127 err
128 })?;
129 *self.actual_addr.write().unwrap() = Some(actual_addr);
130 info!("Admin server bound to {}", actual_addr);
131
132 let (shutdown_tx, shutdown_rx) = oneshot::channel();
133 let (complete_tx, complete_rx) = oneshot::channel();
134
135 let state = self.state.clone();
136 let running = self.running.clone();
137 let runtime = self.runtime.clone();
138
139 runtime.spawn(async move {
140 running.store(true, Ordering::SeqCst);
142
143 let app = router(state);
145 let server = serve(listener, app).with_graceful_shutdown(async {
146 shutdown_rx.await.ok();
147 info!("Admin server received shutdown signal");
148 });
149
150 if let Err(e) = server.await {
152 error!("Admin server error: {}", e);
153 }
154
155 running.store(false, Ordering::SeqCst);
157 let _ = complete_tx.send(());
158 info!("Admin server stopped");
159 });
160
161 self.shutdown_tx = Some(shutdown_tx);
162 self.shutdown_complete_rx = Some(complete_rx);
163 Ok(())
164 }
165
166 fn shutdown(&mut self) -> Result<()> {
167 if let Some(tx) = self.shutdown_tx.take() {
168 let _ = tx.send(());
169 }
170 if let Some(rx) = self.shutdown_complete_rx.take() {
171 let _ = self.runtime.block_on(rx);
172 }
173 Ok(())
174 }
175
176 fn is_running(&self) -> bool {
177 self.running.load(Ordering::SeqCst)
178 }
179
180 fn health_status(&self) -> HealthStatus {
181 if self.running.load(Ordering::SeqCst) {
182 HealthStatus::Healthy
183 } else if self.shutdown_tx.is_some() {
184 HealthStatus::Warning {
186 description: "Starting up".to_string(),
187 }
188 } else {
189 HealthStatus::Failed {
190 description: "Not running".to_string(),
191 }
192 }
193 }
194
195 fn as_any(&self) -> &dyn Any {
196 self
197 }
198
199 fn as_any_mut(&mut self) -> &mut dyn Any {
200 self
201 }
202}