1use crate::config::{ConfigResult, ServerConfig};
11use crate::health::{HealthChecker, HealthStatus};
12use crate::metrics::MetricsCollector;
13use crate::service::NetworkService;
14use crate::shutdown::ShutdownCoordinator;
15use amaters_core::error::Result as CoreResult;
16use amaters_core::storage::{
17 BlockCacheConfig, CompactionConfig, LsmTreeConfig, LsmTreeStorage, MemoryStorage,
18 MemtableConfig, SSTableConfig,
19};
20use amaters_core::traits::StorageEngine;
21use amaters_core::types::{CipherBlob, Key};
22use async_trait::async_trait;
23use std::fs;
24use std::path::Path;
25use std::sync::Arc;
26use std::time::Duration;
27use thiserror::Error;
28use tokio::time::sleep;
29use tracing::{error, info, warn};
30
31#[derive(Clone)]
33pub enum Storage {
34 Memory(MemoryStorage),
35 Lsm(LsmTreeStorage),
36}
37
38#[async_trait]
39impl StorageEngine for Storage {
40 async fn put(&self, key: &Key, value: &CipherBlob) -> CoreResult<()> {
41 match self {
42 Storage::Memory(s) => s.put(key, value).await,
43 Storage::Lsm(s) => s.put(key, value).await,
44 }
45 }
46
47 async fn get(&self, key: &Key) -> CoreResult<Option<CipherBlob>> {
48 match self {
49 Storage::Memory(s) => s.get(key).await,
50 Storage::Lsm(s) => s.get(key).await,
51 }
52 }
53
54 async fn atomic_update<F>(&self, key: &Key, f: F) -> CoreResult<()>
55 where
56 F: Fn(&CipherBlob) -> CoreResult<CipherBlob> + Send + Sync,
57 {
58 match self {
59 Storage::Memory(s) => s.atomic_update(key, f).await,
60 Storage::Lsm(s) => s.atomic_update(key, f).await,
61 }
62 }
63
64 async fn delete(&self, key: &Key) -> CoreResult<()> {
65 match self {
66 Storage::Memory(s) => s.delete(key).await,
67 Storage::Lsm(s) => s.delete(key).await,
68 }
69 }
70
71 async fn range(&self, start: &Key, end: &Key) -> CoreResult<Vec<(Key, CipherBlob)>> {
72 match self {
73 Storage::Memory(s) => s.range(start, end).await,
74 Storage::Lsm(s) => s.range(start, end).await,
75 }
76 }
77
78 async fn keys(&self) -> CoreResult<Vec<Key>> {
79 match self {
80 Storage::Memory(s) => s.keys().await,
81 Storage::Lsm(s) => s.keys().await,
82 }
83 }
84
85 async fn flush(&self) -> CoreResult<()> {
86 match self {
87 Storage::Memory(s) => s.flush().await,
88 Storage::Lsm(s) => s.flush().await,
89 }
90 }
91
92 async fn close(&self) -> CoreResult<()> {
93 match self {
94 Storage::Memory(s) => s.close().await,
95 Storage::Lsm(s) => s.close().await,
96 }
97 }
98}
99
100#[derive(Error, Debug)]
102pub enum ServerError {
103 #[error("Configuration error: {0}")]
104 Config(String),
105
106 #[error("Configuration validation error: {0}")]
107 ConfigValidation(String),
108
109 #[error("Storage initialization failed: {0}")]
110 Storage(String),
111
112 #[error("Network initialization failed: {0}")]
113 Network(String),
114
115 #[error("Cluster initialization failed: {0}")]
116 Cluster(String),
117
118 #[error("TLS setup failed: {0}")]
119 TlsSetup(String),
120
121 #[error("Server already running")]
122 AlreadyRunning,
123
124 #[error("Failed to create directory: {0}")]
125 DirectoryCreation(#[from] std::io::Error),
126
127 #[error("Shutdown timeout")]
128 ShutdownTimeout,
129
130 #[error("Core error: {0}")]
131 Core(#[from] amaters_core::error::AmateRSError),
132}
133
134pub type ServerResult<T> = Result<T, ServerError>;
135
136pub struct Server {
138 config: Arc<ServerConfig>,
140 storage: Option<Arc<Storage>>,
142 network: Option<NetworkService>,
144 shutdown: ShutdownCoordinator,
146 health: HealthChecker,
148 metrics: MetricsCollector,
150}
151
152impl Server {
153 pub fn new(config: ServerConfig) -> Self {
155 Self {
156 config: Arc::new(config),
157 storage: None,
158 network: None,
159 shutdown: ShutdownCoordinator::new(),
160 health: HealthChecker::new(),
161 metrics: MetricsCollector::new(),
162 }
163 }
164
165 pub async fn initialize(&mut self) -> ServerResult<()> {
167 info!("Initializing server components");
168
169 self.ensure_data_directory()?;
171
172 self.initialize_storage().await?;
174
175 self.initialize_network().await?;
177
178 self.health.set_status(HealthStatus::Starting);
180
181 info!("Server components initialized successfully");
182 Ok(())
183 }
184
185 fn ensure_data_directory(&self) -> ServerResult<()> {
187 let data_dir = &self.config.server.data_dir;
188 if !data_dir.exists() {
189 info!("Creating data directory: {}", data_dir.display());
190 fs::create_dir_all(data_dir)?;
191 }
192 Ok(())
193 }
194
195 async fn initialize_storage(&mut self) -> ServerResult<()> {
197 info!(
198 "Initializing storage engine: {}",
199 self.config.storage.engine
200 );
201
202 let storage = match self.config.storage.engine.as_str() {
203 "memory" => {
204 info!("Using in-memory storage engine");
205 Storage::Memory(MemoryStorage::new())
206 }
207 "lsm" => {
208 info!("Using LSM-Tree storage engine");
209 let lsm_config = self.build_lsm_config()?;
210 let lsm_storage = LsmTreeStorage::with_config(lsm_config).map_err(|e| {
211 ServerError::Storage(format!("Failed to create LSM storage: {}", e))
212 })?;
213 Storage::Lsm(lsm_storage)
214 }
215 other => {
216 return Err(ServerError::Config(format!(
217 "Invalid storage engine: {}. Supported: memory, lsm",
218 other
219 )));
220 }
221 };
222
223 self.storage = Some(Arc::new(storage));
224 self.health.set_storage_healthy(true);
225
226 info!("Storage engine initialized successfully");
227 Ok(())
228 }
229
230 fn build_lsm_config(&self) -> ServerResult<LsmTreeConfig> {
232 let data_dir = self.config.server.data_dir.join("lsm");
233 let wal_dir = self
234 .config
235 .server
236 .data_dir
237 .join(self.config.storage.wal.dir.clone());
238
239 std::fs::create_dir_all(&data_dir).map_err(|e| {
241 ServerError::Storage(format!("Failed to create LSM data directory: {}", e))
242 })?;
243 std::fs::create_dir_all(&wal_dir)
244 .map_err(|e| ServerError::Storage(format!("Failed to create WAL directory: {}", e)))?;
245
246 let memtable_config = MemtableConfig {
247 max_size_bytes: self.config.storage.memtable_size_mb * 1024 * 1024,
248 enable_wal: self.config.storage.wal.enabled,
249 };
250
251 let sstable_config = SSTableConfig {
252 block_size: 4096,
253 enable_compression: true,
254 };
255
256 let block_cache_config = BlockCacheConfig {
257 max_size_bytes: self.config.storage.block_cache_size_mb * 1024 * 1024,
258 enable_stats: true,
259 };
260
261 let compaction_config = CompactionConfig {
262 strategy: match self.config.storage.compaction.strategy.as_str() {
263 "tiered" => amaters_core::storage::CompactionStrategy::SizeTiered,
264 _ => amaters_core::storage::CompactionStrategy::LevelBased,
265 },
266 l0_threshold: 4,
267 level_multiplier: self.config.storage.compaction.level_multiplier,
268 base_level_size: 10 * 1024 * 1024, max_compaction_bytes: 100 * 1024 * 1024, };
271
272 let value_log_config = None; Ok(LsmTreeConfig {
276 data_dir,
277 wal_dir,
278 memtable_config,
279 sstable_config,
280 block_cache_config,
281 compaction_config,
282 value_log_config,
283 max_levels: self.config.storage.compaction.num_levels,
284 l0_compaction_threshold: 4,
285 level_size_multiplier: self.config.storage.compaction.level_multiplier,
286 })
287 }
288
289 async fn initialize_network(&mut self) -> ServerResult<()> {
291 info!("Initializing network service");
292
293 let storage = self
294 .storage
295 .as_ref()
296 .ok_or_else(|| ServerError::Config("Storage not initialized".to_string()))?
297 .clone();
298
299 let network = NetworkService::new(
300 storage,
301 self.config.clone(),
302 self.health.clone(),
303 self.metrics.clone(),
304 self.shutdown.clone(),
305 );
306
307 self.network = Some(network);
308 self.health.set_network_healthy(true);
309
310 info!("Network service initialized successfully");
311 Ok(())
312 }
313
314 pub async fn start(&mut self) -> ServerResult<()> {
316 info!("Starting AmateRS server v{}", env!("CARGO_PKG_VERSION"));
317 info!("Bind address: {}", self.config.server.bind_address);
318 info!("Data directory: {}", self.config.server.data_dir.display());
319
320 if let Some(ref mut network) = self.network {
322 network.start().await?;
323 }
324
325 self.health.set_status(HealthStatus::Healthy);
327 self.health.set_network_healthy(true);
328
329 info!("Server started successfully");
330 info!("Press Ctrl+C to shutdown");
331
332 let mut shutdown_rx = self.shutdown.subscribe();
334 shutdown_rx
335 .recv()
336 .await
337 .map_err(|e| ServerError::Network(format!("Shutdown channel error: {}", e)))?;
338
339 info!("Shutdown signal received");
340 Ok(())
341 }
342
343 pub async fn shutdown(&mut self) -> ServerResult<()> {
345 info!("Shutting down server gracefully");
346 self.health.set_status(HealthStatus::ShuttingDown);
347
348 let shutdown_timeout = self.config.shutdown_timeout();
349
350 match tokio::time::timeout(shutdown_timeout, self.shutdown_internal()).await {
352 Ok(result) => result,
353 Err(_) => {
354 error!("Shutdown timeout exceeded");
355 Err(ServerError::ShutdownTimeout)
356 }
357 }
358 }
359
360 async fn shutdown_internal(&mut self) -> ServerResult<()> {
362 info!("Stopping new connections");
364 self.health.set_network_healthy(false);
365
366 if let Some(ref mut network) = self.network {
368 network.stop().await?;
369 }
370
371 let max_wait = Duration::from_secs(5);
373 let start = std::time::Instant::now();
374 while self.metrics.snapshot().active_connections > 0 && start.elapsed() < max_wait {
375 info!(
376 "Waiting for {} active connections to drain",
377 self.metrics.snapshot().active_connections
378 );
379 sleep(Duration::from_millis(100)).await;
380 }
381
382 if let Some(ref storage) = self.storage {
384 info!("Flushing storage");
385 storage
386 .flush()
387 .await
388 .map_err(|e| ServerError::Storage(format!("Failed to flush storage: {}", e)))?;
389 }
390
391 if let Some(ref storage) = self.storage {
393 info!("Closing storage");
394 storage
395 .close()
396 .await
397 .map_err(|e| ServerError::Storage(format!("Failed to close storage: {}", e)))?;
398 }
399
400 self.health.set_storage_healthy(false);
401
402 info!("Server shutdown complete");
403 Ok(())
404 }
405
406 pub fn shutdown_coordinator(&self) -> &ShutdownCoordinator {
408 &self.shutdown
409 }
410
411 pub fn health_checker(&self) -> &HealthChecker {
413 &self.health
414 }
415
416 pub fn metrics_collector(&self) -> &MetricsCollector {
418 &self.metrics
419 }
420
421 pub fn config(&self) -> &ServerConfig {
423 &self.config
424 }
425
426 pub fn is_running(config: &ServerConfig) -> bool {
428 let pid_file = &config.server.pid_file;
429 if !pid_file.exists() {
430 return false;
431 }
432
433 if let Ok(contents) = fs::read_to_string(pid_file) {
435 if let Ok(pid) = contents.trim().parse::<i32>() {
436 #[cfg(unix)]
438 {
439 use std::process::Command;
440 let output = Command::new("kill").arg("-0").arg(pid.to_string()).output();
441 if let Ok(output) = output {
442 return output.status.success();
443 }
444 }
445 #[cfg(not(unix))]
446 {
447 return true;
449 }
450 }
451 }
452
453 false
454 }
455
456 pub fn write_pid_file(config: &ServerConfig) -> ServerResult<()> {
458 let pid = std::process::id();
459 let pid_file = &config.server.pid_file;
460
461 if let Some(parent) = pid_file.parent() {
463 fs::create_dir_all(parent)?;
464 }
465
466 fs::write(pid_file, pid.to_string())?;
467 info!("PID file written: {} (pid: {})", pid_file.display(), pid);
468 Ok(())
469 }
470
471 pub fn remove_pid_file(config: &ServerConfig) -> ServerResult<()> {
473 let pid_file = &config.server.pid_file;
474 if pid_file.exists() {
475 fs::remove_file(pid_file)?;
476 info!("PID file removed: {}", pid_file.display());
477 }
478 Ok(())
479 }
480
481 #[cfg(unix)]
483 pub fn stop_server(config: &ServerConfig, force: bool) -> ServerResult<()> {
484 let pid_file = &config.server.pid_file;
485
486 if !pid_file.exists() {
487 warn!("PID file not found - server may not be running");
488 return Ok(());
489 }
490
491 let contents = fs::read_to_string(pid_file)?;
492 let pid = contents
493 .trim()
494 .parse::<i32>()
495 .map_err(|e| ServerError::Config(format!("Invalid PID in file: {}", e)))?;
496
497 let signal = if force { "SIGKILL" } else { "SIGTERM" };
498 info!("Sending {} to process {}", signal, pid);
499
500 use std::process::Command;
501 let signal_arg = if force { "-9" } else { "-15" };
502
503 let output = Command::new("kill")
504 .arg(signal_arg)
505 .arg(pid.to_string())
506 .output()?;
507
508 if !output.status.success() {
509 let stderr = String::from_utf8_lossy(&output.stderr);
510 return Err(ServerError::Network(format!(
511 "Failed to stop server: {}",
512 stderr
513 )));
514 }
515
516 info!("Stop signal sent successfully");
517 Ok(())
518 }
519
520 #[cfg(not(unix))]
521 pub fn stop_server(_config: &ServerConfig, _force: bool) -> ServerResult<()> {
522 Err(ServerError::Config(
523 "Stop command is not supported on this platform".to_string(),
524 ))
525 }
526}
527
#[cfg(test)]
mod tests {
    use super::*;
    use std::env;
    use std::path::{Path, PathBuf};

    /// Per-test scratch directory under the system temp dir.
    ///
    /// The process id is appended so concurrent test processes do not share a
    /// directory. The directory is removed on drop, so cleanup also happens when an
    /// assertion fails.
    struct ScratchDir(PathBuf);

    impl ScratchDir {
        fn new(name: &str) -> Self {
            let path = env::temp_dir().join(format!("{}_{}", name, std::process::id()));
            // Leftovers from an aborted earlier run could mask creation bugs.
            let _ = fs::remove_dir_all(&path);
            Self(path)
        }

        fn path(&self) -> &Path {
            &self.0
        }
    }

    impl Drop for ScratchDir {
        fn drop(&mut self) {
            // Best effort: failing to clean up must not fail the test.
            let _ = fs::remove_dir_all(&self.0);
        }
    }

    #[tokio::test]
    async fn test_server_creation() {
        let server = Server::new(ServerConfig::default());

        assert_eq!(server.health_checker().status(), HealthStatus::Starting);
    }

    #[tokio::test]
    async fn test_server_initialization() {
        let dir = ScratchDir::new("amaters_test_init");
        let mut config = ServerConfig::default();
        config.server.data_dir = dir.path().to_path_buf();
        config.storage.engine = "memory".to_string();

        let mut server = Server::new(config);
        server
            .initialize()
            .await
            .expect("in-memory server should initialize");

        assert!(server.storage.is_some());
    }

    #[tokio::test]
    async fn test_lsm_initialization() {
        let dir = ScratchDir::new("amaters_test_lsm");
        let mut config = ServerConfig::default();
        config.server.data_dir = dir.path().to_path_buf();
        config.storage.engine = "lsm".to_string();

        let mut server = Server::new(config);
        server
            .initialize()
            .await
            .expect("LSM-backed server should initialize");

        assert!(server.storage.is_some());
    }

    #[tokio::test]
    async fn test_invalid_storage_engine_is_rejected() {
        let dir = ScratchDir::new("amaters_test_bad_engine");
        let mut config = ServerConfig::default();
        config.server.data_dir = dir.path().to_path_buf();
        config.storage.engine = "bogus".to_string();

        let mut server = Server::new(config);
        let err = server
            .initialize()
            .await
            .expect_err("unknown storage engine must be rejected");

        assert!(matches!(err, ServerError::Config(_)));
        assert!(server.storage.is_none());
    }

    #[tokio::test]
    async fn test_data_directory_creation() {
        let dir = ScratchDir::new("amaters_test_dir");
        let mut config = ServerConfig::default();
        config.server.data_dir = dir.path().to_path_buf();

        let server = Server::new(config);
        server
            .ensure_data_directory()
            .expect("Failed to create directory");

        assert!(dir.path().exists());
    }

    #[test]
    fn test_pid_file_roundtrip() {
        let dir = ScratchDir::new("amaters_test_pid");
        let mut config = ServerConfig::default();
        // A nested path also exercises parent-directory creation.
        config.server.pid_file = dir.path().join("run").join("server.pid");

        Server::write_pid_file(&config).expect("PID file should be written");
        let written =
            fs::read_to_string(&config.server.pid_file).expect("PID file should be readable");
        assert_eq!(written, std::process::id().to_string());

        Server::remove_pid_file(&config).expect("PID file should be removed");
        assert!(!config.server.pid_file.exists());
        assert!(!Server::is_running(&config));

        // Removing an already-absent PID file is a no-op.
        Server::remove_pid_file(&config).expect("removing a missing PID file should succeed");
    }

    #[tokio::test]
    async fn test_shutdown_coordinator() {
        let server = Server::new(ServerConfig::default());

        let coordinator = server.shutdown_coordinator();
        assert!(!coordinator.is_shutting_down());

        coordinator.shutdown();
        assert!(coordinator.is_shutting_down());
    }
}