reifydb_sub_server_admin/
subsystem.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the AGPL-3.0-or-later
3
4//! Admin server subsystem implementing the ReifyDB Subsystem trait.
5
6use std::{
7	any::Any,
8	net::SocketAddr,
9	sync::{
10		Arc, RwLock,
11		atomic::{AtomicBool, Ordering},
12	},
13	time::Duration,
14};
15
16use async_trait::async_trait;
17use reifydb_core::{
18	diagnostic::subsystem::{address_unavailable, bind_failed, socket_config_failed},
19	error,
20	interface::version::{ComponentType, HasVersion, SystemVersion},
21};
22use reifydb_sub_api::{HealthStatus, Subsystem};
23use reifydb_sub_server::SharedRuntime;
24use tokio::{net::TcpListener, runtime::Handle, sync::oneshot, time::timeout};
25
26use crate::state::AdminState;
27
28/// Admin server subsystem.
29///
30/// Manages an Axum-based admin HTTP server with support for:
31/// - Graceful startup and shutdown
32/// - Health monitoring
33/// - Integration with shared tokio runtime
34pub struct AdminSubsystem {
35	/// Address to bind the server to.
36	bind_addr: String,
37	/// Actual bound address (available after start).
38	actual_addr: RwLock<Option<SocketAddr>>,
39	/// Shared application state.
40	state: AdminState,
41	/// The shared runtime (kept alive to prevent premature shutdown).
42	_runtime: Option<SharedRuntime>,
43	/// Handle to the tokio runtime.
44	handle: Handle,
45	/// Flag indicating if the server is running.
46	running: Arc<AtomicBool>,
47	/// Channel to send shutdown signal.
48	shutdown_tx: Option<oneshot::Sender<()>>,
49	/// Channel to receive shutdown completion.
50	shutdown_complete_rx: Option<oneshot::Receiver<()>>,
51}
52
53impl AdminSubsystem {
54	/// Create a new admin subsystem with an owned runtime.
55	///
56	/// This variant keeps the runtime alive for the lifetime of the subsystem.
57	///
58	/// # Arguments
59	///
60	/// * `bind_addr` - Address and port to bind to (e.g., "127.0.0.1:9090")
61	/// * `state` - Shared application state
62	/// * `runtime` - Shared runtime (will be kept alive)
63	pub fn new(bind_addr: String, state: AdminState, runtime: SharedRuntime) -> Self {
64		let handle = runtime.handle();
65		Self {
66			bind_addr,
67			actual_addr: RwLock::new(None),
68			state,
69			_runtime: Some(runtime),
70			handle,
71			running: Arc::new(AtomicBool::new(false)),
72			shutdown_tx: None,
73			shutdown_complete_rx: None,
74		}
75	}
76
77	/// Get the bind address.
78	pub fn bind_addr(&self) -> &str {
79		&self.bind_addr
80	}
81
82	/// Get the actual bound address (available after start).
83	pub fn local_addr(&self) -> Option<SocketAddr> {
84		*self.actual_addr.read().unwrap()
85	}
86
87	/// Get the actual bound port (available after start).
88	pub fn port(&self) -> Option<u16> {
89		self.local_addr().map(|a| a.port())
90	}
91}
92
93impl HasVersion for AdminSubsystem {
94	fn version(&self) -> SystemVersion {
95		SystemVersion {
96			name: "admin".to_string(),
97			version: env!("CARGO_PKG_VERSION").to_string(),
98			description: "Admin server subsystem for web-based administration".to_string(),
99			r#type: ComponentType::Subsystem,
100		}
101	}
102}
103
104#[async_trait]
105impl Subsystem for AdminSubsystem {
106	fn name(&self) -> &'static str {
107		"Admin"
108	}
109
110	async fn start(&mut self) -> reifydb_core::Result<()> {
111		// Idempotent: if already running, return success
112		if self.running.load(Ordering::SeqCst) {
113			return Ok(());
114		}
115
116		// Bind synchronously using std::net, then convert to tokio
117		let addr = self.bind_addr.clone();
118		let std_listener = std::net::TcpListener::bind(&addr).map_err(|e| error!(bind_failed(&addr, e)))?;
119		std_listener.set_nonblocking(true).map_err(|e| error!(socket_config_failed(e)))?;
120
121		let actual_addr = std_listener.local_addr().map_err(|e| error!(address_unavailable(e)))?;
122
123		// Enter the runtime context to convert std listener to tokio
124		let _guard = self.handle.enter();
125		let listener = TcpListener::from_std(std_listener).map_err(|e| error!(socket_config_failed(e)))?;
126		*self.actual_addr.write().unwrap() = Some(actual_addr);
127		tracing::info!("Admin server bound to {}", actual_addr);
128
129		let (shutdown_tx, shutdown_rx) = oneshot::channel();
130		let (complete_tx, complete_rx) = oneshot::channel();
131
132		let state = self.state.clone();
133		let running = self.running.clone();
134
135		self.handle.spawn(async move {
136			// Mark as running
137			running.store(true, Ordering::SeqCst);
138
139			// Create router and serve
140			let app = crate::routes::router(state);
141			let server = axum::serve(listener, app).with_graceful_shutdown(async {
142				shutdown_rx.await.ok();
143				tracing::info!("Admin server received shutdown signal");
144			});
145
146			// Run until shutdown
147			if let Err(e) = server.await {
148				tracing::error!("Admin server error: {}", e);
149			}
150
151			// Mark as stopped
152			running.store(false, Ordering::SeqCst);
153			let _ = complete_tx.send(());
154			tracing::info!("Admin server stopped");
155		});
156
157		self.shutdown_tx = Some(shutdown_tx);
158		self.shutdown_complete_rx = Some(complete_rx);
159		Ok(())
160	}
161
162	async fn shutdown(&mut self) -> reifydb_core::Result<()> {
163		// Send shutdown signal
164		if let Some(tx) = self.shutdown_tx.take() {
165			let _ = tx.send(());
166		}
167
168		// Wait for graceful shutdown with timeout
169		if let Some(rx) = self.shutdown_complete_rx.take() {
170			match timeout(Duration::from_secs(30), rx).await {
171				Ok(_) => {
172					tracing::debug!("Admin server shutdown completed");
173				}
174				Err(_) => {
175					tracing::warn!("Admin server shutdown timed out");
176				}
177			}
178		}
179
180		Ok(())
181	}
182
183	fn is_running(&self) -> bool {
184		self.running.load(Ordering::SeqCst)
185	}
186
187	fn health_status(&self) -> HealthStatus {
188		if self.running.load(Ordering::SeqCst) {
189			HealthStatus::Healthy
190		} else if self.shutdown_tx.is_some() {
191			// Started but not yet running (startup in progress)
192			HealthStatus::Warning {
193				description: "Starting up".to_string(),
194			}
195		} else {
196			HealthStatus::Failed {
197				description: "Not running".to_string(),
198			}
199		}
200	}
201
202	fn as_any(&self) -> &dyn Any {
203		self
204	}
205
206	fn as_any_mut(&mut self) -> &mut dyn Any {
207		self
208	}
209}