amaters_server/
server.rs

1//! Server runtime module
2//!
3//! This module integrates all server components:
4//! - Storage engine (amaters-core)
5//! - Network layer (amaters-net)
6//! - Consensus (amaters-cluster)
7//! - Health checking
8//! - Metrics collection
9
10use crate::config::{ConfigResult, ServerConfig};
11use crate::health::{HealthChecker, HealthStatus};
12use crate::metrics::MetricsCollector;
13use crate::service::NetworkService;
14use crate::shutdown::ShutdownCoordinator;
15use amaters_core::error::Result as CoreResult;
16use amaters_core::storage::{
17    BlockCacheConfig, CompactionConfig, LsmTreeConfig, LsmTreeStorage, MemoryStorage,
18    MemtableConfig, SSTableConfig,
19};
20use amaters_core::traits::StorageEngine;
21use amaters_core::types::{CipherBlob, Key};
22use async_trait::async_trait;
23use std::fs;
24use std::path::Path;
25use std::sync::Arc;
26use std::time::Duration;
27use thiserror::Error;
28use tokio::time::sleep;
29use tracing::{error, info, warn};
30
31/// Storage engine wrapper enum to support multiple storage types
32#[derive(Clone)]
33pub enum Storage {
34    Memory(MemoryStorage),
35    Lsm(LsmTreeStorage),
36}
37
38#[async_trait]
39impl StorageEngine for Storage {
40    async fn put(&self, key: &Key, value: &CipherBlob) -> CoreResult<()> {
41        match self {
42            Storage::Memory(s) => s.put(key, value).await,
43            Storage::Lsm(s) => s.put(key, value).await,
44        }
45    }
46
47    async fn get(&self, key: &Key) -> CoreResult<Option<CipherBlob>> {
48        match self {
49            Storage::Memory(s) => s.get(key).await,
50            Storage::Lsm(s) => s.get(key).await,
51        }
52    }
53
54    async fn atomic_update<F>(&self, key: &Key, f: F) -> CoreResult<()>
55    where
56        F: Fn(&CipherBlob) -> CoreResult<CipherBlob> + Send + Sync,
57    {
58        match self {
59            Storage::Memory(s) => s.atomic_update(key, f).await,
60            Storage::Lsm(s) => s.atomic_update(key, f).await,
61        }
62    }
63
64    async fn delete(&self, key: &Key) -> CoreResult<()> {
65        match self {
66            Storage::Memory(s) => s.delete(key).await,
67            Storage::Lsm(s) => s.delete(key).await,
68        }
69    }
70
71    async fn range(&self, start: &Key, end: &Key) -> CoreResult<Vec<(Key, CipherBlob)>> {
72        match self {
73            Storage::Memory(s) => s.range(start, end).await,
74            Storage::Lsm(s) => s.range(start, end).await,
75        }
76    }
77
78    async fn keys(&self) -> CoreResult<Vec<Key>> {
79        match self {
80            Storage::Memory(s) => s.keys().await,
81            Storage::Lsm(s) => s.keys().await,
82        }
83    }
84
85    async fn flush(&self) -> CoreResult<()> {
86        match self {
87            Storage::Memory(s) => s.flush().await,
88            Storage::Lsm(s) => s.flush().await,
89        }
90    }
91
92    async fn close(&self) -> CoreResult<()> {
93        match self {
94            Storage::Memory(s) => s.close().await,
95            Storage::Lsm(s) => s.close().await,
96        }
97    }
98}
99
100/// Server errors
101#[derive(Error, Debug)]
102pub enum ServerError {
103    #[error("Configuration error: {0}")]
104    Config(String),
105
106    #[error("Configuration validation error: {0}")]
107    ConfigValidation(String),
108
109    #[error("Storage initialization failed: {0}")]
110    Storage(String),
111
112    #[error("Network initialization failed: {0}")]
113    Network(String),
114
115    #[error("Cluster initialization failed: {0}")]
116    Cluster(String),
117
118    #[error("TLS setup failed: {0}")]
119    TlsSetup(String),
120
121    #[error("Server already running")]
122    AlreadyRunning,
123
124    #[error("Failed to create directory: {0}")]
125    DirectoryCreation(#[from] std::io::Error),
126
127    #[error("Shutdown timeout")]
128    ShutdownTimeout,
129
130    #[error("Core error: {0}")]
131    Core(#[from] amaters_core::error::AmateRSError),
132}
133
134pub type ServerResult<T> = Result<T, ServerError>;
135
136/// Main server runtime
137pub struct Server {
138    /// Server configuration
139    config: Arc<ServerConfig>,
140    /// Storage engine (supports memory or LSM)
141    storage: Option<Arc<Storage>>,
142    /// Network service (AQL API)
143    network: Option<NetworkService>,
144    /// Shutdown coordinator
145    shutdown: ShutdownCoordinator,
146    /// Health checker
147    health: HealthChecker,
148    /// Metrics collector
149    metrics: MetricsCollector,
150}
151
152impl Server {
153    /// Create a new server with the given configuration
154    pub fn new(config: ServerConfig) -> Self {
155        Self {
156            config: Arc::new(config),
157            storage: None,
158            network: None,
159            shutdown: ShutdownCoordinator::new(),
160            health: HealthChecker::new(),
161            metrics: MetricsCollector::new(),
162        }
163    }
164
165    /// Initialize server components
166    pub async fn initialize(&mut self) -> ServerResult<()> {
167        info!("Initializing server components");
168
169        // Create data directory if it doesn't exist
170        self.ensure_data_directory()?;
171
172        // Initialize storage
173        self.initialize_storage().await?;
174
175        // Initialize network service
176        self.initialize_network().await?;
177
178        // Initialize health checker
179        self.health.set_status(HealthStatus::Starting);
180
181        info!("Server components initialized successfully");
182        Ok(())
183    }
184
185    /// Ensure data directory exists
186    fn ensure_data_directory(&self) -> ServerResult<()> {
187        let data_dir = &self.config.server.data_dir;
188        if !data_dir.exists() {
189            info!("Creating data directory: {}", data_dir.display());
190            fs::create_dir_all(data_dir)?;
191        }
192        Ok(())
193    }
194
195    /// Initialize storage engine
196    async fn initialize_storage(&mut self) -> ServerResult<()> {
197        info!(
198            "Initializing storage engine: {}",
199            self.config.storage.engine
200        );
201
202        let storage = match self.config.storage.engine.as_str() {
203            "memory" => {
204                info!("Using in-memory storage engine");
205                Storage::Memory(MemoryStorage::new())
206            }
207            "lsm" => {
208                info!("Using LSM-Tree storage engine");
209                let lsm_config = self.build_lsm_config()?;
210                let lsm_storage = LsmTreeStorage::with_config(lsm_config).map_err(|e| {
211                    ServerError::Storage(format!("Failed to create LSM storage: {}", e))
212                })?;
213                Storage::Lsm(lsm_storage)
214            }
215            other => {
216                return Err(ServerError::Config(format!(
217                    "Invalid storage engine: {}. Supported: memory, lsm",
218                    other
219                )));
220            }
221        };
222
223        self.storage = Some(Arc::new(storage));
224        self.health.set_storage_healthy(true);
225
226        info!("Storage engine initialized successfully");
227        Ok(())
228    }
229
230    /// Build LSM-Tree configuration from server config
231    fn build_lsm_config(&self) -> ServerResult<LsmTreeConfig> {
232        let data_dir = self.config.server.data_dir.join("lsm");
233        let wal_dir = self
234            .config
235            .server
236            .data_dir
237            .join(self.config.storage.wal.dir.clone());
238
239        // Create directories
240        std::fs::create_dir_all(&data_dir).map_err(|e| {
241            ServerError::Storage(format!("Failed to create LSM data directory: {}", e))
242        })?;
243        std::fs::create_dir_all(&wal_dir)
244            .map_err(|e| ServerError::Storage(format!("Failed to create WAL directory: {}", e)))?;
245
246        let memtable_config = MemtableConfig {
247            max_size_bytes: self.config.storage.memtable_size_mb * 1024 * 1024,
248            enable_wal: self.config.storage.wal.enabled,
249        };
250
251        let sstable_config = SSTableConfig {
252            block_size: 4096,
253            enable_compression: true,
254        };
255
256        let block_cache_config = BlockCacheConfig {
257            max_size_bytes: self.config.storage.block_cache_size_mb * 1024 * 1024,
258            enable_stats: true,
259        };
260
261        let compaction_config = CompactionConfig {
262            strategy: match self.config.storage.compaction.strategy.as_str() {
263                "tiered" => amaters_core::storage::CompactionStrategy::SizeTiered,
264                _ => amaters_core::storage::CompactionStrategy::LevelBased,
265            },
266            l0_threshold: 4,
267            level_multiplier: self.config.storage.compaction.level_multiplier,
268            base_level_size: 10 * 1024 * 1024,       // 10 MB
269            max_compaction_bytes: 100 * 1024 * 1024, // 100 MB
270        };
271
272        // Optional value log configuration for large values
273        let value_log_config = None; // Disabled for now, can be enabled later
274
275        Ok(LsmTreeConfig {
276            data_dir,
277            wal_dir,
278            memtable_config,
279            sstable_config,
280            block_cache_config,
281            compaction_config,
282            value_log_config,
283            max_levels: self.config.storage.compaction.num_levels,
284            l0_compaction_threshold: 4,
285            level_size_multiplier: self.config.storage.compaction.level_multiplier,
286        })
287    }
288
289    /// Initialize network service
290    async fn initialize_network(&mut self) -> ServerResult<()> {
291        info!("Initializing network service");
292
293        let storage = self
294            .storage
295            .as_ref()
296            .ok_or_else(|| ServerError::Config("Storage not initialized".to_string()))?
297            .clone();
298
299        let network = NetworkService::new(
300            storage,
301            self.config.clone(),
302            self.health.clone(),
303            self.metrics.clone(),
304            self.shutdown.clone(),
305        );
306
307        self.network = Some(network);
308        self.health.set_network_healthy(true);
309
310        info!("Network service initialized successfully");
311        Ok(())
312    }
313
314    /// Start the server
315    pub async fn start(&mut self) -> ServerResult<()> {
316        info!("Starting AmateRS server v{}", env!("CARGO_PKG_VERSION"));
317        info!("Bind address: {}", self.config.server.bind_address);
318        info!("Data directory: {}", self.config.server.data_dir.display());
319
320        // Start network service
321        if let Some(ref mut network) = self.network {
322            network.start().await?;
323        }
324
325        // Mark server as healthy
326        self.health.set_status(HealthStatus::Healthy);
327        self.health.set_network_healthy(true);
328
329        info!("Server started successfully");
330        info!("Press Ctrl+C to shutdown");
331
332        // Wait for shutdown signal
333        let mut shutdown_rx = self.shutdown.subscribe();
334        shutdown_rx
335            .recv()
336            .await
337            .map_err(|e| ServerError::Network(format!("Shutdown channel error: {}", e)))?;
338
339        info!("Shutdown signal received");
340        Ok(())
341    }
342
343    /// Gracefully shutdown the server
344    pub async fn shutdown(&mut self) -> ServerResult<()> {
345        info!("Shutting down server gracefully");
346        self.health.set_status(HealthStatus::ShuttingDown);
347
348        let shutdown_timeout = self.config.shutdown_timeout();
349
350        // Shutdown with timeout
351        match tokio::time::timeout(shutdown_timeout, self.shutdown_internal()).await {
352            Ok(result) => result,
353            Err(_) => {
354                error!("Shutdown timeout exceeded");
355                Err(ServerError::ShutdownTimeout)
356            }
357        }
358    }
359
360    /// Internal shutdown logic
361    async fn shutdown_internal(&mut self) -> ServerResult<()> {
362        // 1. Stop accepting new connections
363        info!("Stopping new connections");
364        self.health.set_network_healthy(false);
365
366        // 2. Stop network service
367        if let Some(ref mut network) = self.network {
368            network.stop().await?;
369        }
370
371        // 2. Wait for active connections to drain
372        let max_wait = Duration::from_secs(5);
373        let start = std::time::Instant::now();
374        while self.metrics.snapshot().active_connections > 0 && start.elapsed() < max_wait {
375            info!(
376                "Waiting for {} active connections to drain",
377                self.metrics.snapshot().active_connections
378            );
379            sleep(Duration::from_millis(100)).await;
380        }
381
382        // 3. Flush storage
383        if let Some(ref storage) = self.storage {
384            info!("Flushing storage");
385            storage
386                .flush()
387                .await
388                .map_err(|e| ServerError::Storage(format!("Failed to flush storage: {}", e)))?;
389        }
390
391        // 4. Close storage
392        if let Some(ref storage) = self.storage {
393            info!("Closing storage");
394            storage
395                .close()
396                .await
397                .map_err(|e| ServerError::Storage(format!("Failed to close storage: {}", e)))?;
398        }
399
400        self.health.set_storage_healthy(false);
401
402        info!("Server shutdown complete");
403        Ok(())
404    }
405
406    /// Get shutdown coordinator
407    pub fn shutdown_coordinator(&self) -> &ShutdownCoordinator {
408        &self.shutdown
409    }
410
411    /// Get health checker
412    pub fn health_checker(&self) -> &HealthChecker {
413        &self.health
414    }
415
416    /// Get metrics collector
417    pub fn metrics_collector(&self) -> &MetricsCollector {
418        &self.metrics
419    }
420
421    /// Get configuration
422    pub fn config(&self) -> &ServerConfig {
423        &self.config
424    }
425
426    /// Check if server is running (by checking PID file)
427    pub fn is_running(config: &ServerConfig) -> bool {
428        let pid_file = &config.server.pid_file;
429        if !pid_file.exists() {
430            return false;
431        }
432
433        // Read PID from file
434        if let Ok(contents) = fs::read_to_string(pid_file) {
435            if let Ok(pid) = contents.trim().parse::<i32>() {
436                // Check if process exists (Unix-specific)
437                #[cfg(unix)]
438                {
439                    use std::process::Command;
440                    let output = Command::new("kill").arg("-0").arg(pid.to_string()).output();
441                    if let Ok(output) = output {
442                        return output.status.success();
443                    }
444                }
445                #[cfg(not(unix))]
446                {
447                    // On non-Unix, assume running if PID file exists
448                    return true;
449                }
450            }
451        }
452
453        false
454    }
455
456    /// Write PID file
457    pub fn write_pid_file(config: &ServerConfig) -> ServerResult<()> {
458        let pid = std::process::id();
459        let pid_file = &config.server.pid_file;
460
461        // Create parent directory if needed
462        if let Some(parent) = pid_file.parent() {
463            fs::create_dir_all(parent)?;
464        }
465
466        fs::write(pid_file, pid.to_string())?;
467        info!("PID file written: {} (pid: {})", pid_file.display(), pid);
468        Ok(())
469    }
470
471    /// Remove PID file
472    pub fn remove_pid_file(config: &ServerConfig) -> ServerResult<()> {
473        let pid_file = &config.server.pid_file;
474        if pid_file.exists() {
475            fs::remove_file(pid_file)?;
476            info!("PID file removed: {}", pid_file.display());
477        }
478        Ok(())
479    }
480
481    /// Send stop signal to running server
482    #[cfg(unix)]
483    pub fn stop_server(config: &ServerConfig, force: bool) -> ServerResult<()> {
484        let pid_file = &config.server.pid_file;
485
486        if !pid_file.exists() {
487            warn!("PID file not found - server may not be running");
488            return Ok(());
489        }
490
491        let contents = fs::read_to_string(pid_file)?;
492        let pid = contents
493            .trim()
494            .parse::<i32>()
495            .map_err(|e| ServerError::Config(format!("Invalid PID in file: {}", e)))?;
496
497        let signal = if force { "SIGKILL" } else { "SIGTERM" };
498        info!("Sending {} to process {}", signal, pid);
499
500        use std::process::Command;
501        let signal_arg = if force { "-9" } else { "-15" };
502
503        let output = Command::new("kill")
504            .arg(signal_arg)
505            .arg(pid.to_string())
506            .output()?;
507
508        if !output.status.success() {
509            let stderr = String::from_utf8_lossy(&output.stderr);
510            return Err(ServerError::Network(format!(
511                "Failed to stop server: {}",
512                stderr
513            )));
514        }
515
516        info!("Stop signal sent successfully");
517        Ok(())
518    }
519
520    #[cfg(not(unix))]
521    pub fn stop_server(_config: &ServerConfig, _force: bool) -> ServerResult<()> {
522        Err(ServerError::Config(
523            "Stop command is not supported on this platform".to_string(),
524        ))
525    }
526}
527
#[cfg(test)]
mod tests {
    use super::*;
    use std::env;
    use std::path::PathBuf;

    /// Build a scratch directory path unique to this test process and remove
    /// anything a previous (possibly aborted) run left behind, so tests never
    /// observe stale on-disk state or collide with a concurrent run.
    fn scratch_dir(name: &str) -> PathBuf {
        let dir = env::temp_dir().join(format!("{}_{}", name, std::process::id()));
        fs::remove_dir_all(&dir).ok();
        dir
    }

    /// A freshly constructed server reports the `Starting` health status.
    #[tokio::test]
    async fn test_server_creation() {
        let config = ServerConfig::default();
        let server = Server::new(config);

        assert_eq!(server.health_checker().status(), HealthStatus::Starting);
    }

    /// Initialization with the in-memory engine succeeds and sets storage.
    #[tokio::test]
    async fn test_server_initialization() {
        let mut config = ServerConfig::default();
        config.server.data_dir = scratch_dir("amaters_test_init");
        config.storage.engine = "memory".to_string();

        let mut server = Server::new(config);
        let result = server.initialize().await;

        assert!(result.is_ok());
        assert!(server.storage.is_some());

        // Cleanup (best effort)
        fs::remove_dir_all(&server.config.server.data_dir).ok();
    }

    /// Initialization with the LSM engine succeeds and sets storage.
    #[tokio::test]
    async fn test_lsm_initialization() {
        let mut config = ServerConfig::default();
        config.server.data_dir = scratch_dir("amaters_test_lsm");
        config.storage.engine = "lsm".to_string();

        let mut server = Server::new(config);
        let result = server.initialize().await;

        assert!(result.is_ok());
        assert!(server.storage.is_some());

        // Cleanup (best effort)
        fs::remove_dir_all(&server.config.server.data_dir).ok();
    }

    /// `ensure_data_directory` creates a missing data directory.
    #[tokio::test]
    async fn test_data_directory_creation() {
        let mut config = ServerConfig::default();
        // `scratch_dir` guarantees the directory does not exist yet.
        config.server.data_dir = scratch_dir("amaters_test_dir");

        let server = Server::new(config.clone());
        server
            .ensure_data_directory()
            .expect("Failed to create directory");

        assert!(config.server.data_dir.exists());

        // Cleanup (best effort)
        fs::remove_dir_all(&config.server.data_dir).ok();
    }

    /// The coordinator flips to "shutting down" once `shutdown` is called.
    #[tokio::test]
    async fn test_shutdown_coordinator() {
        let config = ServerConfig::default();
        let server = Server::new(config);

        let coordinator = server.shutdown_coordinator();
        assert!(!coordinator.is_shutting_down());

        coordinator.shutdown();
        assert!(coordinator.is_shutting_down());
    }
}