1use crate::config::{ConfigResult, ServerConfig};
11use crate::health::{HealthChecker, HealthStatus};
12use crate::metrics::MetricsCollector;
13use crate::service::NetworkService;
14use crate::shutdown::ShutdownCoordinator;
15#[cfg(feature = "cluster")]
16use amaters_cluster::{NodeState, RaftConfig, RaftNode};
17use amaters_core::error::Result as CoreResult;
18use amaters_core::storage::{
19 BlockCacheConfig, CompactionConfig, LsmTreeConfig, LsmTreeStorage, MemoryStorage,
20 MemtableConfig, SSTableConfig,
21};
22use amaters_core::traits::StorageEngine;
23use amaters_core::types::{CipherBlob, Key};
24use async_trait::async_trait;
25use std::fs;
26use std::path::Path;
27use std::sync::Arc;
28use std::time::Duration;
29use thiserror::Error;
30use tokio::time::sleep;
31use tracing::{error, info, warn};
32
33#[derive(Clone)]
35pub enum Storage {
36 Memory(MemoryStorage),
37 Lsm(LsmTreeStorage),
38}
39
40#[async_trait]
41impl StorageEngine for Storage {
42 async fn put(&self, key: &Key, value: &CipherBlob) -> CoreResult<()> {
43 match self {
44 Storage::Memory(s) => s.put(key, value).await,
45 Storage::Lsm(s) => s.put(key, value).await,
46 }
47 }
48
49 async fn get(&self, key: &Key) -> CoreResult<Option<CipherBlob>> {
50 match self {
51 Storage::Memory(s) => s.get(key).await,
52 Storage::Lsm(s) => s.get(key).await,
53 }
54 }
55
56 async fn atomic_update<F>(&self, key: &Key, f: F) -> CoreResult<()>
57 where
58 F: Fn(&CipherBlob) -> CoreResult<CipherBlob> + Send + Sync,
59 {
60 match self {
61 Storage::Memory(s) => s.atomic_update(key, f).await,
62 Storage::Lsm(s) => s.atomic_update(key, f).await,
63 }
64 }
65
66 async fn delete(&self, key: &Key) -> CoreResult<()> {
67 match self {
68 Storage::Memory(s) => s.delete(key).await,
69 Storage::Lsm(s) => s.delete(key).await,
70 }
71 }
72
73 async fn range(&self, start: &Key, end: &Key) -> CoreResult<Vec<(Key, CipherBlob)>> {
74 match self {
75 Storage::Memory(s) => s.range(start, end).await,
76 Storage::Lsm(s) => s.range(start, end).await,
77 }
78 }
79
80 async fn keys(&self) -> CoreResult<Vec<Key>> {
81 match self {
82 Storage::Memory(s) => s.keys().await,
83 Storage::Lsm(s) => s.keys().await,
84 }
85 }
86
87 async fn flush(&self) -> CoreResult<()> {
88 match self {
89 Storage::Memory(s) => s.flush().await,
90 Storage::Lsm(s) => s.flush().await,
91 }
92 }
93
94 async fn close(&self) -> CoreResult<()> {
95 match self {
96 Storage::Memory(s) => s.close().await,
97 Storage::Lsm(s) => s.close().await,
98 }
99 }
100}
101
102#[derive(Error, Debug)]
104pub enum ServerError {
105 #[error("Configuration error: {0}")]
106 Config(String),
107
108 #[error("Configuration validation error: {0}")]
109 ConfigValidation(String),
110
111 #[error("Storage initialization failed: {0}")]
112 Storage(String),
113
114 #[error("Network initialization failed: {0}")]
115 Network(String),
116
117 #[error("Cluster initialization failed: {0}")]
118 Cluster(String),
119
120 #[error("TLS setup failed: {0}")]
121 TlsSetup(String),
122
123 #[error("Server already running")]
124 AlreadyRunning,
125
126 #[error("Failed to create directory: {0}")]
127 DirectoryCreation(#[from] std::io::Error),
128
129 #[error("Shutdown timeout")]
130 ShutdownTimeout,
131
132 #[error("Core error: {0}")]
133 Core(#[from] amaters_core::error::AmateRSError),
134}
135
136pub type ServerResult<T> = Result<T, ServerError>;
137
138impl crate::retry::ErrorClassification for ServerError {
146 fn is_transient(&self) -> bool {
147 match self {
148 ServerError::DirectoryCreation(io_err) => {
149 matches!(
151 io_err.kind(),
152 std::io::ErrorKind::TimedOut
153 | std::io::ErrorKind::Interrupted
154 | std::io::ErrorKind::WouldBlock
155 | std::io::ErrorKind::ConnectionReset
156 | std::io::ErrorKind::ConnectionAborted
157 )
158 }
159 ServerError::Config(_)
162 | ServerError::ConfigValidation(_)
163 | ServerError::Storage(_)
164 | ServerError::Network(_)
165 | ServerError::Cluster(_)
166 | ServerError::TlsSetup(_)
167 | ServerError::AlreadyRunning
168 | ServerError::ShutdownTimeout
169 | ServerError::Core(_) => false,
170 }
171 }
172}
173
174#[cfg(feature = "cluster")]
176#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
177pub struct ClusterStatus {
178 pub node_id: u64,
180 pub state: String,
182 pub current_term: u64,
184 pub leader_id: Option<u64>,
186 pub is_leader: bool,
188 pub commit_index: u64,
190 pub last_log_index: u64,
192}
193
194pub struct Server {
196 config: Arc<ServerConfig>,
198 storage: Option<Arc<Storage>>,
200 network: Option<NetworkService>,
202 #[cfg(feature = "cluster")]
204 cluster_node: Option<Arc<RaftNode>>,
205 shutdown: ShutdownCoordinator,
207 health: HealthChecker,
209 metrics: MetricsCollector,
211}
212
213impl Server {
214 pub fn new(config: ServerConfig) -> Self {
216 Self {
217 config: Arc::new(config),
218 storage: None,
219 network: None,
220 #[cfg(feature = "cluster")]
221 cluster_node: None,
222 shutdown: ShutdownCoordinator::new(),
223 health: HealthChecker::new(),
224 metrics: MetricsCollector::new(),
225 }
226 }
227
228 pub async fn initialize(&mut self) -> ServerResult<()> {
230 info!("Initializing server components");
231
232 self.ensure_data_directory()?;
234
235 self.initialize_storage().await?;
237
238 self.initialize_network().await?;
240
241 #[cfg(feature = "cluster")]
243 self.initialize_cluster()?;
244
245 self.health.set_status(HealthStatus::Starting);
247
248 info!("Server components initialized successfully");
249 Ok(())
250 }
251
252 fn ensure_data_directory(&self) -> ServerResult<()> {
254 let data_dir = &self.config.server.data_dir;
255 if !data_dir.exists() {
256 info!("Creating data directory: {}", data_dir.display());
257 fs::create_dir_all(data_dir)?;
258 }
259 Ok(())
260 }
261
262 async fn initialize_storage(&mut self) -> ServerResult<()> {
264 info!(
265 "Initializing storage engine: {}",
266 self.config.storage.engine
267 );
268
269 let storage = match self.config.storage.engine.as_str() {
270 "memory" => {
271 info!("Using in-memory storage engine");
272 Storage::Memory(MemoryStorage::new())
273 }
274 "lsm" => {
275 info!("Using LSM-Tree storage engine");
276 let lsm_config = self.build_lsm_config()?;
277 let lsm_storage = LsmTreeStorage::with_config(lsm_config).map_err(|e| {
278 ServerError::Storage(format!("Failed to create LSM storage: {}", e))
279 })?;
280 Storage::Lsm(lsm_storage)
281 }
282 other => {
283 return Err(ServerError::Config(format!(
284 "Invalid storage engine: {}. Supported: memory, lsm",
285 other
286 )));
287 }
288 };
289
290 self.storage = Some(Arc::new(storage));
291 self.health.set_storage_healthy(true);
292
293 info!("Storage engine initialized successfully");
294 Ok(())
295 }
296
297 fn build_lsm_config(&self) -> ServerResult<LsmTreeConfig> {
299 let data_dir = self.config.server.data_dir.join("lsm");
300 let wal_dir = self
301 .config
302 .server
303 .data_dir
304 .join(self.config.storage.wal.dir.clone());
305
306 std::fs::create_dir_all(&data_dir).map_err(|e| {
308 ServerError::Storage(format!("Failed to create LSM data directory: {}", e))
309 })?;
310 std::fs::create_dir_all(&wal_dir)
311 .map_err(|e| ServerError::Storage(format!("Failed to create WAL directory: {}", e)))?;
312
313 let memtable_config = MemtableConfig {
314 max_size_bytes: self.config.storage.memtable_size_mb * 1024 * 1024,
315 enable_wal: self.config.storage.wal.enabled,
316 };
317
318 let sstable_config = SSTableConfig {
319 block_size: 4096,
320 compression_type: amaters_core::storage::CompressionType::Lz4,
321 };
322
323 let block_cache_config = BlockCacheConfig {
324 max_size_bytes: self.config.storage.block_cache_size_mb * 1024 * 1024,
325 enable_stats: true,
326 };
327
328 let compaction_config = CompactionConfig {
329 strategy: match self.config.storage.compaction.strategy.as_str() {
330 "tiered" => amaters_core::storage::CompactionStrategy::SizeTiered,
331 _ => amaters_core::storage::CompactionStrategy::LevelBased,
332 },
333 l0_threshold: 4,
334 level_multiplier: self.config.storage.compaction.level_multiplier,
335 base_level_size: 10 * 1024 * 1024, max_compaction_bytes: 100 * 1024 * 1024, ..Default::default()
338 };
339
340 let value_log_config = None; Ok(LsmTreeConfig {
344 data_dir,
345 wal_dir,
346 memtable_config,
347 sstable_config,
348 block_cache_config,
349 compaction_config,
350 value_log_config,
351 max_levels: self.config.storage.compaction.num_levels,
352 l0_compaction_threshold: 4,
353 level_size_multiplier: self.config.storage.compaction.level_multiplier,
354 })
355 }
356
357 async fn initialize_network(&mut self) -> ServerResult<()> {
359 info!("Initializing network service");
360
361 let storage = self
362 .storage
363 .as_ref()
364 .ok_or_else(|| ServerError::Config("Storage not initialized".to_string()))?
365 .clone();
366
367 let network = NetworkService::new(
368 storage,
369 self.config.clone(),
370 self.health.clone(),
371 self.metrics.clone(),
372 self.shutdown.clone(),
373 );
374
375 self.network = Some(network);
376 self.health.set_network_healthy(true);
377
378 info!("Network service initialized successfully");
379 Ok(())
380 }
381
382 #[cfg(feature = "cluster")]
384 fn initialize_cluster(&mut self) -> ServerResult<()> {
385 let cluster_settings = match self.config.cluster.as_ref() {
386 Some(settings) if settings.enabled => settings,
387 _ => {
388 info!("Cluster mode disabled, running as standalone node");
389 return Ok(());
390 }
391 };
392
393 info!(
394 "Initializing cluster node (node_id: {}, peers: {:?})",
395 cluster_settings.node_id, cluster_settings.peers
396 );
397
398 let mut peer_ids: Vec<u64> = Vec::new();
401 for peer_str in &cluster_settings.peers {
402 let parts: Vec<&str> = peer_str.splitn(2, ':').collect();
403 if parts.is_empty() {
404 return Err(ServerError::Cluster(format!(
405 "Invalid peer format '{}', expected 'node_id:address'",
406 peer_str
407 )));
408 }
409 let peer_id: u64 = parts[0].parse().map_err(|e| {
410 ServerError::Cluster(format!("Invalid peer node_id in '{}': {}", peer_str, e))
411 })?;
412 peer_ids.push(peer_id);
413 }
414
415 if !peer_ids.contains(&cluster_settings.node_id) {
417 peer_ids.push(cluster_settings.node_id);
418 }
419
420 let mut raft_config = RaftConfig::new(cluster_settings.node_id, peer_ids);
422 raft_config.election_timeout_range = (
423 cluster_settings
424 .election_timeout_ms
425 .saturating_sub(cluster_settings.election_timeout_ms / 3),
426 cluster_settings
427 .election_timeout_ms
428 .saturating_add(cluster_settings.election_timeout_ms / 3),
429 );
430 raft_config.heartbeat_interval = cluster_settings.heartbeat_interval_ms;
431
432 let node = RaftNode::new(raft_config)
433 .map_err(|e| ServerError::Cluster(format!("Failed to create RaftNode: {}", e)))?;
434
435 self.cluster_node = Some(Arc::new(node));
436 self.health.set_cluster_enabled(true);
437 self.health.set_cluster_healthy(true);
438
439 info!(
440 "Cluster node initialized (node_id: {}, state: {})",
441 cluster_settings.node_id, "Follower"
442 );
443
444 Ok(())
445 }
446
447 #[cfg(feature = "cluster")]
449 pub fn cluster_node(&self) -> Option<&Arc<RaftNode>> {
450 self.cluster_node.as_ref()
451 }
452
453 #[cfg(feature = "cluster")]
455 pub fn cluster_status(&self) -> Option<ClusterStatus> {
456 self.cluster_node.as_ref().map(|node| ClusterStatus {
457 node_id: node.node_id(),
458 state: node.state().as_str().to_string(),
459 current_term: node.current_term(),
460 leader_id: node.leader_id(),
461 is_leader: node.is_leader(),
462 commit_index: node.commit_index(),
463 last_log_index: node.last_log_index(),
464 })
465 }
466
467 pub async fn start(&mut self) -> ServerResult<()> {
469 info!("Starting AmateRS server v{}", env!("CARGO_PKG_VERSION"));
470 info!("Bind address: {}", self.config.server.bind_address);
471 info!("Data directory: {}", self.config.server.data_dir.display());
472
473 #[cfg(feature = "cluster")]
475 if let Some(ref node) = self.cluster_node {
476 info!(
477 "Cluster mode: enabled (node_id: {}, state: {}, term: {})",
478 node.node_id(),
479 node.state().as_str(),
480 node.current_term()
481 );
482 } else {
483 info!("Cluster mode: disabled (standalone)");
484 }
485
486 if let Some(ref mut network) = self.network {
488 network.start().await?;
489 }
490
491 self.health.set_status(HealthStatus::Healthy);
493 self.health.set_network_healthy(true);
494
495 info!("Server started successfully");
496 info!("Press Ctrl+C to shutdown");
497
498 let mut shutdown_rx = self.shutdown.subscribe();
500 shutdown_rx
501 .recv()
502 .await
503 .map_err(|e| ServerError::Network(format!("Shutdown channel error: {}", e)))?;
504
505 info!("Shutdown signal received");
506 Ok(())
507 }
508
509 pub async fn shutdown(&mut self) -> ServerResult<()> {
511 info!("Shutting down server gracefully");
512 self.health.set_status(HealthStatus::ShuttingDown);
513
514 let shutdown_timeout = self.config.shutdown_timeout();
515
516 match tokio::time::timeout(shutdown_timeout, self.shutdown_internal()).await {
518 Ok(result) => result,
519 Err(_) => {
520 error!("Shutdown timeout exceeded");
521 Err(ServerError::ShutdownTimeout)
522 }
523 }
524 }
525
526 async fn shutdown_internal(&mut self) -> ServerResult<()> {
528 info!("Stopping new connections");
530 self.health.set_network_healthy(false);
531
532 if let Some(ref mut network) = self.network {
534 network.stop().await?;
535 }
536
537 let max_wait = Duration::from_secs(5);
539 let start = std::time::Instant::now();
540 while self.metrics.snapshot().active_connections > 0 && start.elapsed() < max_wait {
541 info!(
542 "Waiting for {} active connections to drain",
543 self.metrics.snapshot().active_connections
544 );
545 sleep(Duration::from_millis(100)).await;
546 }
547
548 if let Some(ref storage) = self.storage {
550 info!("Flushing storage");
551 storage
552 .flush()
553 .await
554 .map_err(|e| ServerError::Storage(format!("Failed to flush storage: {}", e)))?;
555 }
556
557 if let Some(ref storage) = self.storage {
559 info!("Closing storage");
560 storage
561 .close()
562 .await
563 .map_err(|e| ServerError::Storage(format!("Failed to close storage: {}", e)))?;
564 }
565
566 self.health.set_storage_healthy(false);
567
568 info!("Server shutdown complete");
569 Ok(())
570 }
571
572 pub fn shutdown_coordinator(&self) -> &ShutdownCoordinator {
574 &self.shutdown
575 }
576
577 pub fn health_checker(&self) -> &HealthChecker {
579 &self.health
580 }
581
582 pub fn metrics_collector(&self) -> &MetricsCollector {
584 &self.metrics
585 }
586
587 pub fn config(&self) -> &ServerConfig {
589 &self.config
590 }
591
592 pub fn is_running(config: &ServerConfig) -> bool {
594 let pid_file = &config.server.pid_file;
595 if !pid_file.exists() {
596 return false;
597 }
598
599 if let Ok(contents) = fs::read_to_string(pid_file) {
601 if let Ok(pid) = contents.trim().parse::<i32>() {
602 #[cfg(unix)]
604 {
605 use std::process::Command;
606 let output = Command::new("kill").arg("-0").arg(pid.to_string()).output();
607 if let Ok(output) = output {
608 return output.status.success();
609 }
610 }
611 #[cfg(not(unix))]
612 {
613 return true;
615 }
616 }
617 }
618
619 false
620 }
621
622 pub fn write_pid_file(config: &ServerConfig) -> ServerResult<()> {
624 let pid = std::process::id();
625 let pid_file = &config.server.pid_file;
626
627 if let Some(parent) = pid_file.parent() {
629 fs::create_dir_all(parent)?;
630 }
631
632 fs::write(pid_file, pid.to_string())?;
633 info!("PID file written: {} (pid: {})", pid_file.display(), pid);
634 Ok(())
635 }
636
637 pub fn remove_pid_file(config: &ServerConfig) -> ServerResult<()> {
639 let pid_file = &config.server.pid_file;
640 if pid_file.exists() {
641 fs::remove_file(pid_file)?;
642 info!("PID file removed: {}", pid_file.display());
643 }
644 Ok(())
645 }
646
647 #[cfg(unix)]
649 pub fn stop_server(config: &ServerConfig, force: bool) -> ServerResult<()> {
650 let pid_file = &config.server.pid_file;
651
652 if !pid_file.exists() {
653 warn!("PID file not found - server may not be running");
654 return Ok(());
655 }
656
657 let contents = fs::read_to_string(pid_file)?;
658 let pid = contents
659 .trim()
660 .parse::<i32>()
661 .map_err(|e| ServerError::Config(format!("Invalid PID in file: {}", e)))?;
662
663 let signal = if force { "SIGKILL" } else { "SIGTERM" };
664 info!("Sending {} to process {}", signal, pid);
665
666 use std::process::Command;
667 let signal_arg = if force { "-9" } else { "-15" };
668
669 let output = Command::new("kill")
670 .arg(signal_arg)
671 .arg(pid.to_string())
672 .output()?;
673
674 if !output.status.success() {
675 let stderr = String::from_utf8_lossy(&output.stderr);
676 return Err(ServerError::Network(format!(
677 "Failed to stop server: {}",
678 stderr
679 )));
680 }
681
682 info!("Stop signal sent successfully");
683 Ok(())
684 }
685
686 #[cfg(not(unix))]
687 pub fn stop_server(_config: &ServerConfig, _force: bool) -> ServerResult<()> {
688 Err(ServerError::Config(
689 "Stop command is not supported on this platform".to_string(),
690 ))
691 }
692}
693
694#[cfg(test)]
695mod tests {
696 use super::*;
697 use std::env;
698
699 #[tokio::test]
700 async fn test_server_creation() {
701 let config = ServerConfig::default();
702 let server = Server::new(config);
703
704 assert_eq!(server.health_checker().status(), HealthStatus::Starting);
705 }
706
707 #[tokio::test]
708 async fn test_server_initialization() {
709 let mut config = ServerConfig::default();
710 config.server.data_dir = env::temp_dir().join("amaters_test_init");
711 config.storage.engine = "memory".to_string();
712
713 let mut server = Server::new(config);
714 let result = server.initialize().await;
715
716 assert!(result.is_ok());
717 assert!(server.storage.is_some());
718
719 if server.config.server.data_dir.exists() {
721 fs::remove_dir_all(&server.config.server.data_dir).ok();
722 }
723 }
724
725 #[tokio::test]
726 async fn test_lsm_initialization() {
727 let mut config = ServerConfig::default();
728 config.server.data_dir = env::temp_dir().join("amaters_test_lsm");
729 config.storage.engine = "lsm".to_string();
730
731 let mut server = Server::new(config);
732 let result = server.initialize().await;
733
734 assert!(result.is_ok());
735 assert!(server.storage.is_some());
736
737 if server.config.server.data_dir.exists() {
739 fs::remove_dir_all(&server.config.server.data_dir).ok();
740 }
741 }
742
743 #[tokio::test]
744 async fn test_data_directory_creation() {
745 let mut config = ServerConfig::default();
746 config.server.data_dir = env::temp_dir().join("amaters_test_dir");
747
748 if config.server.data_dir.exists() {
750 fs::remove_dir_all(&config.server.data_dir).ok();
751 }
752
753 let mut server = Server::new(config.clone());
754 server
755 .ensure_data_directory()
756 .expect("Failed to create directory");
757
758 assert!(config.server.data_dir.exists());
759
760 fs::remove_dir_all(&config.server.data_dir).ok();
762 }
763
764 #[tokio::test]
765 async fn test_shutdown_coordinator() {
766 let config = ServerConfig::default();
767 let server = Server::new(config);
768
769 let coordinator = server.shutdown_coordinator();
770 assert!(!coordinator.is_shutting_down());
771
772 coordinator.shutdown();
773 assert!(coordinator.is_shutting_down());
774 }
775
776 #[tokio::test]
778 async fn test_server_creation_without_cluster() {
779 let config = ServerConfig::default();
780 assert!(config.cluster.is_none());
781
782 let server = Server::new(config);
783 #[cfg(feature = "cluster")]
784 assert!(server.cluster_node.is_none());
785 assert_eq!(server.health_checker().status(), HealthStatus::Starting);
786 }
787
788 #[cfg(feature = "cluster")]
790 #[tokio::test]
791 async fn test_server_creation_with_cluster_config() {
792 use crate::config::ClusterSettings;
793
794 let mut config = ServerConfig::default();
795 config.server.data_dir = env::temp_dir().join("amaters_test_cluster");
796 config.storage.engine = "memory".to_string();
797 config.cluster = Some(ClusterSettings {
798 enabled: true,
799 node_id: 1,
800 peers: vec![
801 "1:127.0.0.1:7879".to_string(),
802 "2:127.0.0.1:7880".to_string(),
803 "3:127.0.0.1:7881".to_string(),
804 ],
805 heartbeat_interval_ms: 50,
806 election_timeout_ms: 300,
807 });
808
809 let mut server = Server::new(config);
810 let result = server.initialize().await;
811 assert!(result.is_ok());
812
813 assert!(server.cluster_node.is_some());
815
816 let status = server.cluster_status();
818 assert!(status.is_some());
819 let status = status.expect("cluster status should exist");
820 assert_eq!(status.node_id, 1);
821 assert_eq!(status.state, "Follower");
822 assert_eq!(status.current_term, 0);
823 assert!(!status.is_leader);
824
825 let health = server.health_checker().get_health();
827 let cluster_component = health
828 .components
829 .iter()
830 .find(|c| c.name == "cluster")
831 .expect("cluster component should exist");
832 assert_eq!(cluster_component.status, HealthStatus::Healthy);
833
834 if server.config.server.data_dir.exists() {
836 fs::remove_dir_all(&server.config.server.data_dir).ok();
837 }
838 }
839
840 #[test]
842 fn test_cluster_config_defaults() {
843 let config = ServerConfig::default();
844 assert!(config.cluster.is_none());
846 }
847
848 #[test]
850 fn test_cluster_config_validation() {
851 use crate::config::ClusterSettings;
852
853 let config = ServerConfig {
854 cluster: Some(ClusterSettings {
855 enabled: true,
856 node_id: 1,
857 peers: Vec::new(), heartbeat_interval_ms: 50,
859 election_timeout_ms: 300,
860 }),
861 ..Default::default()
862 };
863
864 let result = config.validate();
865 assert!(result.is_err());
866 }
867
868 #[cfg(feature = "cluster")]
870 #[tokio::test]
871 async fn test_server_cluster_disabled_explicitly() {
872 use crate::config::ClusterSettings;
873
874 let mut config = ServerConfig::default();
875 config.server.data_dir = env::temp_dir().join("amaters_test_cluster_disabled");
876 config.storage.engine = "memory".to_string();
877 config.cluster = Some(ClusterSettings {
878 enabled: false,
879 node_id: 1,
880 peers: Vec::new(),
881 heartbeat_interval_ms: 50,
882 election_timeout_ms: 300,
883 });
884
885 let mut server = Server::new(config);
886 let result = server.initialize().await;
887 assert!(result.is_ok());
888
889 assert!(server.cluster_node.is_none());
891 assert!(server.cluster_status().is_none());
892
893 if server.config.server.data_dir.exists() {
895 fs::remove_dir_all(&server.config.server.data_dir).ok();
896 }
897 }
898}