Skip to main content

amaters_server/
server.rs

1//! Server runtime module
2//!
3//! This module integrates all server components:
4//! - Storage engine (amaters-core)
5//! - Network layer (amaters-net)
6//! - Consensus (amaters-cluster)
7//! - Health checking
8//! - Metrics collection
9
10use crate::config::{ConfigResult, ServerConfig};
11use crate::health::{HealthChecker, HealthStatus};
12use crate::metrics::MetricsCollector;
13use crate::service::NetworkService;
14use crate::shutdown::ShutdownCoordinator;
15#[cfg(feature = "cluster")]
16use amaters_cluster::{NodeState, RaftConfig, RaftNode};
17use amaters_core::error::Result as CoreResult;
18use amaters_core::storage::{
19    BlockCacheConfig, CompactionConfig, LsmTreeConfig, LsmTreeStorage, MemoryStorage,
20    MemtableConfig, SSTableConfig,
21};
22use amaters_core::traits::StorageEngine;
23use amaters_core::types::{CipherBlob, Key};
24use async_trait::async_trait;
25use std::fs;
26use std::path::Path;
27use std::sync::Arc;
28use std::time::Duration;
29use thiserror::Error;
30use tokio::time::sleep;
31use tracing::{error, info, warn};
32
33/// Storage engine wrapper enum to support multiple storage types
34#[derive(Clone)]
35pub enum Storage {
36    Memory(MemoryStorage),
37    Lsm(LsmTreeStorage),
38}
39
40#[async_trait]
41impl StorageEngine for Storage {
42    async fn put(&self, key: &Key, value: &CipherBlob) -> CoreResult<()> {
43        match self {
44            Storage::Memory(s) => s.put(key, value).await,
45            Storage::Lsm(s) => s.put(key, value).await,
46        }
47    }
48
49    async fn get(&self, key: &Key) -> CoreResult<Option<CipherBlob>> {
50        match self {
51            Storage::Memory(s) => s.get(key).await,
52            Storage::Lsm(s) => s.get(key).await,
53        }
54    }
55
56    async fn atomic_update<F>(&self, key: &Key, f: F) -> CoreResult<()>
57    where
58        F: Fn(&CipherBlob) -> CoreResult<CipherBlob> + Send + Sync,
59    {
60        match self {
61            Storage::Memory(s) => s.atomic_update(key, f).await,
62            Storage::Lsm(s) => s.atomic_update(key, f).await,
63        }
64    }
65
66    async fn delete(&self, key: &Key) -> CoreResult<()> {
67        match self {
68            Storage::Memory(s) => s.delete(key).await,
69            Storage::Lsm(s) => s.delete(key).await,
70        }
71    }
72
73    async fn range(&self, start: &Key, end: &Key) -> CoreResult<Vec<(Key, CipherBlob)>> {
74        match self {
75            Storage::Memory(s) => s.range(start, end).await,
76            Storage::Lsm(s) => s.range(start, end).await,
77        }
78    }
79
80    async fn keys(&self) -> CoreResult<Vec<Key>> {
81        match self {
82            Storage::Memory(s) => s.keys().await,
83            Storage::Lsm(s) => s.keys().await,
84        }
85    }
86
87    async fn flush(&self) -> CoreResult<()> {
88        match self {
89            Storage::Memory(s) => s.flush().await,
90            Storage::Lsm(s) => s.flush().await,
91        }
92    }
93
94    async fn close(&self) -> CoreResult<()> {
95        match self {
96            Storage::Memory(s) => s.close().await,
97            Storage::Lsm(s) => s.close().await,
98        }
99    }
100}
101
102/// Server errors
103#[derive(Error, Debug)]
104pub enum ServerError {
105    #[error("Configuration error: {0}")]
106    Config(String),
107
108    #[error("Configuration validation error: {0}")]
109    ConfigValidation(String),
110
111    #[error("Storage initialization failed: {0}")]
112    Storage(String),
113
114    #[error("Network initialization failed: {0}")]
115    Network(String),
116
117    #[error("Cluster initialization failed: {0}")]
118    Cluster(String),
119
120    #[error("TLS setup failed: {0}")]
121    TlsSetup(String),
122
123    #[error("Server already running")]
124    AlreadyRunning,
125
126    #[error("Failed to create directory: {0}")]
127    DirectoryCreation(#[from] std::io::Error),
128
129    #[error("Shutdown timeout")]
130    ShutdownTimeout,
131
132    #[error("Core error: {0}")]
133    Core(#[from] amaters_core::error::AmateRSError),
134}
135
136pub type ServerResult<T> = Result<T, ServerError>;
137
138/// Classify [`ServerError`] variants as transient (retriable) or permanent.
139///
140/// This implementation is deliberately conservative: only
141/// [`ServerError::DirectoryCreation`] with obviously transient I/O kinds is
142/// classified as transient.  String-typed variants (`Storage`, `Network`,
143/// `Cluster`) are kept permanent because their underlying cause cannot be
144/// determined without parsing the message.
145impl crate::retry::ErrorClassification for ServerError {
146    fn is_transient(&self) -> bool {
147        match self {
148            ServerError::DirectoryCreation(io_err) => {
149                // Only a small set of I/O error kinds can recover on retry.
150                matches!(
151                    io_err.kind(),
152                    std::io::ErrorKind::TimedOut
153                        | std::io::ErrorKind::Interrupted
154                        | std::io::ErrorKind::WouldBlock
155                        | std::io::ErrorKind::ConnectionReset
156                        | std::io::ErrorKind::ConnectionAborted
157                )
158            }
159            // Everything else is permanent: config errors, validation errors,
160            // shutdown timeouts, auth failures, etc.
161            ServerError::Config(_)
162            | ServerError::ConfigValidation(_)
163            | ServerError::Storage(_)
164            | ServerError::Network(_)
165            | ServerError::Cluster(_)
166            | ServerError::TlsSetup(_)
167            | ServerError::AlreadyRunning
168            | ServerError::ShutdownTimeout
169            | ServerError::Core(_) => false,
170        }
171    }
172}
173
/// Snapshot of the local Raft node, intended for health and monitoring output.
#[cfg(feature = "cluster")]
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct ClusterStatus {
    /// ID of the local node.
    pub node_id: u64,
    /// Raft role of the local node as text (Follower, Candidate, Leader).
    pub state: String,
    /// Raft term the node is currently in.
    pub current_term: u64,
    /// ID of the leader, when one is known.
    pub leader_id: Option<u64>,
    /// `true` when the local node is the leader.
    pub is_leader: bool,
    /// Commit index reported by the node.
    pub commit_index: u64,
    /// Index of the last entry in the node's log.
    pub last_log_index: u64,
}
193
194/// Main server runtime
195pub struct Server {
196    /// Server configuration
197    config: Arc<ServerConfig>,
198    /// Storage engine (supports memory or LSM)
199    storage: Option<Arc<Storage>>,
200    /// Network service (AQL API)
201    network: Option<NetworkService>,
202    /// Raft consensus node (when cluster feature is enabled)
203    #[cfg(feature = "cluster")]
204    cluster_node: Option<Arc<RaftNode>>,
205    /// Shutdown coordinator
206    shutdown: ShutdownCoordinator,
207    /// Health checker
208    health: HealthChecker,
209    /// Metrics collector
210    metrics: MetricsCollector,
211}
212
213impl Server {
214    /// Create a new server with the given configuration
215    pub fn new(config: ServerConfig) -> Self {
216        Self {
217            config: Arc::new(config),
218            storage: None,
219            network: None,
220            #[cfg(feature = "cluster")]
221            cluster_node: None,
222            shutdown: ShutdownCoordinator::new(),
223            health: HealthChecker::new(),
224            metrics: MetricsCollector::new(),
225        }
226    }
227
228    /// Initialize server components
229    pub async fn initialize(&mut self) -> ServerResult<()> {
230        info!("Initializing server components");
231
232        // Create data directory if it doesn't exist
233        self.ensure_data_directory()?;
234
235        // Initialize storage
236        self.initialize_storage().await?;
237
238        // Initialize network service
239        self.initialize_network().await?;
240
241        // Initialize cluster (if enabled)
242        #[cfg(feature = "cluster")]
243        self.initialize_cluster()?;
244
245        // Initialize health checker
246        self.health.set_status(HealthStatus::Starting);
247
248        info!("Server components initialized successfully");
249        Ok(())
250    }
251
252    /// Ensure data directory exists
253    fn ensure_data_directory(&self) -> ServerResult<()> {
254        let data_dir = &self.config.server.data_dir;
255        if !data_dir.exists() {
256            info!("Creating data directory: {}", data_dir.display());
257            fs::create_dir_all(data_dir)?;
258        }
259        Ok(())
260    }
261
262    /// Initialize storage engine
263    async fn initialize_storage(&mut self) -> ServerResult<()> {
264        info!(
265            "Initializing storage engine: {}",
266            self.config.storage.engine
267        );
268
269        let storage = match self.config.storage.engine.as_str() {
270            "memory" => {
271                info!("Using in-memory storage engine");
272                Storage::Memory(MemoryStorage::new())
273            }
274            "lsm" => {
275                info!("Using LSM-Tree storage engine");
276                let lsm_config = self.build_lsm_config()?;
277                let lsm_storage = LsmTreeStorage::with_config(lsm_config).map_err(|e| {
278                    ServerError::Storage(format!("Failed to create LSM storage: {}", e))
279                })?;
280                Storage::Lsm(lsm_storage)
281            }
282            other => {
283                return Err(ServerError::Config(format!(
284                    "Invalid storage engine: {}. Supported: memory, lsm",
285                    other
286                )));
287            }
288        };
289
290        self.storage = Some(Arc::new(storage));
291        self.health.set_storage_healthy(true);
292
293        info!("Storage engine initialized successfully");
294        Ok(())
295    }
296
297    /// Build LSM-Tree configuration from server config
298    fn build_lsm_config(&self) -> ServerResult<LsmTreeConfig> {
299        let data_dir = self.config.server.data_dir.join("lsm");
300        let wal_dir = self
301            .config
302            .server
303            .data_dir
304            .join(self.config.storage.wal.dir.clone());
305
306        // Create directories
307        std::fs::create_dir_all(&data_dir).map_err(|e| {
308            ServerError::Storage(format!("Failed to create LSM data directory: {}", e))
309        })?;
310        std::fs::create_dir_all(&wal_dir)
311            .map_err(|e| ServerError::Storage(format!("Failed to create WAL directory: {}", e)))?;
312
313        let memtable_config = MemtableConfig {
314            max_size_bytes: self.config.storage.memtable_size_mb * 1024 * 1024,
315            enable_wal: self.config.storage.wal.enabled,
316        };
317
318        let sstable_config = SSTableConfig {
319            block_size: 4096,
320            compression_type: amaters_core::storage::CompressionType::Lz4,
321        };
322
323        let block_cache_config = BlockCacheConfig {
324            max_size_bytes: self.config.storage.block_cache_size_mb * 1024 * 1024,
325            enable_stats: true,
326        };
327
328        let compaction_config = CompactionConfig {
329            strategy: match self.config.storage.compaction.strategy.as_str() {
330                "tiered" => amaters_core::storage::CompactionStrategy::SizeTiered,
331                _ => amaters_core::storage::CompactionStrategy::LevelBased,
332            },
333            l0_threshold: 4,
334            level_multiplier: self.config.storage.compaction.level_multiplier,
335            base_level_size: 10 * 1024 * 1024,       // 10 MB
336            max_compaction_bytes: 100 * 1024 * 1024, // 100 MB
337            ..Default::default()
338        };
339
340        // Optional value log configuration for large values
341        let value_log_config = None; // Disabled for now, can be enabled later
342
343        Ok(LsmTreeConfig {
344            data_dir,
345            wal_dir,
346            memtable_config,
347            sstable_config,
348            block_cache_config,
349            compaction_config,
350            value_log_config,
351            max_levels: self.config.storage.compaction.num_levels,
352            l0_compaction_threshold: 4,
353            level_size_multiplier: self.config.storage.compaction.level_multiplier,
354        })
355    }
356
357    /// Initialize network service
358    async fn initialize_network(&mut self) -> ServerResult<()> {
359        info!("Initializing network service");
360
361        let storage = self
362            .storage
363            .as_ref()
364            .ok_or_else(|| ServerError::Config("Storage not initialized".to_string()))?
365            .clone();
366
367        let network = NetworkService::new(
368            storage,
369            self.config.clone(),
370            self.health.clone(),
371            self.metrics.clone(),
372            self.shutdown.clone(),
373        );
374
375        self.network = Some(network);
376        self.health.set_network_healthy(true);
377
378        info!("Network service initialized successfully");
379        Ok(())
380    }
381
382    /// Initialize the Raft cluster node from server configuration
383    #[cfg(feature = "cluster")]
384    fn initialize_cluster(&mut self) -> ServerResult<()> {
385        let cluster_settings = match self.config.cluster.as_ref() {
386            Some(settings) if settings.enabled => settings,
387            _ => {
388                info!("Cluster mode disabled, running as standalone node");
389                return Ok(());
390            }
391        };
392
393        info!(
394            "Initializing cluster node (node_id: {}, peers: {:?})",
395            cluster_settings.node_id, cluster_settings.peers
396        );
397
398        // Parse peer addresses into node IDs
399        // Peers format: "node_id:address" (e.g., "1:127.0.0.1:7879")
400        let mut peer_ids: Vec<u64> = Vec::new();
401        for peer_str in &cluster_settings.peers {
402            let parts: Vec<&str> = peer_str.splitn(2, ':').collect();
403            if parts.is_empty() {
404                return Err(ServerError::Cluster(format!(
405                    "Invalid peer format '{}', expected 'node_id:address'",
406                    peer_str
407                )));
408            }
409            let peer_id: u64 = parts[0].parse().map_err(|e| {
410                ServerError::Cluster(format!("Invalid peer node_id in '{}': {}", peer_str, e))
411            })?;
412            peer_ids.push(peer_id);
413        }
414
415        // Ensure self is included in peers list
416        if !peer_ids.contains(&cluster_settings.node_id) {
417            peer_ids.push(cluster_settings.node_id);
418        }
419
420        // Build RaftConfig from ClusterSettings
421        let mut raft_config = RaftConfig::new(cluster_settings.node_id, peer_ids);
422        raft_config.election_timeout_range = (
423            cluster_settings
424                .election_timeout_ms
425                .saturating_sub(cluster_settings.election_timeout_ms / 3),
426            cluster_settings
427                .election_timeout_ms
428                .saturating_add(cluster_settings.election_timeout_ms / 3),
429        );
430        raft_config.heartbeat_interval = cluster_settings.heartbeat_interval_ms;
431
432        let node = RaftNode::new(raft_config)
433            .map_err(|e| ServerError::Cluster(format!("Failed to create RaftNode: {}", e)))?;
434
435        self.cluster_node = Some(Arc::new(node));
436        self.health.set_cluster_enabled(true);
437        self.health.set_cluster_healthy(true);
438
439        info!(
440            "Cluster node initialized (node_id: {}, state: {})",
441            cluster_settings.node_id, "Follower"
442        );
443
444        Ok(())
445    }
446
447    /// Get the cluster node (if cluster feature is enabled and cluster is active)
448    #[cfg(feature = "cluster")]
449    pub fn cluster_node(&self) -> Option<&Arc<RaftNode>> {
450        self.cluster_node.as_ref()
451    }
452
453    /// Get cluster status information for health/monitoring
454    #[cfg(feature = "cluster")]
455    pub fn cluster_status(&self) -> Option<ClusterStatus> {
456        self.cluster_node.as_ref().map(|node| ClusterStatus {
457            node_id: node.node_id(),
458            state: node.state().as_str().to_string(),
459            current_term: node.current_term(),
460            leader_id: node.leader_id(),
461            is_leader: node.is_leader(),
462            commit_index: node.commit_index(),
463            last_log_index: node.last_log_index(),
464        })
465    }
466
467    /// Start the server
468    pub async fn start(&mut self) -> ServerResult<()> {
469        info!("Starting AmateRS server v{}", env!("CARGO_PKG_VERSION"));
470        info!("Bind address: {}", self.config.server.bind_address);
471        info!("Data directory: {}", self.config.server.data_dir.display());
472
473        // Log cluster status
474        #[cfg(feature = "cluster")]
475        if let Some(ref node) = self.cluster_node {
476            info!(
477                "Cluster mode: enabled (node_id: {}, state: {}, term: {})",
478                node.node_id(),
479                node.state().as_str(),
480                node.current_term()
481            );
482        } else {
483            info!("Cluster mode: disabled (standalone)");
484        }
485
486        // Start network service
487        if let Some(ref mut network) = self.network {
488            network.start().await?;
489        }
490
491        // Mark server as healthy
492        self.health.set_status(HealthStatus::Healthy);
493        self.health.set_network_healthy(true);
494
495        info!("Server started successfully");
496        info!("Press Ctrl+C to shutdown");
497
498        // Wait for shutdown signal
499        let mut shutdown_rx = self.shutdown.subscribe();
500        shutdown_rx
501            .recv()
502            .await
503            .map_err(|e| ServerError::Network(format!("Shutdown channel error: {}", e)))?;
504
505        info!("Shutdown signal received");
506        Ok(())
507    }
508
509    /// Gracefully shutdown the server
510    pub async fn shutdown(&mut self) -> ServerResult<()> {
511        info!("Shutting down server gracefully");
512        self.health.set_status(HealthStatus::ShuttingDown);
513
514        let shutdown_timeout = self.config.shutdown_timeout();
515
516        // Shutdown with timeout
517        match tokio::time::timeout(shutdown_timeout, self.shutdown_internal()).await {
518            Ok(result) => result,
519            Err(_) => {
520                error!("Shutdown timeout exceeded");
521                Err(ServerError::ShutdownTimeout)
522            }
523        }
524    }
525
526    /// Internal shutdown logic
527    async fn shutdown_internal(&mut self) -> ServerResult<()> {
528        // 1. Stop accepting new connections
529        info!("Stopping new connections");
530        self.health.set_network_healthy(false);
531
532        // 2. Stop network service
533        if let Some(ref mut network) = self.network {
534            network.stop().await?;
535        }
536
537        // 2. Wait for active connections to drain
538        let max_wait = Duration::from_secs(5);
539        let start = std::time::Instant::now();
540        while self.metrics.snapshot().active_connections > 0 && start.elapsed() < max_wait {
541            info!(
542                "Waiting for {} active connections to drain",
543                self.metrics.snapshot().active_connections
544            );
545            sleep(Duration::from_millis(100)).await;
546        }
547
548        // 3. Flush storage
549        if let Some(ref storage) = self.storage {
550            info!("Flushing storage");
551            storage
552                .flush()
553                .await
554                .map_err(|e| ServerError::Storage(format!("Failed to flush storage: {}", e)))?;
555        }
556
557        // 4. Close storage
558        if let Some(ref storage) = self.storage {
559            info!("Closing storage");
560            storage
561                .close()
562                .await
563                .map_err(|e| ServerError::Storage(format!("Failed to close storage: {}", e)))?;
564        }
565
566        self.health.set_storage_healthy(false);
567
568        info!("Server shutdown complete");
569        Ok(())
570    }
571
572    /// Get shutdown coordinator
573    pub fn shutdown_coordinator(&self) -> &ShutdownCoordinator {
574        &self.shutdown
575    }
576
577    /// Get health checker
578    pub fn health_checker(&self) -> &HealthChecker {
579        &self.health
580    }
581
582    /// Get metrics collector
583    pub fn metrics_collector(&self) -> &MetricsCollector {
584        &self.metrics
585    }
586
587    /// Get configuration
588    pub fn config(&self) -> &ServerConfig {
589        &self.config
590    }
591
592    /// Check if server is running (by checking PID file)
593    pub fn is_running(config: &ServerConfig) -> bool {
594        let pid_file = &config.server.pid_file;
595        if !pid_file.exists() {
596            return false;
597        }
598
599        // Read PID from file
600        if let Ok(contents) = fs::read_to_string(pid_file) {
601            if let Ok(pid) = contents.trim().parse::<i32>() {
602                // Check if process exists (Unix-specific)
603                #[cfg(unix)]
604                {
605                    use std::process::Command;
606                    let output = Command::new("kill").arg("-0").arg(pid.to_string()).output();
607                    if let Ok(output) = output {
608                        return output.status.success();
609                    }
610                }
611                #[cfg(not(unix))]
612                {
613                    // On non-Unix, assume running if PID file exists
614                    return true;
615                }
616            }
617        }
618
619        false
620    }
621
622    /// Write PID file
623    pub fn write_pid_file(config: &ServerConfig) -> ServerResult<()> {
624        let pid = std::process::id();
625        let pid_file = &config.server.pid_file;
626
627        // Create parent directory if needed
628        if let Some(parent) = pid_file.parent() {
629            fs::create_dir_all(parent)?;
630        }
631
632        fs::write(pid_file, pid.to_string())?;
633        info!("PID file written: {} (pid: {})", pid_file.display(), pid);
634        Ok(())
635    }
636
637    /// Remove PID file
638    pub fn remove_pid_file(config: &ServerConfig) -> ServerResult<()> {
639        let pid_file = &config.server.pid_file;
640        if pid_file.exists() {
641            fs::remove_file(pid_file)?;
642            info!("PID file removed: {}", pid_file.display());
643        }
644        Ok(())
645    }
646
647    /// Send stop signal to running server
648    #[cfg(unix)]
649    pub fn stop_server(config: &ServerConfig, force: bool) -> ServerResult<()> {
650        let pid_file = &config.server.pid_file;
651
652        if !pid_file.exists() {
653            warn!("PID file not found - server may not be running");
654            return Ok(());
655        }
656
657        let contents = fs::read_to_string(pid_file)?;
658        let pid = contents
659            .trim()
660            .parse::<i32>()
661            .map_err(|e| ServerError::Config(format!("Invalid PID in file: {}", e)))?;
662
663        let signal = if force { "SIGKILL" } else { "SIGTERM" };
664        info!("Sending {} to process {}", signal, pid);
665
666        use std::process::Command;
667        let signal_arg = if force { "-9" } else { "-15" };
668
669        let output = Command::new("kill")
670            .arg(signal_arg)
671            .arg(pid.to_string())
672            .output()?;
673
674        if !output.status.success() {
675            let stderr = String::from_utf8_lossy(&output.stderr);
676            return Err(ServerError::Network(format!(
677                "Failed to stop server: {}",
678                stderr
679            )));
680        }
681
682        info!("Stop signal sent successfully");
683        Ok(())
684    }
685
686    #[cfg(not(unix))]
687    pub fn stop_server(_config: &ServerConfig, _force: bool) -> ServerResult<()> {
688        Err(ServerError::Config(
689            "Stop command is not supported on this platform".to_string(),
690        ))
691    }
692}
693
#[cfg(test)]
mod tests {
    use super::*;
    use std::env;

    /// Path of a per-test scratch directory below the system temp dir.
    fn scratch_dir(name: &str) -> std::path::PathBuf {
        env::temp_dir().join(name)
    }

    /// Best-effort removal of a scratch directory; failures are ignored.
    fn remove_scratch(dir: &std::path::Path) {
        if dir.exists() {
            fs::remove_dir_all(dir).ok();
        }
    }

    /// A freshly constructed server reports `Starting`.
    #[tokio::test]
    async fn test_server_creation() {
        let server = Server::new(ServerConfig::default());

        assert_eq!(server.health_checker().status(), HealthStatus::Starting);
    }

    /// Initialization with the in-memory engine succeeds and sets storage.
    #[tokio::test]
    async fn test_server_initialization() {
        let mut config = ServerConfig::default();
        config.server.data_dir = scratch_dir("amaters_test_init");
        config.storage.engine = "memory".to_string();

        let mut server = Server::new(config);
        let outcome = server.initialize().await;

        assert!(outcome.is_ok());
        assert!(server.storage.is_some());

        remove_scratch(&server.config.server.data_dir);
    }

    /// Initialization with the LSM engine succeeds and sets storage.
    #[tokio::test]
    async fn test_lsm_initialization() {
        let mut config = ServerConfig::default();
        config.server.data_dir = scratch_dir("amaters_test_lsm");
        config.storage.engine = "lsm".to_string();

        let mut server = Server::new(config);
        let outcome = server.initialize().await;

        assert!(outcome.is_ok());
        assert!(server.storage.is_some());

        remove_scratch(&server.config.server.data_dir);
    }

    /// `ensure_data_directory` creates a directory that does not exist yet.
    #[tokio::test]
    async fn test_data_directory_creation() {
        let mut config = ServerConfig::default();
        config.server.data_dir = scratch_dir("amaters_test_dir");

        // Start from a clean slate.
        remove_scratch(&config.server.data_dir);

        let server = Server::new(config.clone());
        server
            .ensure_data_directory()
            .expect("Failed to create directory");

        assert!(config.server.data_dir.exists());

        // Cleanup
        fs::remove_dir_all(&config.server.data_dir).ok();
    }

    /// The coordinator flips to "shutting down" once `shutdown` is called.
    #[tokio::test]
    async fn test_shutdown_coordinator() {
        let server = Server::new(ServerConfig::default());
        let coordinator = server.shutdown_coordinator();

        assert!(!coordinator.is_shutting_down());
        coordinator.shutdown();
        assert!(coordinator.is_shutting_down());
    }

    /// Test that server creation works without cluster config (default)
    #[tokio::test]
    async fn test_server_creation_without_cluster() {
        let config = ServerConfig::default();
        assert!(config.cluster.is_none());

        let server = Server::new(config);
        #[cfg(feature = "cluster")]
        assert!(server.cluster_node.is_none());
        assert_eq!(server.health_checker().status(), HealthStatus::Starting);
    }

    /// Test server initialization with cluster config enabled (3-node cluster)
    #[cfg(feature = "cluster")]
    #[tokio::test]
    async fn test_server_creation_with_cluster_config() {
        use crate::config::ClusterSettings;

        let mut config = ServerConfig::default();
        config.server.data_dir = scratch_dir("amaters_test_cluster");
        config.storage.engine = "memory".to_string();
        config.cluster = Some(ClusterSettings {
            enabled: true,
            node_id: 1,
            peers: vec![
                "1:127.0.0.1:7879".to_string(),
                "2:127.0.0.1:7880".to_string(),
                "3:127.0.0.1:7881".to_string(),
            ],
            heartbeat_interval_ms: 50,
            election_timeout_ms: 300,
        });

        let mut server = Server::new(config);
        let outcome = server.initialize().await;
        assert!(outcome.is_ok());

        // Cluster node should be initialized
        assert!(server.cluster_node.is_some());

        // Cluster status should be available
        let status = server.cluster_status();
        assert!(status.is_some());
        let status = status.expect("cluster status should exist");
        assert_eq!(status.node_id, 1);
        assert_eq!(status.state, "Follower");
        assert_eq!(status.current_term, 0);
        assert!(!status.is_leader);

        // Health should reflect cluster is active
        let health = server.health_checker().get_health();
        let cluster_component = health
            .components
            .iter()
            .find(|c| c.name == "cluster")
            .expect("cluster component should exist");
        assert_eq!(cluster_component.status, HealthStatus::Healthy);

        remove_scratch(&server.config.server.data_dir);
    }

    /// Test cluster config defaults and validation
    #[test]
    fn test_cluster_config_defaults() {
        // By default, cluster is None (disabled)
        assert!(ServerConfig::default().cluster.is_none());
    }

    /// Test cluster config validation: enabled but no peers should fail config validation
    #[test]
    fn test_cluster_config_validation() {
        use crate::config::ClusterSettings;

        let config = ServerConfig {
            cluster: Some(ClusterSettings {
                enabled: true,
                node_id: 1,
                peers: Vec::new(), // No peers - should fail validation
                heartbeat_interval_ms: 50,
                election_timeout_ms: 300,
            }),
            ..Default::default()
        };

        assert!(config.validate().is_err());
    }

    /// Test that server works with cluster disabled even when config is present
    #[cfg(feature = "cluster")]
    #[tokio::test]
    async fn test_server_cluster_disabled_explicitly() {
        use crate::config::ClusterSettings;

        let mut config = ServerConfig::default();
        config.server.data_dir = scratch_dir("amaters_test_cluster_disabled");
        config.storage.engine = "memory".to_string();
        config.cluster = Some(ClusterSettings {
            enabled: false,
            node_id: 1,
            peers: Vec::new(),
            heartbeat_interval_ms: 50,
            election_timeout_ms: 300,
        });

        let mut server = Server::new(config);
        let outcome = server.initialize().await;
        assert!(outcome.is_ok());

        // Cluster node should NOT be initialized
        assert!(server.cluster_node.is_none());
        assert!(server.cluster_status().is_none());

        remove_scratch(&server.config.server.data_dir);
    }
}