Skip to main content

reifydb_sub_server_admin/
subsystem.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// Copyright (c) 2025 ReifyDB
3
4//! Admin server subsystem implementing the ReifyDB Subsystem trait.
5
6use std::{
7	any::Any,
8	net::SocketAddr,
9	sync::{
10		Arc, RwLock,
11		atomic::{AtomicBool, Ordering},
12	},
13};
14
15use reifydb_core::{
16	error::diagnostic::subsystem::{address_unavailable, bind_failed},
17	interface::version::{ComponentType, HasVersion, SystemVersion},
18};
19use reifydb_runtime::SharedRuntime;
20use reifydb_sub_api::subsystem::{HealthStatus, Subsystem};
21use reifydb_type::error;
22use tokio::{net::TcpListener, sync::oneshot};
23
24use crate::state::AdminState;
25
26/// Admin server subsystem.
27///
28/// Manages an Axum-based admin HTTP server with support for:
29/// - Graceful startup and shutdown
30/// - Health monitoring
31pub struct AdminSubsystem {
32	/// Address to bind the server to.
33	bind_addr: String,
34	/// Actual bound address (available after start).
35	actual_addr: RwLock<Option<SocketAddr>>,
36	/// Shared application state.
37	state: AdminState,
38	/// Flag indicating if the server is running.
39	running: Arc<AtomicBool>,
40	/// Channel to send shutdown signal.
41	shutdown_tx: Option<oneshot::Sender<()>>,
42	/// Channel to receive shutdown completion.
43	shutdown_complete_rx: Option<oneshot::Receiver<()>>,
44	/// Shared tokio runtime.
45	runtime: SharedRuntime,
46}
47
48impl AdminSubsystem {
49	/// Create a new admin subsystem.
50	///
51	/// # Arguments
52	///
53	/// * `bind_addr` - Address and port to bind to (e.g., "127.0.0.1:9090")
54	/// * `state` - Shared application state
55	/// * `runtime` - Shared runtime
56	pub fn new(bind_addr: String, state: AdminState, runtime: SharedRuntime) -> Self {
57		Self {
58			bind_addr,
59			actual_addr: RwLock::new(None),
60			state,
61			running: Arc::new(AtomicBool::new(false)),
62			shutdown_tx: None,
63			shutdown_complete_rx: None,
64			runtime,
65		}
66	}
67
68	/// Get the bind address.
69	pub fn bind_addr(&self) -> &str {
70		&self.bind_addr
71	}
72
73	/// Get the actual bound address (available after start).
74	pub fn local_addr(&self) -> Option<SocketAddr> {
75		*self.actual_addr.read().unwrap()
76	}
77
78	/// Get the actual bound port (available after start).
79	pub fn port(&self) -> Option<u16> {
80		self.local_addr().map(|a| a.port())
81	}
82}
83
84impl HasVersion for AdminSubsystem {
85	fn version(&self) -> SystemVersion {
86		SystemVersion {
87			name: env!("CARGO_PKG_NAME")
88				.strip_prefix("reifydb-")
89				.unwrap_or(env!("CARGO_PKG_NAME"))
90				.to_string(),
91			version: env!("CARGO_PKG_VERSION").to_string(),
92			description: "Admin server subsystem for web-based administration".to_string(),
93			r#type: ComponentType::Subsystem,
94		}
95	}
96}
97
98impl Subsystem for AdminSubsystem {
99	fn name(&self) -> &'static str {
100		"Admin"
101	}
102
103	fn start(&mut self) -> reifydb_type::Result<()> {
104		// Idempotent: if already running, return success
105		if self.running.load(Ordering::SeqCst) {
106			return Ok(());
107		}
108
109		let addr = self.bind_addr.clone();
110		let runtime = self.runtime.clone();
111		let listener = runtime.block_on(TcpListener::bind(&addr)).map_err(|e| error!(bind_failed(&addr, e)))?;
112
113		let actual_addr = listener.local_addr().map_err(|e| error!(address_unavailable(e)))?;
114		*self.actual_addr.write().unwrap() = Some(actual_addr);
115		tracing::info!("Admin server bound to {}", actual_addr);
116
117		let (shutdown_tx, shutdown_rx) = oneshot::channel();
118		let (complete_tx, complete_rx) = oneshot::channel();
119
120		let state = self.state.clone();
121		let running = self.running.clone();
122		let runtime = self.runtime.clone();
123
124		runtime.spawn(async move {
125			// Mark as running
126			running.store(true, Ordering::SeqCst);
127
128			// Create router and serve
129			let app = crate::routes::router(state);
130			let server = axum::serve(listener, app).with_graceful_shutdown(async {
131				shutdown_rx.await.ok();
132				tracing::info!("Admin server received shutdown signal");
133			});
134
135			// Run until shutdown
136			if let Err(e) = server.await {
137				tracing::error!("Admin server error: {}", e);
138			}
139
140			// Mark as stopped
141			running.store(false, Ordering::SeqCst);
142			let _ = complete_tx.send(());
143			tracing::info!("Admin server stopped");
144		});
145
146		self.shutdown_tx = Some(shutdown_tx);
147		self.shutdown_complete_rx = Some(complete_rx);
148		Ok(())
149	}
150
151	fn shutdown(&mut self) -> reifydb_type::Result<()> {
152		if let Some(tx) = self.shutdown_tx.take() {
153			let _ = tx.send(());
154		}
155		if let Some(rx) = self.shutdown_complete_rx.take() {
156			let _ = self.runtime.block_on(rx);
157		}
158		Ok(())
159	}
160
161	fn is_running(&self) -> bool {
162		self.running.load(Ordering::SeqCst)
163	}
164
165	fn health_status(&self) -> HealthStatus {
166		if self.running.load(Ordering::SeqCst) {
167			HealthStatus::Healthy
168		} else if self.shutdown_tx.is_some() {
169			// Started but not yet running (startup in progress)
170			HealthStatus::Warning {
171				description: "Starting up".to_string(),
172			}
173		} else {
174			HealthStatus::Failed {
175				description: "Not running".to_string(),
176			}
177		}
178	}
179
180	fn as_any(&self) -> &dyn Any {
181		self
182	}
183
184	fn as_any_mut(&mut self) -> &mut dyn Any {
185		self
186	}
187}