1use crate::config::{ConfigResult, ServerConfig};
11use crate::health::{HealthChecker, HealthStatus};
12use crate::metrics::MetricsCollector;
13use crate::service::NetworkService;
14use crate::shutdown::ShutdownCoordinator;
15#[cfg(feature = "cluster")]
16use amaters_cluster::{NodeState, RaftConfig, RaftNode};
17use amaters_core::error::Result as CoreResult;
18use amaters_core::storage::{
19 BlockCacheConfig, CompactionConfig, LsmTreeConfig, LsmTreeStorage, MemoryStorage,
20 MemtableConfig, SSTableConfig,
21};
22use amaters_core::traits::StorageEngine;
23use amaters_core::types::{CipherBlob, Key};
24use async_trait::async_trait;
25use std::fs;
26use std::path::Path;
27use std::sync::Arc;
28use std::sync::atomic::{AtomicUsize, Ordering};
29use std::time::Duration;
30use thiserror::Error;
31use tokio::time::sleep;
32use tracing::{error, info, warn};
33
/// Storage backend chosen at runtime.
///
/// Each variant wraps one concrete engine. The `StorageEngine` implementation
/// for this type forwards every call to whichever engine is held.
#[derive(Clone)]
pub enum Storage {
    /// Wraps a `MemoryStorage` engine.
    Memory(MemoryStorage),
    /// Wraps an `LsmTreeStorage` engine.
    Lsm(LsmTreeStorage),
}
40
41#[async_trait]
42impl StorageEngine for Storage {
43 async fn put(&self, key: &Key, value: &CipherBlob) -> CoreResult<()> {
44 match self {
45 Storage::Memory(s) => s.put(key, value).await,
46 Storage::Lsm(s) => s.put(key, value).await,
47 }
48 }
49
50 async fn get(&self, key: &Key) -> CoreResult<Option<CipherBlob>> {
51 match self {
52 Storage::Memory(s) => s.get(key).await,
53 Storage::Lsm(s) => s.get(key).await,
54 }
55 }
56
57 async fn atomic_update<F>(&self, key: &Key, f: F) -> CoreResult<()>
58 where
59 F: Fn(&CipherBlob) -> CoreResult<CipherBlob> + Send + Sync,
60 {
61 match self {
62 Storage::Memory(s) => s.atomic_update(key, f).await,
63 Storage::Lsm(s) => s.atomic_update(key, f).await,
64 }
65 }
66
67 async fn delete(&self, key: &Key) -> CoreResult<()> {
68 match self {
69 Storage::Memory(s) => s.delete(key).await,
70 Storage::Lsm(s) => s.delete(key).await,
71 }
72 }
73
74 async fn range(&self, start: &Key, end: &Key) -> CoreResult<Vec<(Key, CipherBlob)>> {
75 match self {
76 Storage::Memory(s) => s.range(start, end).await,
77 Storage::Lsm(s) => s.range(start, end).await,
78 }
79 }
80
81 async fn keys(&self) -> CoreResult<Vec<Key>> {
82 match self {
83 Storage::Memory(s) => s.keys().await,
84 Storage::Lsm(s) => s.keys().await,
85 }
86 }
87
88 async fn flush(&self) -> CoreResult<()> {
89 match self {
90 Storage::Memory(s) => s.flush().await,
91 Storage::Lsm(s) => s.flush().await,
92 }
93 }
94
95 async fn close(&self) -> CoreResult<()> {
96 match self {
97 Storage::Memory(s) => s.close().await,
98 Storage::Lsm(s) => s.close().await,
99 }
100 }
101}
102
/// Errors reported by the server while it is configured, started, run or stopped.
#[derive(Error, Debug)]
pub enum ServerError {
    /// A configuration value is unusable; the message says which one.
    #[error("Configuration error: {0}")]
    Config(String),

    /// Configuration failed validation.
    #[error("Configuration validation error: {0}")]
    ConfigValidation(String),

    /// The storage engine could not be created, flushed or closed.
    #[error("Storage initialization failed: {0}")]
    Storage(String),

