Skip to main content

reifydb_sub_server_admin/
subsystem.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4//! Admin server subsystem implementing the ReifyDB Subsystem trait.
5
6use std::{
7	any::Any,
8	net::SocketAddr,
9	sync::{
10		Arc, RwLock,
11		atomic::{AtomicBool, Ordering},
12	},
13};
14
15use axum::serve;
16use reifydb_core::{
17	error::CoreError,
18	interface::version::{ComponentType, HasVersion, SystemVersion},
19};
20use reifydb_runtime::SharedRuntime;
21use reifydb_sub_api::subsystem::{HealthStatus, Subsystem};
22use reifydb_type::{Result, error::Error};
23use tokio::{net::TcpListener, sync::oneshot};
24use tracing::{error, info};
25
26use crate::{routes::router, state::AdminState};
27
28/// Admin server subsystem.
29///
30/// Manages an Axum-based admin HTTP server with support for:
31/// - Graceful startup and shutdown
32/// - Health monitoring
33pub struct AdminSubsystem {
34	/// Address to bind the server to.
35	bind_addr: String,
36	/// Actual bound address (available after start).
37	actual_addr: RwLock<Option<SocketAddr>>,
38	/// Shared application state.
39	state: AdminState,
40	/// Flag indicating if the server is running.
41	running: Arc<AtomicBool>,
42	/// Channel to send shutdown signal.
43	shutdown_tx: Option<oneshot::Sender<()>>,
44	/// Channel to receive shutdown completion.
45	shutdown_complete_rx: Option<oneshot::Receiver<()>>,
46	/// Shared tokio runtime.
47	runtime: SharedRuntime,
48}
49
50impl AdminSubsystem {
51	/// Create a new admin subsystem.
52	///
53	/// # Arguments
54	///
55	/// * `bind_addr` - Address and port to bind to (e.g., "127.0.0.1:9090")
56	/// * `state` - Shared application state
57	/// * `runtime` - Shared runtime
58	pub fn new(bind_addr: String, state: AdminState, runtime: SharedRuntime) -> Self {
59		Self {
60			bind_addr,
61			actual_addr: RwLock::new(None),
62			state,
63			running: Arc::new(AtomicBool::new(false)),
64			shutdown_tx: None,
65			shutdown_complete_rx: None,
66			runtime,
67		}
68	}
69
70	/// Get the bind address.
71	pub fn bind_addr(&self) -> &str {
72		&self.bind_addr
73	}
74
75	/// Get the actual bound address (available after start).
76	pub fn local_addr(&self) -> Option<SocketAddr> {
77		*self.actual_addr.read().unwrap()
78	}
79
80	/// Get the actual bound port (available after start).
81	pub fn port(&self) -> Option<u16> {
82		self.local_addr().map(|a| a.port())
83	}
84}
85
86impl HasVersion for AdminSubsystem {
87	fn version(&self) -> SystemVersion {
88		SystemVersion {
89			name: env!("CARGO_PKG_NAME")
90				.strip_prefix("reifydb-")
91				.unwrap_or(env!("CARGO_PKG_NAME"))
92				.to_string(),
93			version: env!("CARGO_PKG_VERSION").to_string(),
94			description: "Admin server subsystem for web-based administration".to_string(),
95			r#type: ComponentType::Subsystem,
96		}
97	}
98}
99
100impl Subsystem for AdminSubsystem {
101	fn name(&self) -> &'static str {
102		"Admin"
103	}
104
105	fn start(&mut self) -> Result<()> {
106		// Idempotent: if already running, return success
107		if self.running.load(Ordering::SeqCst) {
108			return Ok(());
109		}
110
111		let addr = self.bind_addr.clone();
112		let runtime = self.runtime.clone();
113		let listener = runtime.block_on(TcpListener::bind(&addr)).map_err(|e| {
114			let err: Error = CoreError::SubsystemBindFailed {
115				addr: addr.clone(),
116				reason: e.to_string(),
117			}
118			.into();
119			err
120		})?;
121
122		let actual_addr = listener.local_addr().map_err(|e| {
123			let err: Error = CoreError::SubsystemAddressUnavailable {
124				reason: e.to_string(),
125			}
126			.into();
127			err
128		})?;
129		*self.actual_addr.write().unwrap() = Some(actual_addr);
130		info!("Admin server bound to {}", actual_addr);
131
132		let (shutdown_tx, shutdown_rx) = oneshot::channel();
133		let (complete_tx, complete_rx) = oneshot::channel();
134
135		let state = self.state.clone();
136		let running = self.running.clone();
137		let runtime = self.runtime.clone();
138
139		runtime.spawn(async move {
140			// Mark as running
141			running.store(true, Ordering::SeqCst);
142
143			// Create router and serve
144			let app = router(state);
145			let server = serve(listener, app).with_graceful_shutdown(async {
146				shutdown_rx.await.ok();
147				info!("Admin server received shutdown signal");
148			});
149
150			// Run until shutdown
151			if let Err(e) = server.await {
152				error!("Admin server error: {}", e);
153			}
154
155			// Mark as stopped
156			running.store(false, Ordering::SeqCst);
157			let _ = complete_tx.send(());
158			info!("Admin server stopped");
159		});
160
161		self.shutdown_tx = Some(shutdown_tx);
162		self.shutdown_complete_rx = Some(complete_rx);
163		Ok(())
164	}
165
166	fn shutdown(&mut self) -> Result<()> {
167		if let Some(tx) = self.shutdown_tx.take() {
168			let _ = tx.send(());
169		}
170		if let Some(rx) = self.shutdown_complete_rx.take() {
171			let _ = self.runtime.block_on(rx);
172		}
173		Ok(())
174	}
175
176	fn is_running(&self) -> bool {
177		self.running.load(Ordering::SeqCst)
178	}
179
180	fn health_status(&self) -> HealthStatus {
181		if self.running.load(Ordering::SeqCst) {
182			HealthStatus::Healthy
183		} else if self.shutdown_tx.is_some() {
184			// Started but not yet running (startup in progress)
185			HealthStatus::Warning {
186				description: "Starting up".to_string(),
187			}
188		} else {
189			HealthStatus::Failed {
190				description: "Not running".to_string(),
191			}
192		}
193	}
194
195	fn as_any(&self) -> &dyn Any {
196		self
197	}
198
199	fn as_any_mut(&mut self) -> &mut dyn Any {
200		self
201	}
202}