Skip to main content

amaters_server/
server.rs

1//! Server runtime module
2//!
3//! This module integrates all server components:
4//! - Storage engine (amaters-core)
5//! - Network layer (amaters-net)
6//! - Consensus (amaters-cluster)
7//! - Health checking
8//! - Metrics collection
9
10use crate::config::{ConfigResult, ServerConfig};
11use crate::health::{HealthChecker, HealthStatus};
12use crate::metrics::MetricsCollector;
13use crate::service::NetworkService;
14use crate::shutdown::ShutdownCoordinator;
15#[cfg(feature = "cluster")]
16use amaters_cluster::{NodeState, RaftConfig, RaftNode};
17use amaters_core::error::Result as CoreResult;
18use amaters_core::storage::{
19    BlockCacheConfig, CompactionConfig, LsmTreeConfig, LsmTreeStorage, MemoryStorage,
20    MemtableConfig, SSTableConfig,
21};
22use amaters_core::traits::StorageEngine;
23use amaters_core::types::{CipherBlob, Key};
24use async_trait::async_trait;
25use std::fs;
26use std::path::Path;
27use std::sync::Arc;
28use std::sync::atomic::{AtomicUsize, Ordering};
29use std::time::Duration;
30use thiserror::Error;
31use tokio::time::sleep;
32use tracing::{error, info, warn};
33
/// Storage engine wrapper enum to support multiple storage types.
///
/// Each variant owns one concrete engine. The [`StorageEngine`] implementation
/// for this enum forwards every operation to whichever engine is held, so the
/// rest of the server can work with a single concrete type.
#[derive(Clone)]
pub enum Storage {
    /// In-memory engine ([`MemoryStorage`]).
    Memory(MemoryStorage),
    /// LSM-tree engine ([`LsmTreeStorage`]).
    Lsm(LsmTreeStorage),
}
40
41#[async_trait]
42impl StorageEngine for Storage {
43    async fn put(&self, key: &Key, value: &CipherBlob) -> CoreResult<()> {
44        match self {
45            Storage::Memory(s) => s.put(key, value).await,
46            Storage::Lsm(s) => s.put(key, value).await,
47        }
48    }
49
50    async fn get(&self, key: &Key) -> CoreResult<Option<CipherBlob>> {
51        match self {
52            Storage::Memory(s) => s.get(key).await,
53            Storage::Lsm(s) => s.get(key).await,
54        }
55    }
56
57    async fn atomic_update<F>(&self, key: &Key, f: F) -> CoreResult<()>
58    where
59        F: Fn(&CipherBlob) -> CoreResult<CipherBlob> + Send + Sync,
60    {
61        match self {
62            Storage::Memory(s) => s.atomic_update(key, f).await,
63            Storage::Lsm(s) => s.atomic_update(key, f).await,
64        }
65    }
66
67    async fn delete(&self, key: &Key) -> CoreResult<()> {
68        match self {
69            Storage::Memory(s) => s.delete(key).await,
70            Storage::Lsm(s) => s.delete(key).await,
71        }
72    }
73
74    async fn range(&self, start: &Key, end: &Key) -> CoreResult<Vec<(Key, CipherBlob)>> {
75        match self {
76            Storage::Memory(s) => s.range(start, end).await,
77            Storage::Lsm(s) => s.range(start, end).await,
78        }
79    }
80
81    async fn keys(&self) -> CoreResult<Vec<Key>> {
82        match self {
83            Storage::Memory(s) => s.keys().await,
84            Storage::Lsm(s) => s.keys().await,
85        }
86    }
87
88    async fn flush(&self) -> CoreResult<()> {
89        match self {
90            Storage::Memory(s) => s.flush().await,
91            Storage::Lsm(s) => s.flush().await,
92        }
93    }
94
95    async fn close(&self) -> CoreResult<()> {
96        match self {
97            Storage::Memory(s) => s.close().await,
98            Storage::Lsm(s) => s.close().await,
99        }
100    }
101}
102
/// Server errors
///
/// Most variants carry a pre-formatted message string; only
/// [`ServerError::DirectoryCreation`] and [`ServerError::Core`] wrap a typed
/// source error (and get a `From` conversion through `#[from]`).
#[derive(Error, Debug)]
pub enum ServerError {
    /// Invalid or unusable configuration (for example an unsupported storage
    /// engine name, or an unparsable PID file).
    #[error("Configuration error: {0}")]
    Config(String),