    /// A network-level failure; also used for shutdown-channel errors and
    /// for a failed `kill` invocation in `stop_server`.
    #[error("Network initialization failed: {0}")]
    Network(String),

    /// The cluster (Raft) node could not be set up.
    #[error("Cluster initialization failed: {0}")]
    Cluster(String),

    /// TLS setup failed.
    #[error("TLS setup failed: {0}")]
    TlsSetup(String),

    /// A server instance is already running.
    #[error("Server already running")]
    AlreadyRunning,

    /// Wraps a `std::io::Error`.
    ///
    /// Because of `#[from]`, *any* I/O error propagated with `?` from a
    /// function returning `ServerResult` becomes this variant (PID-file reads
    /// and writes, spawning `kill`, ...), not only directory-creation failures.
    #[error("Failed to create directory: {0}")]
    DirectoryCreation(#[from] std::io::Error),

    /// Graceful shutdown did not finish within the configured timeout.
    #[error("Shutdown timeout")]
    ShutdownTimeout,

    /// A resource limit (for example the active-query limit) was hit.
    #[error("Resource exhausted: {0}")]
    ResourceExhausted(String),

    /// A migration step failed.
    #[error("Migration error: {0}")]
    Migration(String),

    /// An error bubbled up from `amaters_core`, converted automatically via `#[from]`.
    #[error("Core error: {0}")]
    Core(#[from] amaters_core::error::AmateRSError),
}
142
/// Convenience alias for results whose error type is [`ServerError`].
pub type ServerResult<T> = Result<T, ServerError>;
144
/// Permit for one in-flight query.
///
/// Holds a handle to a shared counter and decrements it by one when dropped,
/// which gives the slot back. Keep the guard alive for as long as the query
/// runs: a guard that is discarded right away releases its slot immediately,
/// which is why the type is marked `#[must_use]`.
#[derive(Debug)]
#[must_use = "dropping a QueryGuard immediately releases the query slot it holds"]
pub struct QueryGuard {
    /// Shared count of active queries; decremented once in `Drop`.
    counter: Arc<AtomicUsize>,
}

impl Drop for QueryGuard {
    /// Releases the slot by decrementing the shared counter.
    fn drop(&mut self) {
        self.counter.fetch_sub(1, Ordering::AcqRel);
    }
}
156
157impl crate::retry::ErrorClassification for ServerError {
165 fn is_transient(&self) -> bool {
166 match self {
167 ServerError::DirectoryCreation(io_err) => {
168 matches!(
170 io_err.kind(),
171 std::io::ErrorKind::TimedOut
172 | std::io::ErrorKind::Interrupted
173 | std::io::ErrorKind::WouldBlock
174 | std::io::ErrorKind::ConnectionReset
175 | std::io::ErrorKind::ConnectionAborted
176 )
177 }
178 ServerError::Config(_)
181 | ServerError::ConfigValidation(_)
182 | ServerError::Storage(_)
183 | ServerError::Network(_)
184 | ServerError::Cluster(_)
185 | ServerError::TlsSetup(_)
186 | ServerError::AlreadyRunning
187 | ServerError::ShutdownTimeout
188 | ServerError::ResourceExhausted(_)
189 | ServerError::Migration(_)
190 | ServerError::Core(_) => false,
191 }
192 }
193}
194
/// Serializable snapshot of the local Raft node, as built by `Server::cluster_status`.
#[cfg(feature = "cluster")]
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct ClusterStatus {
    /// Value of `RaftNode::node_id()`.
    pub node_id: u64,
    /// Node state rendered as text (`node.state().as_str()`).
    pub state: String,
    /// Value of `RaftNode::current_term()`.
    pub current_term: u64,
    /// Value of `RaftNode::leader_id()`; `None` when the node reports no leader.
    pub leader_id: Option<u64>,
    /// Value of `RaftNode::is_leader()`.
    pub is_leader: bool,
    /// Value of `RaftNode::commit_index()`.
    pub commit_index: u64,
    /// Value of `RaftNode::last_log_index()`.
    pub last_log_index: u64,
}
214
/// Top-level server object tying together configuration, storage, network,
/// optional cluster node, shutdown coordination, health and metrics.
pub struct Server {
    /// Shared configuration; a clone of this `Arc` is handed to the network service.
    config: Arc<ServerConfig>,
    /// Storage engine; `None` until `initialize_storage` has run.
    storage: Option<Arc<Storage>>,
    /// Network service; `None` until `initialize_network` has run.
    network: Option<NetworkService>,
    /// Raft node; stays `None` when cluster mode is absent or disabled.
    #[cfg(feature = "cluster")]
    cluster_node: Option<Arc<RaftNode>>,
    /// Source of the shutdown signal awaited by `start`.
    shutdown: ShutdownCoordinator,
    /// Health state updated as components come up and go down.
    health: HealthChecker,
    /// Metrics; `shutdown_internal` reads `active_connections` from its snapshot.
    metrics: MetricsCollector,
    /// Number of currently held `QueryGuard`s.
    active_queries: Arc<AtomicUsize>,
}
235
236impl Server {
237 pub fn new(config: ServerConfig) -> Self {
239 Self {
240 config: Arc::new(config),
241 storage: None,
242 network: None,
243 #[cfg(feature = "cluster")]
244 cluster_node: None,
245 shutdown: ShutdownCoordinator::new(),
246 health: HealthChecker::new(),
247 metrics: MetricsCollector::new(),
248 active_queries: Arc::new(AtomicUsize::new(0)),
249 }
250 }
251
252 pub async fn initialize(&mut self) -> ServerResult<()> {
254 info!("Initializing server components");
255
256 self.ensure_data_directory()?;
258
259 self.initialize_storage().await?;
261
262 self.initialize_network().await?;
264
265 #[cfg(feature = "cluster")]
267 self.initialize_cluster()?;
268
269 self.health.set_status(HealthStatus::Starting);
271
272 info!("Server components initialized successfully");
273 Ok(())
274 }
275
276 fn ensure_data_directory(&self) -> ServerResult<()> {
278 let data_dir = &self.config.server.data_dir;
279 if !data_dir.exists() {
280 info!("Creating data directory: {}", data_dir.display());
281 fs::create_dir_all(data_dir)?;
282 }
283 Ok(())
284 }
285
286 async fn initialize_storage(&mut self) -> ServerResult<()> {
288 info!(
289 "Initializing storage engine: {}",
290 self.config.storage.engine
291 );
292
293 let storage = match self.config.storage.engine.as_str() {
294 "memory" => {
295 info!("Using in-memory storage engine");
296 Storage::Memory(MemoryStorage::new())
297 }
298 "lsm" => {
299 info!("Using LSM-Tree storage engine");
300 let lsm_config = self.build_lsm_config()?;
301 let lsm_storage = LsmTreeStorage::with_config(lsm_config).map_err(|e| {
302 ServerError::Storage(format!("Failed to create LSM storage: {}", e))
303 })?;
304 Storage::Lsm(lsm_storage)
305 }
306 other => {
307 return Err(ServerError::Config(format!(
308 "Invalid storage engine: {}. Supported: memory, lsm",
309 other
310 )));
311 }
312 };
313
314 self.storage = Some(Arc::new(storage));
315 self.health.set_storage_healthy(true);
316
317 info!("Storage engine initialized successfully");
318 Ok(())
319 }
320
321 fn build_lsm_config(&self) -> ServerResult<LsmTreeConfig> {
323 let data_dir = self.config.server.data_dir.join("lsm");
324 let wal_dir = self
325 .config
326 .server
327 .data_dir
328 .join(self.config.storage.wal.dir.clone());
329
330 std::fs::create_dir_all(&data_dir).map_err(|e| {
332 ServerError::Storage(format!("Failed to create LSM data directory: {}", e))
333 })?;
334 std::fs::create_dir_all(&wal_dir)
335 .map_err(|e| ServerError::Storage(format!("Failed to create WAL directory: {}", e)))?;
336
337 let memtable_config = MemtableConfig {
338 max_size_bytes: self.config.storage.memtable_size_mb * 1024 * 1024,
339 enable_wal: self.config.storage.wal.enabled,
340 };
341
342 let sstable_config = SSTableConfig {
343 block_size: 4096,
344 compression_type: amaters_core::storage::CompressionType::Lz4,
345 };
346
347 let block_cache_config = BlockCacheConfig {
348 max_size_bytes: self.config.storage.block_cache_size_mb * 1024 * 1024,
349 enable_stats: true,
350 };
351
352 let compaction_config = CompactionConfig {
353 strategy: match self.config.storage.compaction.strategy.as_str() {
354 "tiered" => amaters_core::storage::CompactionStrategy::SizeTiered,
355 _ => amaters_core::storage::CompactionStrategy::LevelBased,
356 },
357 l0_threshold: 4,
358 level_multiplier: self.config.storage.compaction.level_multiplier,
359 base_level_size: 10 * 1024 * 1024, max_compaction_bytes: 100 * 1024 * 1024, ..Default::default()
362 };
363
364 let value_log_config = None; Ok(LsmTreeConfig {
368 data_dir,
369 wal_dir,
370 memtable_config,
371 sstable_config,
372 block_cache_config,
373 compaction_config,
374 value_log_config,
375 max_levels: self.config.storage.compaction.num_levels,
376 l0_compaction_threshold: 4,
377 level_size_multiplier: self.config.storage.compaction.level_multiplier,
378 })
379 }
380
381 async fn initialize_network(&mut self) -> ServerResult<()> {
383 info!("Initializing network service");
384
385 let storage = self
386 .storage
387 .as_ref()
388 .ok_or_else(|| ServerError::Config("Storage not initialized".to_string()))?
389 .clone();
390
391 let network = NetworkService::new(
392 storage,
393 self.config.clone(),
394 self.health.clone(),
395 self.metrics.clone(),
396 self.shutdown.clone(),
397 );
398
399 self.network = Some(network);
400 self.health.set_network_healthy(true);
401
402 info!("Network service initialized successfully");
403 Ok(())
404 }
405
406 #[cfg(feature = "cluster")]
408 fn initialize_cluster(&mut self) -> ServerResult<()> {
409 let cluster_settings = match self.config.cluster.as_ref() {
410 Some(settings) if settings.enabled => settings,
411 _ => {
412 info!("Cluster mode disabled, running as standalone node");
413 return Ok(());
414 }
415 };
416
417 info!(
418 "Initializing cluster node (node_id: {}, peers: {:?})",
419 cluster_settings.node_id, cluster_settings.peers
420 );
421
422 let mut peer_ids: Vec<u64> = Vec::new();
425 for peer_str in &cluster_settings.peers {
426 let parts: Vec<&str> = peer_str.splitn(2, ':').collect();
427 if parts.is_empty() {
428 return Err(ServerError::Cluster(format!(
429 "Invalid peer format '{}', expected 'node_id:address'",
430 peer_str
431 )));
432 }
433 let peer_id: u64 = parts[0].parse().map_err(|e| {
434 ServerError::Cluster(format!("Invalid peer node_id in '{}': {}", peer_str, e))
435 })?;
436 peer_ids.push(peer_id);
437 }
438
439 if !peer_ids.contains(&cluster_settings.node_id) {
441 peer_ids.push(cluster_settings.node_id);
442 }
443
444 let mut raft_config = RaftConfig::new(cluster_settings.node_id, peer_ids);
446 raft_config.election_timeout_range = (
447 cluster_settings
448 .election_timeout_ms
449 .saturating_sub(cluster_settings.election_timeout_ms / 3),
450 cluster_settings
451 .election_timeout_ms
452 .saturating_add(cluster_settings.election_timeout_ms / 3),
453 );
454 raft_config.heartbeat_interval = cluster_settings.heartbeat_interval_ms;
455
456 let node = RaftNode::new(raft_config)
457 .map_err(|e| ServerError::Cluster(format!("Failed to create RaftNode: {}", e)))?;
458
459 self.cluster_node = Some(Arc::new(node));
460 self.health.set_cluster_enabled(true);
461 self.health.set_cluster_healthy(true);
462
463 info!(
464 "Cluster node initialized (node_id: {}, state: {})",
465 cluster_settings.node_id, "Follower"
466 );
467
468 Ok(())
469 }
470
    /// Returns the Raft node handle, or `None` when no cluster node has been created.
    #[cfg(feature = "cluster")]
    pub fn cluster_node(&self) -> Option<&Arc<RaftNode>> {
        self.cluster_node.as_ref()
    }
476
477 #[cfg(feature = "cluster")]
479 pub fn cluster_status(&self) -> Option<ClusterStatus> {
480 self.cluster_node.as_ref().map(|node| ClusterStatus {
481 node_id: node.node_id(),
482 state: node.state().as_str().to_string(),
483 current_term: node.current_term(),
484 leader_id: node.leader_id(),
485 is_leader: node.is_leader(),
486 commit_index: node.commit_index(),
487 last_log_index: node.last_log_index(),
488 })
489 }
490
491 pub async fn start(&mut self) -> ServerResult<()> {
493 info!("Starting AmateRS server v{}", env!("CARGO_PKG_VERSION"));
494 info!("Bind address: {}", self.config.server.bind_address);
495 info!("Data directory: {}", self.config.server.data_dir.display());
496
497 #[cfg(feature = "cluster")]
499 if let Some(ref node) = self.cluster_node {
500 info!(
501 "Cluster mode: enabled (node_id: {}, state: {}, term: {})",
502 node.node_id(),
503 node.state().as_str(),
504 node.current_term()
505 );
506 } else {
507 info!("Cluster mode: disabled (standalone)");
508 }
509
510 if let Some(ref mut network) = self.network {
512 network.start().await?;
513 }
514
515 self.health.set_status(HealthStatus::Healthy);
517 self.health.set_network_healthy(true);
518
519 info!("Server started successfully");
520 info!("Press Ctrl+C to shutdown");
521
522 let mut shutdown_rx = self.shutdown.subscribe();
524 shutdown_rx
525 .recv()
526 .await
527 .map_err(|e| ServerError::Network(format!("Shutdown channel error: {}", e)))?;
528
529 info!("Shutdown signal received");
530 Ok(())
531 }
532
533 pub async fn shutdown(&mut self) -> ServerResult<()> {
535 info!("Shutting down server gracefully");
536 self.health.set_status(HealthStatus::ShuttingDown);
537
538 let shutdown_timeout = self.config.shutdown_timeout();
539
540 match tokio::time::timeout(shutdown_timeout, self.shutdown_internal()).await {
542 Ok(result) => result,
543 Err(_) => {
544 error!("Shutdown timeout exceeded");
545 Err(ServerError::ShutdownTimeout)
546 }
547 }
548 }
549
550 async fn shutdown_internal(&mut self) -> ServerResult<()> {
552 info!("Stopping new connections");
554 self.health.set_network_healthy(false);
555
556 if let Some(ref mut network) = self.network {
558 network.stop().await?;
559 }
560
561 let max_wait = Duration::from_secs(5);
563 let start = std::time::Instant::now();
564 while self.metrics.snapshot().active_connections > 0 && start.elapsed() < max_wait {
565 info!(
566 "Waiting for {} active connections to drain",
567 self.metrics.snapshot().active_connections
568 );
569 sleep(Duration::from_millis(100)).await;
570 }
571
572 if let Some(ref storage) = self.storage {
574 info!("Flushing storage");
575 storage
576 .flush()
577 .await
578 .map_err(|e| ServerError::Storage(format!("Failed to flush storage: {}", e)))?;
579 }
580
581 if let Some(ref storage) = self.storage {
583 info!("Closing storage");
584 storage
585 .close()
586 .await
587 .map_err(|e| ServerError::Storage(format!("Failed to close storage: {}", e)))?;
588 }
589
590 self.health.set_storage_healthy(false);
591
592 info!("Server shutdown complete");
593 Ok(())
594 }
595
    /// Returns the server's shutdown coordinator.
    pub fn shutdown_coordinator(&self) -> &ShutdownCoordinator {
        &self.shutdown
    }
600
    /// Returns the server's health checker.
    pub fn health_checker(&self) -> &HealthChecker {
        &self.health
    }
605
    /// Returns the server's metrics collector.
    pub fn metrics_collector(&self) -> &MetricsCollector {
        &self.metrics
    }
610
    /// Returns the configuration this server was created with.
    pub fn config(&self) -> &ServerConfig {
        &self.config
    }
615
616 pub fn is_running(config: &ServerConfig) -> bool {
618 let pid_file = &config.server.pid_file;
619 if !pid_file.exists() {
620 return false;
621 }
622
623 if let Ok(contents) = fs::read_to_string(pid_file) {
625 if let Ok(pid) = contents.trim().parse::<i32>() {
626 #[cfg(unix)]
628 {
629 use std::process::Command;
630 let output = Command::new("kill").arg("-0").arg(pid.to_string()).output();
631 if let Ok(output) = output {
632 return output.status.success();
633 }
634 }
635 #[cfg(not(unix))]
636 {
637 return true;
639 }
640 }
641 }
642
643 false
644 }
645
646 pub fn write_pid_file(config: &ServerConfig) -> ServerResult<()> {
648 let pid = std::process::id();
649 let pid_file = &config.server.pid_file;
650
651 if let Some(parent) = pid_file.parent() {
653 fs::create_dir_all(parent)?;
654 }
655
656 fs::write(pid_file, pid.to_string())?;
657 info!("PID file written: {} (pid: {})", pid_file.display(), pid);
658 Ok(())
659 }
660
661 pub fn remove_pid_file(config: &ServerConfig) -> ServerResult<()> {
663 let pid_file = &config.server.pid_file;
664 if pid_file.exists() {
665 fs::remove_file(pid_file)?;
666 info!("PID file removed: {}", pid_file.display());
667 }
668 Ok(())
669 }
670
671 pub fn try_acquire_query(&self) -> Result<QueryGuard, ServerError> {
674 let limit = self.config.resource_limits.max_active_queries;
675 let current = self.active_queries.fetch_add(1, Ordering::AcqRel);
676 if current >= limit {
677 self.active_queries.fetch_sub(1, Ordering::AcqRel);
678 Err(ServerError::ResourceExhausted(format!(
679 "Active query limit ({}) exceeded",
680 limit
681 )))
682 } else {
683 Ok(QueryGuard {
684 counter: Arc::clone(&self.active_queries),
685 })
686 }
687 }
688
    /// Returns the current value of the active-query counter.
    pub fn active_query_count(&self) -> usize {
        self.active_queries.load(Ordering::Acquire)
    }
693
694 #[cfg(unix)]
696 pub fn stop_server(config: &ServerConfig, force: bool) -> ServerResult<()> {
697 let pid_file = &config.server.pid_file;
698
699 if !pid_file.exists() {
700 warn!("PID file not found - server may not be running");
701 return Ok(());
702 }
703
704 let contents = fs::read_to_string(pid_file)?;
705 let pid = contents
706 .trim()
707 .parse::<i32>()
708 .map_err(|e| ServerError::Config(format!("Invalid PID in file: {}", e)))?;
709
710 let signal = if force { "SIGKILL" } else { "SIGTERM" };
711 info!("Sending {} to process {}", signal, pid);
712
713 use std::process::Command;
714 let signal_arg = if force { "-9" } else { "-15" };
715
716 let output = Command::new("kill")
717 .arg(signal_arg)
718 .arg(pid.to_string())
719 .output()?;
720
721 if !output.status.success() {
722 let stderr = String::from_utf8_lossy(&output.stderr);
723 return Err(ServerError::Network(format!(
724 "Failed to stop server: {}",
725 stderr
726 )));
727 }
728
729 info!("Stop signal sent successfully");
730 Ok(())
731 }
732
    /// Non-Unix stand-in for `stop_server`: always fails with
    /// `ServerError::Config`, ignoring both arguments.
    #[cfg(not(unix))]
    pub fn stop_server(_config: &ServerConfig, _force: bool) -> ServerResult<()> {
        Err(ServerError::Config(
            "Stop command is not supported on this platform".to_string(),
        ))
    }
739}
740
#[cfg(test)]
mod tests {
    use super::*;
    use std::env;

    /// A freshly constructed server reports the `Starting` health status.
    #[tokio::test]
    async fn test_server_creation() {
        let config = ServerConfig::default();
        let server = Server::new(config);

        assert_eq!(server.health_checker().status(), HealthStatus::Starting);
    }

    /// Initialization with the in-memory engine succeeds and sets up storage.
    #[tokio::test]
    async fn test_server_initialization() {
        let mut config = ServerConfig::default();
        config.server.data_dir = env::temp_dir().join("amaters_test_init");
        config.storage.engine = "memory".to_string();

        let mut server = Server::new(config);
        let result = server.initialize().await;

        assert!(result.is_ok());
        assert!(server.storage.is_some());

        // Best-effort cleanup of the temporary data directory.
        if server.config.server.data_dir.exists() {
            fs::remove_dir_all(&server.config.server.data_dir).ok();
        }
    }

    /// Initialization with the LSM engine succeeds and sets up storage.
    #[tokio::test]
    async fn test_lsm_initialization() {
        let mut config = ServerConfig::default();
        config.server.data_dir = env::temp_dir().join("amaters_test_lsm");
        config.storage.engine = "lsm".to_string();

        let mut server = Server::new(config);
        let result = server.initialize().await;

        assert!(result.is_ok());
        assert!(server.storage.is_some());

        // Best-effort cleanup of the temporary data directory.
        if server.config.server.data_dir.exists() {
            fs::remove_dir_all(&server.config.server.data_dir).ok();
        }
    }

    /// `ensure_data_directory` creates a missing data directory.
    #[tokio::test]
    async fn test_data_directory_creation() {
        let mut config = ServerConfig::default();
        config.server.data_dir = env::temp_dir().join("amaters_test_dir");

        // Start from a clean slate so the creation path is exercised.
        if config.server.data_dir.exists() {
            fs::remove_dir_all(&config.server.data_dir).ok();
        }

        // `ensure_data_directory` takes `&self`, so no `mut` binding is needed.
        let server = Server::new(config.clone());
        server
            .ensure_data_directory()
            .expect("Failed to create directory");

        assert!(config.server.data_dir.exists());

        fs::remove_dir_all(&config.server.data_dir).ok();
    }

    /// The coordinator flips to "shutting down" once `shutdown()` is called.
    #[tokio::test]
    async fn test_shutdown_coordinator() {
        let config = ServerConfig::default();
        let server = Server::new(config);

        let coordinator = server.shutdown_coordinator();
        assert!(!coordinator.is_shutting_down());

        coordinator.shutdown();
        assert!(coordinator.is_shutting_down());
    }

    /// The default configuration has no cluster section and no cluster node.
    #[tokio::test]
    async fn test_server_creation_without_cluster() {
        let config = ServerConfig::default();
        assert!(config.cluster.is_none());

        let server = Server::new(config);
        #[cfg(feature = "cluster")]
        assert!(server.cluster_node.is_none());
        assert_eq!(server.health_checker().status(), HealthStatus::Starting);
    }

    /// With cluster mode enabled, initialization creates a follower node at
    /// term 0 and reports a healthy `cluster` component.
    #[cfg(feature = "cluster")]
    #[tokio::test]
    async fn test_server_creation_with_cluster_config() {
        use crate::config::ClusterSettings;

        let mut config = ServerConfig::default();
        config.server.data_dir = env::temp_dir().join("amaters_test_cluster");
        config.storage.engine = "memory".to_string();
        config.cluster = Some(ClusterSettings {
            enabled: true,
            node_id: 1,
            peers: vec![
                "1:127.0.0.1:7879".to_string(),
                "2:127.0.0.1:7880".to_string(),
                "3:127.0.0.1:7881".to_string(),
            ],
            heartbeat_interval_ms: 50,
            election_timeout_ms: 300,
        });

        let mut server = Server::new(config);
        let result = server.initialize().await;
        assert!(result.is_ok());

        assert!(server.cluster_node.is_some());

        let status = server.cluster_status();
        assert!(status.is_some());
        let status = status.expect("cluster status should exist");
        assert_eq!(status.node_id, 1);
        assert_eq!(status.state, "Follower");
        assert_eq!(status.current_term, 0);
        assert!(!status.is_leader);

        let health = server.health_checker().get_health();
        let cluster_component = health
            .components
            .iter()
            .find(|c| c.name == "cluster")
            .expect("cluster component should exist");
        assert_eq!(cluster_component.status, HealthStatus::Healthy);

        // Best-effort cleanup of the temporary data directory.
        if server.config.server.data_dir.exists() {
            fs::remove_dir_all(&server.config.server.data_dir).ok();
        }
    }

    /// Cluster settings are absent by default.
    #[test]
    fn test_cluster_config_defaults() {
        let config = ServerConfig::default();
        assert!(config.cluster.is_none());
    }

    /// Enabling cluster mode with an empty peer list fails validation.
    #[test]
    fn test_cluster_config_validation() {
        use crate::config::ClusterSettings;

        let config = ServerConfig {
            cluster: Some(ClusterSettings {
                enabled: true,
                node_id: 1,
                peers: Vec::new(),
                heartbeat_interval_ms: 50,
                election_timeout_ms: 300,
            }),
            ..Default::default()
        };

        let result = config.validate();
        assert!(result.is_err());
    }

    /// Cluster settings with `enabled: false` leave the server standalone.
    #[cfg(feature = "cluster")]
    #[tokio::test]
    async fn test_server_cluster_disabled_explicitly() {
        use crate::config::ClusterSettings;

        let mut config = ServerConfig::default();
        config.server.data_dir = env::temp_dir().join("amaters_test_cluster_disabled");
        config.storage.engine = "memory".to_string();
        config.cluster = Some(ClusterSettings {
            enabled: false,
            node_id: 1,
            peers: Vec::new(),
            heartbeat_interval_ms: 50,
            election_timeout_ms: 300,
        });

        let mut server = Server::new(config);
        let result = server.initialize().await;
        assert!(result.is_ok());

        assert!(server.cluster_node.is_none());
        assert!(server.cluster_status().is_none());

        // Best-effort cleanup of the temporary data directory.
        if server.config.server.data_dir.exists() {
            fs::remove_dir_all(&server.config.server.data_dir).ok();
        }
    }

    /// The third acquisition is rejected when the limit is two, and the
    /// rejected attempt does not change the active count.
    #[tokio::test]
    async fn test_max_active_queries_enforced() {
        let mut config = ServerConfig::default();
        config.resource_limits.max_active_queries = 2;
        let server = Server::new(config);

        let _guard1 = server.try_acquire_query().expect("Should acquire query 1");
        let _guard2 = server.try_acquire_query().expect("Should acquire query 2");

        let result = server.try_acquire_query();
        assert!(result.is_err());
        match result {
            Err(ServerError::ResourceExhausted(_)) => {}
            other => panic!("Expected ResourceExhausted, got {:?}", other),
        }

        assert_eq!(server.active_query_count(), 2);
    }

    /// Dropping a guard gives its slot back.
    #[tokio::test]
    async fn test_query_guard_decrements_on_drop() {
        let mut config = ServerConfig::default();
        config.resource_limits.max_active_queries = 5;
        let server = Server::new(config);

        {
            let _guard = server.try_acquire_query().expect("Should acquire");
            assert_eq!(server.active_query_count(), 1);
        }
        // The guard went out of scope above.
        assert_eq!(server.active_query_count(), 0);
    }

    /// Default resource limits: 10 connections per client, 1000 active queries.
    #[tokio::test]
    async fn test_per_client_connection_limit() {
        let config = ServerConfig::default();
        assert_eq!(config.resource_limits.max_connections_per_client, 10);
        assert_eq!(config.resource_limits.max_active_queries, 1000);
    }
}