reifydb_sub_server_admin/
subsystem.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later
3
4//! Admin server subsystem implementing the ReifyDB Subsystem trait.
5
6use std::{
7	any::Any,
8	net::SocketAddr,
9	sync::{
10		Arc, RwLock,
11		atomic::{AtomicBool, Ordering},
12	},
13};
14
15use reifydb_core::interface::version::{ComponentType, HasVersion, SystemVersion};
16use reifydb_sub_api::{HealthStatus, Subsystem};
17use reifydb_sub_server::SharedRuntime;
18use tokio::{runtime::Handle, sync::oneshot};
19
20use crate::state::AdminState;
21
22/// Admin server subsystem.
23///
24/// Manages an Axum-based admin HTTP server with support for:
25/// - Graceful startup and shutdown
26/// - Health monitoring
27/// - Integration with shared tokio runtime
28pub struct AdminSubsystem {
29	/// Address to bind the server to.
30	bind_addr: String,
31	/// Actual bound address (available after start).
32	actual_addr: RwLock<Option<SocketAddr>>,
33	/// Shared application state.
34	state: AdminState,
35	/// The shared runtime (kept alive to prevent premature shutdown).
36	_runtime: Option<Arc<SharedRuntime>>,
37	/// Handle to the tokio runtime.
38	handle: Handle,
39	/// Flag indicating if the server is running.
40	running: Arc<AtomicBool>,
41	/// Channel to send shutdown signal.
42	shutdown_tx: Option<oneshot::Sender<()>>,
43	/// Channel to receive shutdown completion.
44	shutdown_complete_rx: Option<oneshot::Receiver<()>>,
45}
46
47impl AdminSubsystem {
48	/// Create a new admin subsystem.
49	///
50	/// # Arguments
51	///
52	/// * `bind_addr` - Address and port to bind to (e.g., "127.0.0.1:9090")
53	/// * `state` - Shared application state
54	/// * `handle` - Handle to the tokio runtime
55	pub fn new(bind_addr: String, state: AdminState, handle: Handle) -> Self {
56		Self {
57			bind_addr,
58			actual_addr: RwLock::new(None),
59			state,
60			_runtime: None,
61			handle,
62			running: Arc::new(AtomicBool::new(false)),
63			shutdown_tx: None,
64			shutdown_complete_rx: None,
65		}
66	}
67
68	/// Create a new admin subsystem with an owned runtime.
69	///
70	/// This variant keeps the runtime alive for the lifetime of the subsystem.
71	///
72	/// # Arguments
73	///
74	/// * `bind_addr` - Address and port to bind to (e.g., "127.0.0.1:9090")
75	/// * `state` - Shared application state
76	/// * `runtime` - Shared runtime (will be kept alive)
77	pub fn with_runtime(bind_addr: String, state: AdminState, runtime: Arc<SharedRuntime>) -> Self {
78		let handle = runtime.handle();
79		Self {
80			bind_addr,
81			actual_addr: RwLock::new(None),
82			state,
83			_runtime: Some(runtime),
84			handle,
85			running: Arc::new(AtomicBool::new(false)),
86			shutdown_tx: None,
87			shutdown_complete_rx: None,
88		}
89	}
90
91	/// Get the bind address.
92	pub fn bind_addr(&self) -> &str {
93		&self.bind_addr
94	}
95
96	/// Get the actual bound address (available after start).
97	pub fn local_addr(&self) -> Option<SocketAddr> {
98		*self.actual_addr.read().unwrap()
99	}
100
101	/// Get the actual bound port (available after start).
102	pub fn port(&self) -> Option<u16> {
103		self.local_addr().map(|a| a.port())
104	}
105}
106
107impl HasVersion for AdminSubsystem {
108	fn version(&self) -> SystemVersion {
109		SystemVersion {
110			name: "admin".to_string(),
111			version: env!("CARGO_PKG_VERSION").to_string(),
112			description: "Admin server subsystem for web-based administration".to_string(),
113			r#type: ComponentType::Subsystem,
114		}
115	}
116}
117
118impl Subsystem for AdminSubsystem {
119	fn name(&self) -> &'static str {
120		"Admin"
121	}
122
123	fn start(&mut self) -> reifydb_core::Result<()> {
124		// Idempotent: if already running, return success
125		if self.running.load(Ordering::SeqCst) {
126			return Ok(());
127		}
128
129		// Bind synchronously using std::net, then convert to tokio
130		let addr = self.bind_addr.clone();
131		let std_listener = std::net::TcpListener::bind(&addr).map_err(|e| {
132			reifydb_core::error!(reifydb_core::diagnostic::internal::internal(format!(
133				"Failed to bind admin server to {}: {}",
134				&addr, e
135			)))
136		})?;
137		std_listener.set_nonblocking(true).map_err(|e| {
138			reifydb_core::error!(reifydb_core::diagnostic::internal::internal(format!(
139				"Failed to set non-blocking on {}: {}",
140				&addr, e
141			)))
142		})?;
143
144		let actual_addr = std_listener.local_addr().map_err(|e| {
145			reifydb_core::error!(reifydb_core::diagnostic::internal::internal(format!(
146				"Failed to get local address: {}",
147				e
148			)))
149		})?;
150
151		// Enter the runtime context to convert std listener to tokio
152		let _guard = self.handle.enter();
153		let listener = tokio::net::TcpListener::from_std(std_listener).map_err(|e| {
154			reifydb_core::error!(reifydb_core::diagnostic::internal::internal(format!(
155				"Failed to convert listener: {}",
156				e
157			)))
158		})?;
159		*self.actual_addr.write().unwrap() = Some(actual_addr);
160		tracing::info!("Admin server bound to {}", actual_addr);
161
162		let (shutdown_tx, shutdown_rx) = oneshot::channel();
163		let (complete_tx, complete_rx) = oneshot::channel();
164
165		let state = self.state.clone();
166		let running = self.running.clone();
167
168		self.handle.spawn(async move {
169			// Mark as running
170			running.store(true, Ordering::SeqCst);
171
172			// Create router and serve
173			let app = crate::routes::router(state);
174			let server = axum::serve(listener, app).with_graceful_shutdown(async {
175				shutdown_rx.await.ok();
176				tracing::info!("Admin server received shutdown signal");
177			});
178
179			// Run until shutdown
180			if let Err(e) = server.await {
181				tracing::error!("Admin server error: {}", e);
182			}
183
184			// Mark as stopped
185			running.store(false, Ordering::SeqCst);
186			let _ = complete_tx.send(());
187			tracing::info!("Admin server stopped");
188		});
189
190		self.shutdown_tx = Some(shutdown_tx);
191		self.shutdown_complete_rx = Some(complete_rx);
192		Ok(())
193	}
194
195	fn shutdown(&mut self) -> reifydb_core::Result<()> {
196		// Send shutdown signal
197		if let Some(tx) = self.shutdown_tx.take() {
198			let _ = tx.send(());
199		}
200
201		// Wait for graceful shutdown with timeout
202		if let Some(rx) = self.shutdown_complete_rx.take() {
203			let handle = self.handle.clone();
204			handle.block_on(async {
205				match tokio::time::timeout(std::time::Duration::from_secs(30), rx).await {
206					Ok(_) => {
207						tracing::debug!("Admin server shutdown completed");
208					}
209					Err(_) => {
210						tracing::warn!("Admin server shutdown timed out");
211					}
212				}
213			});
214		}
215
216		Ok(())
217	}
218
219	fn is_running(&self) -> bool {
220		self.running.load(Ordering::SeqCst)
221	}
222
223	fn health_status(&self) -> HealthStatus {
224		if self.running.load(Ordering::SeqCst) {
225			HealthStatus::Healthy
226		} else if self.shutdown_tx.is_some() {
227			// Started but not yet running (startup in progress)
228			HealthStatus::Warning {
229				description: "Starting up".to_string(),
230			}
231		} else {
232			HealthStatus::Failed {
233				description: "Not running".to_string(),
234			}
235		}
236	}
237
238	fn as_any(&self) -> &dyn Any {
239		self
240	}
241
242	fn as_any_mut(&mut self) -> &mut dyn Any {
243		self
244	}
245}