use crate::config::{ConfigResult, ServerConfig};
use crate::health::{HealthChecker, HealthStatus};
use crate::metrics::MetricsCollector;
use crate::service::NetworkService;
use crate::shutdown::ShutdownCoordinator;
#[cfg(feature = "cluster")]
use amaters_cluster::{NodeState, RaftConfig, RaftNode};
use amaters_core::error::Result as CoreResult;
use amaters_core::storage::{
BlockCacheConfig, CompactionConfig, LsmTreeConfig, LsmTreeStorage, MemoryStorage,
MemtableConfig, SSTableConfig,
};
use amaters_core::traits::StorageEngine;
use amaters_core::types::{CipherBlob, Key};
use async_trait::async_trait;
use std::fs;
use std::path::Path;
use std::sync::Arc;
use std::time::Duration;
use thiserror::Error;
use tokio::time::sleep;
use tracing::{error, info, warn};
/// Storage backend chosen at startup from `storage.engine` in the server configuration.
///
/// Wrapping both engines in one enum lets the server hold a single `Arc<Storage>` and hand
/// it to the network service no matter which engine is configured.
#[derive(Clone)]
pub enum Storage {
    /// In-memory engine, selected by `storage.engine = "memory"`.
    Memory(MemoryStorage),
    /// LSM-Tree engine, selected by `storage.engine = "lsm"`.
    Lsm(LsmTreeStorage),
}

/// Forwards every [`StorageEngine`] operation to whichever engine the enum currently wraps.
#[async_trait]
impl StorageEngine for Storage {
    /// Stores `value` under `key` in the wrapped engine.
    async fn put(&self, key: &Key, value: &CipherBlob) -> CoreResult<()> {
        match self {
            Storage::Memory(mem) => mem.put(key, value).await,
            Storage::Lsm(lsm) => lsm.put(key, value).await,
        }
    }

    /// Looks up `key` in the wrapped engine.
    async fn get(&self, key: &Key) -> CoreResult<Option<CipherBlob>> {
        match self {
            Storage::Memory(mem) => mem.get(key).await,
            Storage::Lsm(lsm) => lsm.get(key).await,
        }
    }

    /// Delegates the read-modify-write of `key` (using `f`) to the wrapped engine.
    async fn atomic_update<F>(&self, key: &Key, f: F) -> CoreResult<()>
    where
        F: Fn(&CipherBlob) -> CoreResult<CipherBlob> + Send + Sync,
    {
        match self {
            Storage::Memory(mem) => mem.atomic_update(key, f).await,
            Storage::Lsm(lsm) => lsm.atomic_update(key, f).await,
        }
    }

    /// Removes `key` from the wrapped engine.
    async fn delete(&self, key: &Key) -> CoreResult<()> {
        match self {
            Storage::Memory(mem) => mem.delete(key).await,
            Storage::Lsm(lsm) => lsm.delete(key).await,
        }
    }

    /// Returns the wrapped engine's entries between `start` and `end`.
    async fn range(&self, start: &Key, end: &Key) -> CoreResult<Vec<(Key, CipherBlob)>> {
        match self {
            Storage::Memory(mem) => mem.range(start, end).await,
            Storage::Lsm(lsm) => lsm.range(start, end).await,
        }
    }

    /// Lists every key known to the wrapped engine.
    async fn keys(&self) -> CoreResult<Vec<Key>> {
        match self {
            Storage::Memory(mem) => mem.keys().await,
            Storage::Lsm(lsm) => lsm.keys().await,
        }
    }

    /// Flushes the wrapped engine.
    async fn flush(&self) -> CoreResult<()> {
        match self {
            Storage::Memory(mem) => mem.flush().await,
            Storage::Lsm(lsm) => lsm.flush().await,
        }
    }