    /// Configuration failed validation.
    #[error("Configuration validation error: {0}")]
    ConfigValidation(String),

    /// Storage engine could not be created, flushed, or closed.
    #[error("Storage initialization failed: {0}")]
    Storage(String),

    /// Network-layer failure; also used for shutdown-channel errors and for a
    /// failed attempt to signal a running server.
    #[error("Network initialization failed: {0}")]
    Network(String),

    /// Cluster setup failure (bad peer entry or Raft node creation error).
    #[error("Cluster initialization failed: {0}")]
    Cluster(String),

    /// TLS setup failure. Not constructed anywhere in this file.
    #[error("TLS setup failed: {0}")]
    TlsSetup(String),

    /// A server instance is already running.
    #[error("Server already running")]
    AlreadyRunning,

    /// Wraps any `std::io::Error`.
    ///
    /// NOTE(review): because of `#[from]`, every `?` on an I/O result in this
    /// file lands here — including PID-file reads/writes — so the
    /// "Failed to create directory" message can be misleading.
    #[error("Failed to create directory: {0}")]
    DirectoryCreation(#[from] std::io::Error),

    /// Graceful shutdown did not finish within the configured timeout.
    #[error("Shutdown timeout")]
    ShutdownTimeout,

    /// A resource limit (such as the active-query cap) was hit.
    #[error("Resource exhausted: {0}")]
    ResourceExhausted(String),

    /// Migration failure. Not constructed anywhere in this file.
    #[error("Migration error: {0}")]
    Migration(String),

    /// Error propagated from `amaters_core`.
    #[error("Core error: {0}")]
    Core(#[from] amaters_core::error::AmateRSError),
}
142
/// Convenience alias for results whose error type is [`ServerError`].
pub type ServerResult<T> = Result<T, ServerError>;
144
/// RAII guard that decrements the active query counter on drop.
///
/// Holds a shared handle to the counter so the guard can outlive the borrow
/// of the value that created it.
#[derive(Debug)]
pub struct QueryGuard {
    /// Shared counter that is decremented exactly once, when the guard drops.
    counter: Arc<AtomicUsize>,
}

impl Drop for QueryGuard {
    /// Release the slot this guard represents by subtracting one from the
    /// shared counter.
    fn drop(&mut self) {
        self.counter.fetch_sub(1, Ordering::AcqRel);
    }
}
156
157/// Classify [`ServerError`] variants as transient (retriable) or permanent.
158///
159/// This implementation is deliberately conservative: only
160/// [`ServerError::DirectoryCreation`] with obviously transient I/O kinds is
161/// classified as transient.  String-typed variants (`Storage`, `Network`,
162/// `Cluster`) are kept permanent because their underlying cause cannot be
163/// determined without parsing the message.
164impl crate::retry::ErrorClassification for ServerError {
165    fn is_transient(&self) -> bool {
166        match self {
167            ServerError::DirectoryCreation(io_err) => {
168                // Only a small set of I/O error kinds can recover on retry.
169                matches!(
170                    io_err.kind(),
171                    std::io::ErrorKind::TimedOut
172                        | std::io::ErrorKind::Interrupted
173                        | std::io::ErrorKind::WouldBlock
174                        | std::io::ErrorKind::ConnectionReset
175                        | std::io::ErrorKind::ConnectionAborted
176                )
177            }
178            // Everything else is permanent: config errors, validation errors,
179            // shutdown timeouts, auth failures, etc.
180            ServerError::Config(_)
181            | ServerError::ConfigValidation(_)
182            | ServerError::Storage(_)
183            | ServerError::Network(_)
184            | ServerError::Cluster(_)
185            | ServerError::TlsSetup(_)
186            | ServerError::AlreadyRunning
187            | ServerError::ShutdownTimeout
188            | ServerError::ResourceExhausted(_)
189            | ServerError::Migration(_)
190            | ServerError::Core(_) => false,
191        }
192    }
193}
194
/// Cluster status information for monitoring
///
/// A plain, serializable snapshot of Raft node state. Only compiled when the
/// `cluster` feature is enabled.
#[cfg(feature = "cluster")]
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct ClusterStatus {
    /// This node's ID
    pub node_id: u64,
    /// Current Raft state (Follower, Candidate, Leader)
    pub state: String,
    /// Current Raft term
    pub current_term: u64,
    /// Current leader ID (if known)
    pub leader_id: Option<u64>,
    /// Whether this node is the leader
    pub is_leader: bool,
    /// Commit index
    pub commit_index: u64,
    /// Last log index
    pub last_log_index: u64,
}
214
/// Main server runtime
///
/// Owns the configuration plus every runtime component. `storage`, `network`
/// and (with the `cluster` feature) `cluster_node` start as `None` and are
/// filled in by `initialize`.
pub struct Server {
    /// Server configuration
    config: Arc<ServerConfig>,
    /// Storage engine (supports memory or LSM)
    storage: Option<Arc<Storage>>,
    /// Network service (AQL API)
    network: Option<NetworkService>,
    /// Raft consensus node (when cluster feature is enabled)
    #[cfg(feature = "cluster")]
    cluster_node: Option<Arc<RaftNode>>,
    /// Shutdown coordinator
    shutdown: ShutdownCoordinator,
    /// Health checker
    health: HealthChecker,
    /// Metrics collector
    metrics: MetricsCollector,
    /// Active query counter for resource limit enforcement
    active_queries: Arc<AtomicUsize>,
}
235
236impl Server {
    /// Create a new server with the given configuration
    ///
    /// Only builds the value: no storage, network, or cluster component is
    /// created here (those fields start as `None`), and the active query
    /// counter starts at zero.
    pub fn new(config: ServerConfig) -> Self {
        Self {
            config: Arc::new(config),
            storage: None,
            network: None,
            #[cfg(feature = "cluster")]
            cluster_node: None,
            shutdown: ShutdownCoordinator::new(),
            health: HealthChecker::new(),
            metrics: MetricsCollector::new(),
            active_queries: Arc::new(AtomicUsize::new(0)),
        }
    }
251
252    /// Initialize server components
253    pub async fn initialize(&mut self) -> ServerResult<()> {
254        info!("Initializing server components");
255
256        // Create data directory if it doesn't exist
257        self.ensure_data_directory()?;
258
259        // Initialize storage
260        self.initialize_storage().await?;
261
262        // Initialize network service
263        self.initialize_network().await?;
264
265        // Initialize cluster (if enabled)
266        #[cfg(feature = "cluster")]
267        self.initialize_cluster()?;
268
269        // Initialize health checker
270        self.health.set_status(HealthStatus::Starting);
271
272        info!("Server components initialized successfully");
273        Ok(())
274    }
275
276    /// Ensure data directory exists
277    fn ensure_data_directory(&self) -> ServerResult<()> {
278        let data_dir = &self.config.server.data_dir;
279        if !data_dir.exists() {
280            info!("Creating data directory: {}", data_dir.display());
281            fs::create_dir_all(data_dir)?;
282        }
283        Ok(())
284    }
285
286    /// Initialize storage engine
287    async fn initialize_storage(&mut self) -> ServerResult<()> {
288        info!(
289            "Initializing storage engine: {}",
290            self.config.storage.engine
291        );
292
293        let storage = match self.config.storage.engine.as_str() {
294            "memory" => {
295                info!("Using in-memory storage engine");
296                Storage::Memory(MemoryStorage::new())
297            }
298            "lsm" => {
299                info!("Using LSM-Tree storage engine");
300                let lsm_config = self.build_lsm_config()?;
301                let lsm_storage = LsmTreeStorage::with_config(lsm_config).map_err(|e| {
302                    ServerError::Storage(format!("Failed to create LSM storage: {}", e))
303                })?;
304                Storage::Lsm(lsm_storage)
305            }
306            other => {
307                return Err(ServerError::Config(format!(
308                    "Invalid storage engine: {}. Supported: memory, lsm",
309                    other
310                )));
311            }
312        };
313
314        self.storage = Some(Arc::new(storage));
315        self.health.set_storage_healthy(true);
316
317        info!("Storage engine initialized successfully");
318        Ok(())
319    }
320
321    /// Build LSM-Tree configuration from server config
322    fn build_lsm_config(&self) -> ServerResult<LsmTreeConfig> {
323        let data_dir = self.config.server.data_dir.join("lsm");
324        let wal_dir = self
325            .config
326            .server
327            .data_dir
328            .join(self.config.storage.wal.dir.clone());
329
330        // Create directories
331        std::fs::create_dir_all(&data_dir).map_err(|e| {
332            ServerError::Storage(format!("Failed to create LSM data directory: {}", e))
333        })?;
334        std::fs::create_dir_all(&wal_dir)
335            .map_err(|e| ServerError::Storage(format!("Failed to create WAL directory: {}", e)))?;
336
337        let memtable_config = MemtableConfig {
338            max_size_bytes: self.config.storage.memtable_size_mb * 1024 * 1024,
339            enable_wal: self.config.storage.wal.enabled,
340        };
341
342        let sstable_config = SSTableConfig {
343            block_size: 4096,
344            compression_type: amaters_core::storage::CompressionType::Lz4,
345        };
346
347        let block_cache_config = BlockCacheConfig {
348            max_size_bytes: self.config.storage.block_cache_size_mb * 1024 * 1024,
349            enable_stats: true,
350        };
351
352        let compaction_config = CompactionConfig {
353            strategy: match self.config.storage.compaction.strategy.as_str() {
354                "tiered" => amaters_core::storage::CompactionStrategy::SizeTiered,
355                _ => amaters_core::storage::CompactionStrategy::LevelBased,
356            },
357            l0_threshold: 4,
358            level_multiplier: self.config.storage.compaction.level_multiplier,
359            base_level_size: 10 * 1024 * 1024,       // 10 MB
360            max_compaction_bytes: 100 * 1024 * 1024, // 100 MB
361            ..Default::default()
362        };
363
364        // Optional value log configuration for large values
365        let value_log_config = None; // Disabled for now, can be enabled later
366
367        Ok(LsmTreeConfig {
368            data_dir,
369            wal_dir,
370            memtable_config,
371            sstable_config,
372            block_cache_config,
373            compaction_config,
374            value_log_config,
375            max_levels: self.config.storage.compaction.num_levels,
376            l0_compaction_threshold: 4,
377            level_size_multiplier: self.config.storage.compaction.level_multiplier,
378        })
379    }
380
381    /// Initialize network service
382    async fn initialize_network(&mut self) -> ServerResult<()> {
383        info!("Initializing network service");
384
385        let storage = self
386            .storage
387            .as_ref()
388            .ok_or_else(|| ServerError::Config("Storage not initialized".to_string()))?
389            .clone();
390
391        let network = NetworkService::new(
392            storage,
393            self.config.clone(),
394            self.health.clone(),
395            self.metrics.clone(),
396            self.shutdown.clone(),
397        );
398
399        self.network = Some(network);
400        self.health.set_network_healthy(true);
401
402        info!("Network service initialized successfully");
403        Ok(())
404    }
405
406    /// Initialize the Raft cluster node from server configuration
407    #[cfg(feature = "cluster")]
408    fn initialize_cluster(&mut self) -> ServerResult<()> {
409        let cluster_settings = match self.config.cluster.as_ref() {
410            Some(settings) if settings.enabled => settings,
411            _ => {
412                info!("Cluster mode disabled, running as standalone node");
413                return Ok(());
414            }
415        };
416
417        info!(
418            "Initializing cluster node (node_id: {}, peers: {:?})",
419            cluster_settings.node_id, cluster_settings.peers
420        );
421
422        // Parse peer addresses into node IDs
423        // Peers format: "node_id:address" (e.g., "1:127.0.0.1:7879")
424        let mut peer_ids: Vec<u64> = Vec::new();
425        for peer_str in &cluster_settings.peers {
426            let parts: Vec<&str> = peer_str.splitn(2, ':').collect();
427            if parts.is_empty() {
428                return Err(ServerError::Cluster(format!(
429                    "Invalid peer format '{}', expected 'node_id:address'",
430                    peer_str
431                )));
432            }
433            let peer_id: u64 = parts[0].parse().map_err(|e| {
434                ServerError::Cluster(format!("Invalid peer node_id in '{}': {}", peer_str, e))
435            })?;
436            peer_ids.push(peer_id);
437        }
438
439        // Ensure self is included in peers list
440        if !peer_ids.contains(&cluster_settings.node_id) {
441            peer_ids.push(cluster_settings.node_id);
442        }
443
444        // Build RaftConfig from ClusterSettings
445        let mut raft_config = RaftConfig::new(cluster_settings.node_id, peer_ids);
446        raft_config.election_timeout_range = (
447            cluster_settings
448                .election_timeout_ms
449                .saturating_sub(cluster_settings.election_timeout_ms / 3),
450            cluster_settings
451                .election_timeout_ms
452                .saturating_add(cluster_settings.election_timeout_ms / 3),
453        );
454        raft_config.heartbeat_interval = cluster_settings.heartbeat_interval_ms;
455
456        let node = RaftNode::new(raft_config)
457            .map_err(|e| ServerError::Cluster(format!("Failed to create RaftNode: {}", e)))?;
458
459        self.cluster_node = Some(Arc::new(node));
460        self.health.set_cluster_enabled(true);
461        self.health.set_cluster_healthy(true);
462
463        info!(
464            "Cluster node initialized (node_id: {}, state: {})",
465            cluster_settings.node_id, "Follower"
466        );
467
468        Ok(())
469    }
470
    /// Get the cluster node (if cluster feature is enabled and cluster is active)
    ///
    /// Returns `None` when no Raft node has been created for this server.
    #[cfg(feature = "cluster")]
    pub fn cluster_node(&self) -> Option<&Arc<RaftNode>> {
        self.cluster_node.as_ref()
    }
476
477    /// Get cluster status information for health/monitoring
478    #[cfg(feature = "cluster")]
479    pub fn cluster_status(&self) -> Option<ClusterStatus> {
480        self.cluster_node.as_ref().map(|node| ClusterStatus {
481            node_id: node.node_id(),
482            state: node.state().as_str().to_string(),
483            current_term: node.current_term(),
484            leader_id: node.leader_id(),
485            is_leader: node.is_leader(),
486            commit_index: node.commit_index(),
487            last_log_index: node.last_log_index(),
488        })
489    }
490
491    /// Start the server
492    pub async fn start(&mut self) -> ServerResult<()> {
493        info!("Starting AmateRS server v{}", env!("CARGO_PKG_VERSION"));
494        info!("Bind address: {}", self.config.server.bind_address);
495        info!("Data directory: {}", self.config.server.data_dir.display());
496
497        // Log cluster status
498        #[cfg(feature = "cluster")]
499        if let Some(ref node) = self.cluster_node {
500            info!(
501                "Cluster mode: enabled (node_id: {}, state: {}, term: {})",
502                node.node_id(),
503                node.state().as_str(),
504                node.current_term()
505            );
506        } else {
507            info!("Cluster mode: disabled (standalone)");
508        }
509
510        // Start network service
511        if let Some(ref mut network) = self.network {
512            network.start().await?;
513        }
514
515        // Mark server as healthy
516        self.health.set_status(HealthStatus::Healthy);
517        self.health.set_network_healthy(true);
518
519        info!("Server started successfully");
520        info!("Press Ctrl+C to shutdown");
521
522        // Wait for shutdown signal
523        let mut shutdown_rx = self.shutdown.subscribe();
524        shutdown_rx
525            .recv()
526            .await
527            .map_err(|e| ServerError::Network(format!("Shutdown channel error: {}", e)))?;
528
529        info!("Shutdown signal received");
530        Ok(())
531    }
532
533    /// Gracefully shutdown the server
534    pub async fn shutdown(&mut self) -> ServerResult<()> {
535        info!("Shutting down server gracefully");
536        self.health.set_status(HealthStatus::ShuttingDown);
537
538        let shutdown_timeout = self.config.shutdown_timeout();
539
540        // Shutdown with timeout
541        match tokio::time::timeout(shutdown_timeout, self.shutdown_internal()).await {
542            Ok(result) => result,
543            Err(_) => {
544                error!("Shutdown timeout exceeded");
545                Err(ServerError::ShutdownTimeout)
546            }
547        }
548    }
549
550    /// Internal shutdown logic
551    async fn shutdown_internal(&mut self) -> ServerResult<()> {
552        // 1. Stop accepting new connections
553        info!("Stopping new connections");
554        self.health.set_network_healthy(false);
555
556        // 2. Stop network service
557        if let Some(ref mut network) = self.network {
558            network.stop().await?;
559        }
560
561        // 2. Wait for active connections to drain
562        let max_wait = Duration::from_secs(5);
563        let start = std::time::Instant::now();
564        while self.metrics.snapshot().active_connections > 0 && start.elapsed() < max_wait {
565            info!(
566                "Waiting for {} active connections to drain",
567                self.metrics.snapshot().active_connections
568            );
569            sleep(Duration::from_millis(100)).await;
570        }
571
572        // 3. Flush storage
573        if let Some(ref storage) = self.storage {
574            info!("Flushing storage");
575            storage
576                .flush()
577                .await
578                .map_err(|e| ServerError::Storage(format!("Failed to flush storage: {}", e)))?;
579        }
580
581        // 4. Close storage
582        if let Some(ref storage) = self.storage {
583            info!("Closing storage");
584            storage
585                .close()
586                .await
587                .map_err(|e| ServerError::Storage(format!("Failed to close storage: {}", e)))?;
588        }
589
590        self.health.set_storage_healthy(false);
591
592        info!("Server shutdown complete");
593        Ok(())
594    }
595
    /// Get shutdown coordinator
    ///
    /// Borrowed access to the coordinator owned by this server.
    pub fn shutdown_coordinator(&self) -> &ShutdownCoordinator {
        &self.shutdown
    }

    /// Get health checker
    ///
    /// Borrowed access to the health checker owned by this server.
    pub fn health_checker(&self) -> &HealthChecker {
        &self.health
    }

    /// Get metrics collector
    ///
    /// Borrowed access to the metrics collector owned by this server.
    pub fn metrics_collector(&self) -> &MetricsCollector {
        &self.metrics
    }

    /// Get configuration
    ///
    /// Borrows the configuration this server was created with.
    pub fn config(&self) -> &ServerConfig {
        &self.config
    }
615
616    /// Check if server is running (by checking PID file)
617    pub fn is_running(config: &ServerConfig) -> bool {
618        let pid_file = &config.server.pid_file;
619        if !pid_file.exists() {
620            return false;
621        }
622
623        // Read PID from file
624        if let Ok(contents) = fs::read_to_string(pid_file) {
625            if let Ok(pid) = contents.trim().parse::<i32>() {
626                // Check if process exists (Unix-specific)
627                #[cfg(unix)]
628                {
629                    use std::process::Command;
630                    let output = Command::new("kill").arg("-0").arg(pid.to_string()).output();
631                    if let Ok(output) = output {
632                        return output.status.success();
633                    }
634                }
635                #[cfg(not(unix))]
636                {
637                    // On non-Unix, assume running if PID file exists
638                    return true;
639                }
640            }
641        }
642
643        false
644    }
645
646    /// Write PID file
647    pub fn write_pid_file(config: &ServerConfig) -> ServerResult<()> {
648        let pid = std::process::id();
649        let pid_file = &config.server.pid_file;
650
651        // Create parent directory if needed
652        if let Some(parent) = pid_file.parent() {
653            fs::create_dir_all(parent)?;
654        }
655
656        fs::write(pid_file, pid.to_string())?;
657        info!("PID file written: {} (pid: {})", pid_file.display(), pid);
658        Ok(())
659    }
660
661    /// Remove PID file
662    pub fn remove_pid_file(config: &ServerConfig) -> ServerResult<()> {
663        let pid_file = &config.server.pid_file;
664        if pid_file.exists() {
665            fs::remove_file(pid_file)?;
666            info!("PID file removed: {}", pid_file.display());
667        }
668        Ok(())
669    }
670
671    /// Attempt to register a new active query.
672    /// Returns Ok(QueryGuard) if under the limit, Err if RESOURCE_EXHAUSTED.
673    pub fn try_acquire_query(&self) -> Result<QueryGuard, ServerError> {
674        let limit = self.config.resource_limits.max_active_queries;
675        let current = self.active_queries.fetch_add(1, Ordering::AcqRel);
676        if current >= limit {
677            self.active_queries.fetch_sub(1, Ordering::AcqRel);
678            Err(ServerError::ResourceExhausted(format!(
679                "Active query limit ({}) exceeded",
680                limit
681            )))
682        } else {
683            Ok(QueryGuard {
684                counter: Arc::clone(&self.active_queries),
685            })
686        }
687    }
688
    /// Get current active query count
    ///
    /// Reads the shared counter; the value may change immediately afterwards
    /// if other threads acquire or release queries.
    pub fn active_query_count(&self) -> usize {
        self.active_queries.load(Ordering::Acquire)
    }
693
694    /// Send stop signal to running server
695    #[cfg(unix)]
696    pub fn stop_server(config: &ServerConfig, force: bool) -> ServerResult<()> {
697        let pid_file = &config.server.pid_file;
698
699        if !pid_file.exists() {
700            warn!("PID file not found - server may not be running");
701            return Ok(());
702        }
703
704        let contents = fs::read_to_string(pid_file)?;
705        let pid = contents
706            .trim()
707            .parse::<i32>()
708            .map_err(|e| ServerError::Config(format!("Invalid PID in file: {}", e)))?;
709
710        let signal = if force { "SIGKILL" } else { "SIGTERM" };
711        info!("Sending {} to process {}", signal, pid);
712
713        use std::process::Command;
714        let signal_arg = if force { "-9" } else { "-15" };
715
716        let output = Command::new("kill")
717            .arg(signal_arg)
718            .arg(pid.to_string())
719            .output()?;
720
721        if !output.status.success() {
722            let stderr = String::from_utf8_lossy(&output.stderr);
723            return Err(ServerError::Network(format!(
724                "Failed to stop server: {}",
725                stderr
726            )));
727        }
728
729        info!("Stop signal sent successfully");
730        Ok(())
731    }
732
    /// Non-Unix fallback for stopping a running server.
    ///
    /// Always fails with [`ServerError::Config`]; both arguments are ignored.
    #[cfg(not(unix))]
    pub fn stop_server(_config: &ServerConfig, _force: bool) -> ServerResult<()> {
        Err(ServerError::Config(
            "Stop command is not supported on this platform".to_string(),
        ))
    }
739}
740
/// Unit tests for server construction, initialization, cluster wiring, and
/// the active-query limit.
///
/// Tests that touch the filesystem each use their own directory under the
/// system temp dir and remove it on a best-effort basis.
#[cfg(test)]
mod tests {
    use super::*;
    use std::env;

    /// A freshly constructed server reports the `Starting` health status.
    #[tokio::test]
    async fn test_server_creation() {
        let config = ServerConfig::default();
        let server = Server::new(config);

        assert_eq!(server.health_checker().status(), HealthStatus::Starting);
    }

    /// Initialization with the in-memory engine succeeds and installs storage.
    #[tokio::test]
    async fn test_server_initialization() {
        let mut config = ServerConfig::default();
        config.server.data_dir = env::temp_dir().join("amaters_test_init");
        config.storage.engine = "memory".to_string();

        let mut server = Server::new(config);
        let result = server.initialize().await;

        assert!(result.is_ok());
        assert!(server.storage.is_some());

        // Cleanup
        if server.config.server.data_dir.exists() {
            fs::remove_dir_all(&server.config.server.data_dir).ok();
        }
    }

    /// Initialization with the LSM engine succeeds and installs storage.
    #[tokio::test]
    async fn test_lsm_initialization() {
        let mut config = ServerConfig::default();
        config.server.data_dir = env::temp_dir().join("amaters_test_lsm");
        config.storage.engine = "lsm".to_string();

        let mut server = Server::new(config);
        let result = server.initialize().await;

        assert!(result.is_ok());
        assert!(server.storage.is_some());

        // Cleanup
        if server.config.server.data_dir.exists() {
            fs::remove_dir_all(&server.config.server.data_dir).ok();
        }
    }

    /// `ensure_data_directory` creates a directory that does not exist yet.
    #[tokio::test]
    async fn test_data_directory_creation() {
        let mut config = ServerConfig::default();
        config.server.data_dir = env::temp_dir().join("amaters_test_dir");

        // Ensure directory doesn't exist
        if config.server.data_dir.exists() {
            fs::remove_dir_all(&config.server.data_dir).ok();
        }

        // `ensure_data_directory` takes `&self`, so no mutable binding is needed.
        let server = Server::new(config.clone());
        server
            .ensure_data_directory()
            .expect("Failed to create directory");

        assert!(config.server.data_dir.exists());

        // Cleanup
        fs::remove_dir_all(&config.server.data_dir).ok();
    }

    /// The coordinator flips to "shutting down" once `shutdown` is called.
    #[tokio::test]
    async fn test_shutdown_coordinator() {
        let config = ServerConfig::default();
        let server = Server::new(config);

        let coordinator = server.shutdown_coordinator();
        assert!(!coordinator.is_shutting_down());

        coordinator.shutdown();
        assert!(coordinator.is_shutting_down());
    }

    /// Test that server creation works without cluster config (default)
    #[tokio::test]
    async fn test_server_creation_without_cluster() {
        let config = ServerConfig::default();
        assert!(config.cluster.is_none());

        let server = Server::new(config);
        #[cfg(feature = "cluster")]
        assert!(server.cluster_node.is_none());
        assert_eq!(server.health_checker().status(), HealthStatus::Starting);
    }

    /// Test server initialization with cluster config enabled (3-node cluster)
    #[cfg(feature = "cluster")]
    #[tokio::test]
    async fn test_server_creation_with_cluster_config() {
        use crate::config::ClusterSettings;

        let mut config = ServerConfig::default();
        config.server.data_dir = env::temp_dir().join("amaters_test_cluster");
        config.storage.engine = "memory".to_string();
        config.cluster = Some(ClusterSettings {
            enabled: true,
            node_id: 1,
            peers: vec![
                "1:127.0.0.1:7879".to_string(),
                "2:127.0.0.1:7880".to_string(),
                "3:127.0.0.1:7881".to_string(),
            ],
            heartbeat_interval_ms: 50,
            election_timeout_ms: 300,
        });

        let mut server = Server::new(config);
        let result = server.initialize().await;
        assert!(result.is_ok());

        // Cluster node should be initialized
        assert!(server.cluster_node.is_some());

        // Cluster status should be available
        let status = server
            .cluster_status()
            .expect("cluster status should exist");
        assert_eq!(status.node_id, 1);
        assert_eq!(status.state, "Follower");
        assert_eq!(status.current_term, 0);
        assert!(!status.is_leader);

        // Health should reflect cluster is active
        let health = server.health_checker().get_health();
        let cluster_component = health
            .components
            .iter()
            .find(|c| c.name == "cluster")
            .expect("cluster component should exist");
        assert_eq!(cluster_component.status, HealthStatus::Healthy);

        // Cleanup
        if server.config.server.data_dir.exists() {
            fs::remove_dir_all(&server.config.server.data_dir).ok();
        }
    }

    /// Test cluster config defaults and validation
    #[test]
    fn test_cluster_config_defaults() {
        let config = ServerConfig::default();
        // By default, cluster is None (disabled)
        assert!(config.cluster.is_none());
    }

    /// Test cluster config validation: enabled but no peers should fail config validation
    #[test]
    fn test_cluster_config_validation() {
        use crate::config::ClusterSettings;

        let config = ServerConfig {
            cluster: Some(ClusterSettings {
                enabled: true,
                node_id: 1,
                peers: Vec::new(), // No peers - should fail validation
                heartbeat_interval_ms: 50,
                election_timeout_ms: 300,
            }),
            ..Default::default()
        };

        let result = config.validate();
        assert!(result.is_err());
    }

    /// Test that server works with cluster disabled even when config is present
    #[cfg(feature = "cluster")]
    #[tokio::test]
    async fn test_server_cluster_disabled_explicitly() {
        use crate::config::ClusterSettings;

        let mut config = ServerConfig::default();
        config.server.data_dir = env::temp_dir().join("amaters_test_cluster_disabled");
        config.storage.engine = "memory".to_string();
        config.cluster = Some(ClusterSettings {
            enabled: false,
            node_id: 1,
            peers: Vec::new(),
            heartbeat_interval_ms: 50,
            election_timeout_ms: 300,
        });

        let mut server = Server::new(config);
        let result = server.initialize().await;
        assert!(result.is_ok());

        // Cluster node should NOT be initialized
        assert!(server.cluster_node.is_none());
        assert!(server.cluster_status().is_none());

        // Cleanup
        if server.config.server.data_dir.exists() {
            fs::remove_dir_all(&server.config.server.data_dir).ok();
        }
    }

    /// Acquiring beyond `max_active_queries` fails and leaves the count at the limit.
    #[tokio::test]
    async fn test_max_active_queries_enforced() {
        let mut config = ServerConfig::default();
        config.resource_limits.max_active_queries = 2;
        let server = Server::new(config);

        // Acquire up to limit
        let _guard1 = server.try_acquire_query().expect("Should acquire query 1");
        let _guard2 = server.try_acquire_query().expect("Should acquire query 2");

        // Exceed limit
        let result = server.try_acquire_query();
        assert!(
            matches!(&result, Err(ServerError::ResourceExhausted(_))),
            "Expected ResourceExhausted, got {:?}",
            result
        );

        assert_eq!(server.active_query_count(), 2);
    }

    /// Dropping a guard returns its slot to the counter.
    #[tokio::test]
    async fn test_query_guard_decrements_on_drop() {
        let mut config = ServerConfig::default();
        config.resource_limits.max_active_queries = 5;
        let server = Server::new(config);

        {
            let _guard = server.try_acquire_query().expect("Should acquire");
            assert_eq!(server.active_query_count(), 1);
        } // guard dropped here

        assert_eq!(server.active_query_count(), 0);
    }

    /// The default resource limits are the documented values.
    ///
    /// Purely synchronous, so it does not need the async test runtime.
    #[test]
    fn test_per_client_connection_limit() {
        // Tests that the ResourceLimits config is accessible and has correct defaults
        let config = ServerConfig::default();
        assert_eq!(config.resource_limits.max_connections_per_client, 10);
        assert_eq!(config.resource_limits.max_active_queries, 1000);
    }
}