reifydb_sub_server_admin/
subsystem.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later
3
4//! Admin server subsystem implementing the ReifyDB Subsystem trait.
5
6use std::{
7	any::Any,
8	net::SocketAddr,
9	sync::{
10		Arc, RwLock,
11		atomic::{AtomicBool, Ordering},
12	},
13};
14
15use reifydb_core::interface::version::{ComponentType, HasVersion, SystemVersion};
16use reifydb_sub_api::{HealthStatus, Subsystem};
17use reifydb_sub_server::SharedRuntime;
18use tokio::{runtime::Handle, sync::oneshot};
19
20use crate::state::AdminState;
21
/// Admin server subsystem.
///
/// Manages an Axum-based admin HTTP server with support for:
/// - Graceful startup and shutdown
/// - Health monitoring
/// - Integration with shared tokio runtime
///
/// The server itself runs as a task spawned onto the runtime; this struct
/// holds the configuration plus the channels and flag used to observe and
/// stop that task.
pub struct AdminSubsystem {
	/// Address to bind the server to, as passed to `new`
	/// (e.g., "127.0.0.1:9090").
	bind_addr: String,
	/// Actual bound address. `None` until `start` has bound the listener;
	/// behind a lock because it is written during `start` and read through
	/// `&self` accessors.
	actual_addr: RwLock<Option<SocketAddr>>,
	/// Shared application state; a clone is handed to the router when the
	/// server task is spawned.
	state: AdminState,
	/// The shared runtime (kept alive to prevent premature shutdown).
	/// Never read directly, hence the leading underscore.
	_runtime: Option<SharedRuntime>,
	/// Handle to the tokio runtime, used to spawn the server task and to
	/// block on shutdown completion.
	handle: Handle,
	/// Flag indicating if the server is running. Set to `true` by the
	/// server task when it begins and back to `false` when it ends.
	running: Arc<AtomicBool>,
	/// Channel to send shutdown signal. `Some` from a successful `start`
	/// until `shutdown` takes it.
	shutdown_tx: Option<oneshot::Sender<()>>,
	/// Channel to receive shutdown completion. The server task sends on it
	/// just before it finishes; `shutdown` waits on it.
	shutdown_complete_rx: Option<oneshot::Receiver<()>>,
}
46
47impl AdminSubsystem {
48	/// Create a new admin subsystem with an owned runtime.
49	///
50	/// This variant keeps the runtime alive for the lifetime of the subsystem.
51	///
52	/// # Arguments
53	///
54	/// * `bind_addr` - Address and port to bind to (e.g., "127.0.0.1:9090")
55	/// * `state` - Shared application state
56	/// * `runtime` - Shared runtime (will be kept alive)
57	pub fn new(bind_addr: String, state: AdminState, runtime: SharedRuntime) -> Self {
58		let handle = runtime.handle();
59		Self {
60			bind_addr,
61			actual_addr: RwLock::new(None),
62			state,
63			_runtime: Some(runtime),
64			handle,
65			running: Arc::new(AtomicBool::new(false)),
66			shutdown_tx: None,
67			shutdown_complete_rx: None,
68		}
69	}
70
71	/// Get the bind address.
72	pub fn bind_addr(&self) -> &str {
73		&self.bind_addr
74	}
75
76	/// Get the actual bound address (available after start).
77	pub fn local_addr(&self) -> Option<SocketAddr> {
78		*self.actual_addr.read().unwrap()
79	}
80
81	/// Get the actual bound port (available after start).
82	pub fn port(&self) -> Option<u16> {
83		self.local_addr().map(|a| a.port())
84	}
85}
86
87impl HasVersion for AdminSubsystem {
88	fn version(&self) -> SystemVersion {
89		SystemVersion {
90			name: "admin".to_string(),
91			version: env!("CARGO_PKG_VERSION").to_string(),
92			description: "Admin server subsystem for web-based administration".to_string(),
93			r#type: ComponentType::Subsystem,
94		}
95	}
96}
97
98impl Subsystem for AdminSubsystem {
99	fn name(&self) -> &'static str {
100		"Admin"
101	}
102
103	fn start(&mut self) -> reifydb_core::Result<()> {
104		// Idempotent: if already running, return success
105		if self.running.load(Ordering::SeqCst) {
106			return Ok(());
107		}
108
109		// Bind synchronously using std::net, then convert to tokio
110		let addr = self.bind_addr.clone();
111		let std_listener = std::net::TcpListener::bind(&addr).map_err(|e| {
112			reifydb_core::error!(reifydb_core::diagnostic::internal::internal(format!(
113				"Failed to bind admin server to {}: {}",
114				&addr, e
115			)))
116		})?;
117		std_listener.set_nonblocking(true).map_err(|e| {
118			reifydb_core::error!(reifydb_core::diagnostic::internal::internal(format!(
119				"Failed to set non-blocking on {}: {}",
120				&addr, e
121			)))
122		})?;
123
124		let actual_addr = std_listener.local_addr().map_err(|e| {
125			reifydb_core::error!(reifydb_core::diagnostic::internal::internal(format!(
126				"Failed to get local address: {}",
127				e
128			)))
129		})?;
130
131		// Enter the runtime context to convert std listener to tokio
132		let _guard = self.handle.enter();
133		let listener = tokio::net::TcpListener::from_std(std_listener).map_err(|e| {
134			reifydb_core::error!(reifydb_core::diagnostic::internal::internal(format!(
135				"Failed to convert listener: {}",
136				e
137			)))
138		})?;
139		*self.actual_addr.write().unwrap() = Some(actual_addr);
140		tracing::info!("Admin server bound to {}", actual_addr);
141
142		let (shutdown_tx, shutdown_rx) = oneshot::channel();
143		let (complete_tx, complete_rx) = oneshot::channel();
144
145		let state = self.state.clone();
146		let running = self.running.clone();
147
148		self.handle.spawn(async move {
149			// Mark as running
150			running.store(true, Ordering::SeqCst);
151
152			// Create router and serve
153			let app = crate::routes::router(state);
154			let server = axum::serve(listener, app).with_graceful_shutdown(async {
155				shutdown_rx.await.ok();
156				tracing::info!("Admin server received shutdown signal");
157			});
158
159			// Run until shutdown
160			if let Err(e) = server.await {
161				tracing::error!("Admin server error: {}", e);
162			}
163
164			// Mark as stopped
165			running.store(false, Ordering::SeqCst);
166			let _ = complete_tx.send(());
167			tracing::info!("Admin server stopped");
168		});
169
170		self.shutdown_tx = Some(shutdown_tx);
171		self.shutdown_complete_rx = Some(complete_rx);
172		Ok(())
173	}
174
175	fn shutdown(&mut self) -> reifydb_core::Result<()> {
176		// Send shutdown signal
177		if let Some(tx) = self.shutdown_tx.take() {
178			let _ = tx.send(());
179		}
180
181		// Wait for graceful shutdown with timeout
182		if let Some(rx) = self.shutdown_complete_rx.take() {
183			let handle = self.handle.clone();
184			handle.block_on(async {
185				match tokio::time::timeout(std::time::Duration::from_secs(30), rx).await {
186					Ok(_) => {
187						tracing::debug!("Admin server shutdown completed");
188					}
189					Err(_) => {
190						tracing::warn!("Admin server shutdown timed out");
191					}
192				}
193			});
194		}
195
196		Ok(())
197	}
198
199	fn is_running(&self) -> bool {
200		self.running.load(Ordering::SeqCst)
201	}
202
203	fn health_status(&self) -> HealthStatus {
204		if self.running.load(Ordering::SeqCst) {
205			HealthStatus::Healthy
206		} else if self.shutdown_tx.is_some() {
207			// Started but not yet running (startup in progress)
208			HealthStatus::Warning {
209				description: "Starting up".to_string(),
210			}
211		} else {
212			HealthStatus::Failed {
213				description: "Not running".to_string(),
214			}
215		}
216	}
217
218	fn as_any(&self) -> &dyn Any {
219		self
220	}
221
222	fn as_any_mut(&mut self) -> &mut dyn Any {
223		self
224	}
225}