    /// Closes the wrapped engine.
    async fn close(&self) -> CoreResult<()> {
        match self {
            Storage::Memory(mem) => mem.close().await,
            Storage::Lsm(lsm) => lsm.close().await,
        }
    }
}
#[derive(Error, Debug)]
pub enum ServerError {
#[error("Configuration error: {0}")]
Config(String),
#[error("Configuration validation error: {0}")]
ConfigValidation(String),
#[error("Storage initialization failed: {0}")]
Storage(String),
#[error("Network initialization failed: {0}")]
Network(String),
#[error("Cluster initialization failed: {0}")]
Cluster(String),
#[error("TLS setup failed: {0}")]
TlsSetup(String),
#[error("Server already running")]
AlreadyRunning,
#[error("Failed to create directory: {0}")]
DirectoryCreation(#[from] std::io::Error),
#[error("Shutdown timeout")]
ShutdownTimeout,
#[error("Core error: {0}")]
Core(#[from] amaters_core::error::AmateRSError),
}
pub type ServerResult<T> = Result<T, ServerError>;
impl crate::retry::ErrorClassification for ServerError {
    /// Reports whether a retry of the failed operation is classified as worthwhile.
    ///
    /// Only I/O errors whose kind is a timeout, interruption, would-block, or a
    /// reset/aborted connection are transient. Every other variant is classified as
    /// permanent. The match is deliberately exhaustive so that adding a variant forces
    /// a classification decision.
    fn is_transient(&self) -> bool {
        use std::io::ErrorKind;

        match self {
            ServerError::Config(_)
            | ServerError::ConfigValidation(_)
            | ServerError::Storage(_)
            | ServerError::Network(_)
            | ServerError::Cluster(_)
            | ServerError::TlsSetup(_)
            | ServerError::AlreadyRunning
            | ServerError::ShutdownTimeout
            | ServerError::Core(_) => false,
            ServerError::DirectoryCreation(io_error) => {
                let kind = io_error.kind();
                matches!(
                    kind,
                    ErrorKind::TimedOut
                        | ErrorKind::Interrupted
                        | ErrorKind::WouldBlock
                        | ErrorKind::ConnectionReset
                        | ErrorKind::ConnectionAborted
                )
            }
        }
    }
}
/// Point-in-time snapshot of the local Raft node, as returned by `Server::cluster_status`.
#[cfg(feature = "cluster")]
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct ClusterStatus {
    /// Id of this node.
    pub node_id: u64,
    /// Role name reported by the node's `state().as_str()` (e.g. `"Follower"`).
    pub state: String,
    /// Raft term the node is currently in.
    pub current_term: u64,
    /// Id of the known leader, if any.
    pub leader_id: Option<u64>,
    /// Whether this node considers itself the leader.
    pub is_leader: bool,
    /// Highest log index the node reports as committed.
    pub commit_index: u64,
    /// Index of the last entry in the node's log.
    pub last_log_index: u64,
}
/// The AmateRS server: owns the configuration plus every runtime component.
///
/// Components start out empty in `Server::new` and are filled in by `Server::initialize`.
pub struct Server {
    /// Shared, immutable configuration (also handed to the network service).
    config: Arc<ServerConfig>,
    /// Storage engine; `None` until `initialize` has run.
    storage: Option<Arc<Storage>>,
    /// Network service; `None` until `initialize` has run.
    network: Option<NetworkService>,
    /// Raft node; only set when cluster mode is enabled in the configuration.
    #[cfg(feature = "cluster")]
    cluster_node: Option<Arc<RaftNode>>,
    /// Coordinates shutdown requests between the server and its components.
    shutdown: ShutdownCoordinator,
    /// Tracks overall and per-component health.
    health: HealthChecker,
    /// Runtime metrics (e.g. active connection count used while draining).
    metrics: MetricsCollector,
}
impl Server {
pub fn new(config: ServerConfig) -> Self {
Self {
config: Arc::new(config),
storage: None,
network: None,
#[cfg(feature = "cluster")]
cluster_node: None,
shutdown: ShutdownCoordinator::new(),
health: HealthChecker::new(),
metrics: MetricsCollector::new(),
}
}
pub async fn initialize(&mut self) -> ServerResult<()> {
info!("Initializing server components");
self.ensure_data_directory()?;
self.initialize_storage().await?;
self.initialize_network().await?;
#[cfg(feature = "cluster")]
self.initialize_cluster()?;
self.health.set_status(HealthStatus::Starting);
info!("Server components initialized successfully");
Ok(())
}
fn ensure_data_directory(&self) -> ServerResult<()> {
let data_dir = &self.config.server.data_dir;
if !data_dir.exists() {
info!("Creating data directory: {}", data_dir.display());
fs::create_dir_all(data_dir)?;
}
Ok(())
}
async fn initialize_storage(&mut self) -> ServerResult<()> {
info!(
"Initializing storage engine: {}",
self.config.storage.engine
);
let storage = match self.config.storage.engine.as_str() {
"memory" => {
info!("Using in-memory storage engine");
Storage::Memory(MemoryStorage::new())
}
"lsm" => {
info!("Using LSM-Tree storage engine");
let lsm_config = self.build_lsm_config()?;
let lsm_storage = LsmTreeStorage::with_config(lsm_config).map_err(|e| {
ServerError::Storage(format!("Failed to create LSM storage: {}", e))
})?;
Storage::Lsm(lsm_storage)
}
other => {
return Err(ServerError::Config(format!(
"Invalid storage engine: {}. Supported: memory, lsm",
other
)));
}
};
self.storage = Some(Arc::new(storage));
self.health.set_storage_healthy(true);
info!("Storage engine initialized successfully");
Ok(())
}
fn build_lsm_config(&self) -> ServerResult<LsmTreeConfig> {
let data_dir = self.config.server.data_dir.join("lsm");
let wal_dir = self
.config
.server
.data_dir
.join(self.config.storage.wal.dir.clone());
std::fs::create_dir_all(&data_dir).map_err(|e| {
ServerError::Storage(format!("Failed to create LSM data directory: {}", e))
})?;
std::fs::create_dir_all(&wal_dir)
.map_err(|e| ServerError::Storage(format!("Failed to create WAL directory: {}", e)))?;
let memtable_config = MemtableConfig {
max_size_bytes: self.config.storage.memtable_size_mb * 1024 * 1024,
enable_wal: self.config.storage.wal.enabled,
};
let sstable_config = SSTableConfig {
block_size: 4096,
compression_type: amaters_core::storage::CompressionType::Lz4,
};
let block_cache_config = BlockCacheConfig {
max_size_bytes: self.config.storage.block_cache_size_mb * 1024 * 1024,
enable_stats: true,
};
let compaction_config = CompactionConfig {
strategy: match self.config.storage.compaction.strategy.as_str() {
"tiered" => amaters_core::storage::CompactionStrategy::SizeTiered,
_ => amaters_core::storage::CompactionStrategy::LevelBased,
},
l0_threshold: 4,
level_multiplier: self.config.storage.compaction.level_multiplier,
base_level_size: 10 * 1024 * 1024, max_compaction_bytes: 100 * 1024 * 1024, ..Default::default()
};
let value_log_config = None;
Ok(LsmTreeConfig {
data_dir,
wal_dir,
memtable_config,
sstable_config,
block_cache_config,
compaction_config,
value_log_config,
max_levels: self.config.storage.compaction.num_levels,
l0_compaction_threshold: 4,
level_size_multiplier: self.config.storage.compaction.level_multiplier,
})
}
async fn initialize_network(&mut self) -> ServerResult<()> {
info!("Initializing network service");
let storage = self
.storage
.as_ref()
.ok_or_else(|| ServerError::Config("Storage not initialized".to_string()))?
.clone();
let network = NetworkService::new(
storage,
self.config.clone(),
self.health.clone(),
self.metrics.clone(),
self.shutdown.clone(),
);
self.network = Some(network);
self.health.set_network_healthy(true);
info!("Network service initialized successfully");
Ok(())
}
#[cfg(feature = "cluster")]
fn initialize_cluster(&mut self) -> ServerResult<()> {
let cluster_settings = match self.config.cluster.as_ref() {
Some(settings) if settings.enabled => settings,
_ => {
info!("Cluster mode disabled, running as standalone node");
return Ok(());
}
};
info!(
"Initializing cluster node (node_id: {}, peers: {:?})",
cluster_settings.node_id, cluster_settings.peers
);
let mut peer_ids: Vec<u64> = Vec::new();
for peer_str in &cluster_settings.peers {
let parts: Vec<&str> = peer_str.splitn(2, ':').collect();
if parts.is_empty() {
return Err(ServerError::Cluster(format!(
"Invalid peer format '{}', expected 'node_id:address'",
peer_str
)));
}
let peer_id: u64 = parts[0].parse().map_err(|e| {
ServerError::Cluster(format!("Invalid peer node_id in '{}': {}", peer_str, e))
})?;
peer_ids.push(peer_id);
}
if !peer_ids.contains(&cluster_settings.node_id) {
peer_ids.push(cluster_settings.node_id);
}
let mut raft_config = RaftConfig::new(cluster_settings.node_id, peer_ids);
raft_config.election_timeout_range = (
cluster_settings
.election_timeout_ms
.saturating_sub(cluster_settings.election_timeout_ms / 3),
cluster_settings
.election_timeout_ms
.saturating_add(cluster_settings.election_timeout_ms / 3),
);
raft_config.heartbeat_interval = cluster_settings.heartbeat_interval_ms;
let node = RaftNode::new(raft_config)
.map_err(|e| ServerError::Cluster(format!("Failed to create RaftNode: {}", e)))?;
self.cluster_node = Some(Arc::new(node));
self.health.set_cluster_enabled(true);
self.health.set_cluster_healthy(true);
info!(
"Cluster node initialized (node_id: {}, state: {})",
cluster_settings.node_id, "Follower"
);
Ok(())
}
#[cfg(feature = "cluster")]
pub fn cluster_node(&self) -> Option<&Arc<RaftNode>> {
self.cluster_node.as_ref()
}
#[cfg(feature = "cluster")]
pub fn cluster_status(&self) -> Option<ClusterStatus> {
self.cluster_node.as_ref().map(|node| ClusterStatus {
node_id: node.node_id(),
state: node.state().as_str().to_string(),
current_term: node.current_term(),
leader_id: node.leader_id(),
is_leader: node.is_leader(),
commit_index: node.commit_index(),
last_log_index: node.last_log_index(),
})
}
pub async fn start(&mut self) -> ServerResult<()> {
info!("Starting AmateRS server v{}", env!("CARGO_PKG_VERSION"));
info!("Bind address: {}", self.config.server.bind_address);
info!("Data directory: {}", self.config.server.data_dir.display());
#[cfg(feature = "cluster")]
if let Some(ref node) = self.cluster_node {
info!(
"Cluster mode: enabled (node_id: {}, state: {}, term: {})",
node.node_id(),
node.state().as_str(),
node.current_term()
);
} else {
info!("Cluster mode: disabled (standalone)");
}
if let Some(ref mut network) = self.network {
network.start().await?;
}
self.health.set_status(HealthStatus::Healthy);
self.health.set_network_healthy(true);
info!("Server started successfully");
info!("Press Ctrl+C to shutdown");
let mut shutdown_rx = self.shutdown.subscribe();
shutdown_rx
.recv()
.await
.map_err(|e| ServerError::Network(format!("Shutdown channel error: {}", e)))?;
info!("Shutdown signal received");
Ok(())
}
pub async fn shutdown(&mut self) -> ServerResult<()> {
info!("Shutting down server gracefully");
self.health.set_status(HealthStatus::ShuttingDown);
let shutdown_timeout = self.config.shutdown_timeout();
match tokio::time::timeout(shutdown_timeout, self.shutdown_internal()).await {
Ok(result) => result,
Err(_) => {
error!("Shutdown timeout exceeded");
Err(ServerError::ShutdownTimeout)
}
}
}
async fn shutdown_internal(&mut self) -> ServerResult<()> {
info!("Stopping new connections");
self.health.set_network_healthy(false);
if let Some(ref mut network) = self.network {
network.stop().await?;
}
let max_wait = Duration::from_secs(5);
let start = std::time::Instant::now();
while self.metrics.snapshot().active_connections > 0 && start.elapsed() < max_wait {
info!(
"Waiting for {} active connections to drain",
self.metrics.snapshot().active_connections
);
sleep(Duration::from_millis(100)).await;
}
if let Some(ref storage) = self.storage {
info!("Flushing storage");
storage
.flush()
.await
.map_err(|e| ServerError::Storage(format!("Failed to flush storage: {}", e)))?;
}
if let Some(ref storage) = self.storage {
info!("Closing storage");
storage
.close()
.await
.map_err(|e| ServerError::Storage(format!("Failed to close storage: {}", e)))?;
}
self.health.set_storage_healthy(false);
info!("Server shutdown complete");
Ok(())
}
pub fn shutdown_coordinator(&self) -> &ShutdownCoordinator {
&self.shutdown
}
pub fn health_checker(&self) -> &HealthChecker {
&self.health
}
pub fn metrics_collector(&self) -> &MetricsCollector {
&self.metrics
}
pub fn config(&self) -> &ServerConfig {
&self.config
}
pub fn is_running(config: &ServerConfig) -> bool {
let pid_file = &config.server.pid_file;
if !pid_file.exists() {
return false;
}
if let Ok(contents) = fs::read_to_string(pid_file) {
if let Ok(pid) = contents.trim().parse::<i32>() {
#[cfg(unix)]
{
use std::process::Command;
let output = Command::new("kill").arg("-0").arg(pid.to_string()).output();
if let Ok(output) = output {
return output.status.success();
}
}
#[cfg(not(unix))]
{
return true;
}
}
}
false
}
pub fn write_pid_file(config: &ServerConfig) -> ServerResult<()> {
let pid = std::process::id();
let pid_file = &config.server.pid_file;
if let Some(parent) = pid_file.parent() {
fs::create_dir_all(parent)?;
}
fs::write(pid_file, pid.to_string())?;
info!("PID file written: {} (pid: {})", pid_file.display(), pid);
Ok(())
}
pub fn remove_pid_file(config: &ServerConfig) -> ServerResult<()> {
let pid_file = &config.server.pid_file;
if pid_file.exists() {
fs::remove_file(pid_file)?;
info!("PID file removed: {}", pid_file.display());
}
Ok(())
}
#[cfg(unix)]
pub fn stop_server(config: &ServerConfig, force: bool) -> ServerResult<()> {
let pid_file = &config.server.pid_file;
if !pid_file.exists() {
warn!("PID file not found - server may not be running");
return Ok(());
}
let contents = fs::read_to_string(pid_file)?;
let pid = contents
.trim()
.parse::<i32>()
.map_err(|e| ServerError::Config(format!("Invalid PID in file: {}", e)))?;
let signal = if force { "SIGKILL" } else { "SIGTERM" };
info!("Sending {} to process {}", signal, pid);
use std::process::Command;
let signal_arg = if force { "-9" } else { "-15" };
let output = Command::new("kill")
.arg(signal_arg)
.arg(pid.to_string())
.output()?;
if !output.status.success() {
let stderr = String::from_utf8_lossy(&output.stderr);
return Err(ServerError::Network(format!(
"Failed to stop server: {}",
stderr
)));
}
info!("Stop signal sent successfully");
Ok(())
}
#[cfg(not(unix))]
pub fn stop_server(_config: &ServerConfig, _force: bool) -> ServerResult<()> {
Err(ServerError::Config(
"Stop command is not supported on this platform".to_string(),
))
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::env;
    use std::path::PathBuf;

    /// Returns `<temp>/<name>`, first removing anything an earlier (possibly aborted) run
    /// left there so each test starts from a clean slate.
    fn fresh_temp_dir(name: &str) -> PathBuf {
        let dir = env::temp_dir().join(name);
        if dir.exists() {
            fs::remove_dir_all(&dir).ok();
        }
        dir
    }

    /// Deletes the wrapped directory on drop, so cleanup also runs when an assertion panics.
    struct DirCleanup(PathBuf);

    impl Drop for DirCleanup {
        fn drop(&mut self) {
            // Best effort: a leftover temp dir must not turn a passing test into a failure.
            fs::remove_dir_all(&self.0).ok();
        }
    }

    #[tokio::test]
    async fn test_server_creation() {
        let config = ServerConfig::default();
        let server = Server::new(config);
        assert_eq!(server.health_checker().status(), HealthStatus::Starting);
    }

    #[tokio::test]
    async fn test_server_initialization() {
        let data_dir = fresh_temp_dir("amaters_test_init");
        let _cleanup = DirCleanup(data_dir.clone());
        let mut config = ServerConfig::default();
        config.server.data_dir = data_dir;
        config.storage.engine = "memory".to_string();
        let mut server = Server::new(config);
        server
            .initialize()
            .await
            .expect("memory-backed server should initialize");
        assert!(server.storage.is_some());
    }

    #[tokio::test]
    async fn test_lsm_initialization() {
        // Declared before `server` so the directory is removed after storage is dropped.
        let data_dir = fresh_temp_dir("amaters_test_lsm");
        let _cleanup = DirCleanup(data_dir.clone());
        let mut config = ServerConfig::default();
        config.server.data_dir = data_dir;
        config.storage.engine = "lsm".to_string();
        let mut server = Server::new(config);
        server
            .initialize()
            .await
            .expect("LSM-backed server should initialize");
        assert!(server.storage.is_some());
    }

    #[tokio::test]
    async fn test_data_directory_creation() {
        let data_dir = fresh_temp_dir("amaters_test_dir");
        let _cleanup = DirCleanup(data_dir.clone());
        let mut config = ServerConfig::default();
        config.server.data_dir = data_dir.clone();
        let server = Server::new(config);
        server
            .ensure_data_directory()
            .expect("Failed to create directory");
        assert!(data_dir.exists());
    }

    #[tokio::test]
    async fn test_shutdown_coordinator() {
        let config = ServerConfig::default();
        let server = Server::new(config);
        let coordinator = server.shutdown_coordinator();
        assert!(!coordinator.is_shutting_down());
        coordinator.shutdown();
        assert!(coordinator.is_shutting_down());
    }

    #[tokio::test]
    async fn test_server_creation_without_cluster() {
        let config = ServerConfig::default();
        assert!(config.cluster.is_none());
        let server = Server::new(config);
        #[cfg(feature = "cluster")]
        assert!(server.cluster_node.is_none());
        assert_eq!(server.health_checker().status(), HealthStatus::Starting);
    }

    #[cfg(feature = "cluster")]
    #[tokio::test]
    async fn test_server_creation_with_cluster_config() {
        use crate::config::ClusterSettings;
        let data_dir = fresh_temp_dir("amaters_test_cluster");
        let _cleanup = DirCleanup(data_dir.clone());
        let mut config = ServerConfig::default();
        config.server.data_dir = data_dir;
        config.storage.engine = "memory".to_string();
        config.cluster = Some(ClusterSettings {
            enabled: true,
            node_id: 1,
            peers: vec![
                "1:127.0.0.1:7879".to_string(),
                "2:127.0.0.1:7880".to_string(),
                "3:127.0.0.1:7881".to_string(),
            ],
            heartbeat_interval_ms: 50,
            election_timeout_ms: 300,
        });
        let mut server = Server::new(config);
        server
            .initialize()
            .await
            .expect("cluster-enabled server should initialize");
        assert!(server.cluster_node.is_some());

        let status = server
            .cluster_status()
            .expect("cluster status should exist");
        assert_eq!(status.node_id, 1);
        assert_eq!(status.state, "Follower");
        assert_eq!(status.current_term, 0);
        assert!(!status.is_leader);

        let health = server.health_checker().get_health();
        let cluster_component = health
            .components
            .iter()
            .find(|c| c.name == "cluster")
            .expect("cluster component should exist");
        assert_eq!(cluster_component.status, HealthStatus::Healthy);
    }

    #[test]
    fn test_cluster_config_defaults() {
        let config = ServerConfig::default();
        assert!(config.cluster.is_none());
    }

    #[test]
    fn test_cluster_config_validation() {
        use crate::config::ClusterSettings;
        // An enabled cluster with no peers must be rejected by validation.
        let config = ServerConfig {
            cluster: Some(ClusterSettings {
                enabled: true,
                node_id: 1,
                peers: Vec::new(),
                heartbeat_interval_ms: 50,
                election_timeout_ms: 300,
            }),
            ..Default::default()
        };
        let result = config.validate();
        assert!(result.is_err());
    }

    #[cfg(feature = "cluster")]
    #[tokio::test]
    async fn test_server_cluster_disabled_explicitly() {
        use crate::config::ClusterSettings;
        let data_dir = fresh_temp_dir("amaters_test_cluster_disabled");
        let _cleanup = DirCleanup(data_dir.clone());
        let mut config = ServerConfig::default();
        config.server.data_dir = data_dir;
        config.storage.engine = "memory".to_string();
        config.cluster = Some(ClusterSettings {
            enabled: false,
            node_id: 1,
            peers: Vec::new(),
            heartbeat_interval_ms: 50,
            election_timeout_ms: 300,
        });
        let mut server = Server::new(config);
        server
            .initialize()
            .await
            .expect("server with disabled cluster should initialize");
        assert!(server.cluster_node.is_none());
        assert!(server.cluster_status().is_none());
    }

    #[test]
    fn test_pid_file_lifecycle() {
        let dir = fresh_temp_dir("amaters_test_pid_file");
        let _cleanup = DirCleanup(dir.clone());
        let mut config = ServerConfig::default();
        config.server.pid_file = dir.join("run").join("amaters.pid");

        assert!(!Server::is_running(&config));
        Server::remove_pid_file(&config).expect("removing a missing PID file should be a no-op");

        Server::write_pid_file(&config).expect("PID file should be written");
        let contents =
            fs::read_to_string(&config.server.pid_file).expect("PID file should be readable");
        assert_eq!(contents.trim(), std::process::id().to_string());

        Server::remove_pid_file(&config).expect("PID file should be removed");
        assert!(!config.server.pid_file.exists());
    }

    #[cfg(unix)]
    #[test]
    fn test_stop_server_without_pid_file_is_noop() {
        let dir = fresh_temp_dir("amaters_test_stop_no_pid");
        let mut config = ServerConfig::default();
        config.server.pid_file = dir.join("missing.pid");
        assert!(Server::stop_server(&config, false).is_ok());
    }
}