use amaters_sdk_rust::{AmateRSClient, ClientConfig};
use amaters_server::config::{
AuthSettings, AuthorizationSettings, CompactionSettings, LoggingSettings, MetricsSettings,
NetworkSettings, ServerConfig, ServerSettings, StorageSettings, WalSettings,
};
use amaters_server::server::Server;
use amaters_server::shutdown::ShutdownCoordinator;
use std::net::SocketAddr;
use std::path::{Path, PathBuf};
use std::time::Duration;
use tokio::task::JoinHandle;
use tokio::time::sleep;
use uuid::Uuid;
pub fn allocate_e2e_port() -> u32 {
let listener =
std::net::TcpListener::bind("127.0.0.1:0").expect("Failed to bind to ephemeral port");
let port = listener
.local_addr()
.expect("Failed to get local address")
.port() as u32;
drop(listener);
port
}
pub struct E2eTestContext {
server_handle: JoinHandle<()>,
pub client: AmateRSClient,
pub addr: SocketAddr,
pub temp_dir: PathBuf,
shutdown_coordinator: ShutdownCoordinator,
storage_engine: String,
}
impl E2eTestContext {
pub async fn new() -> Result<Self, Box<dyn std::error::Error>> {
Self::with_storage("memory").await
}
pub async fn with_storage(engine: &str) -> Result<Self, Box<dyn std::error::Error>> {
let temp_dir = std::env::temp_dir().join(format!("amaters_e2e_{}", Uuid::new_v4()));
std::fs::create_dir_all(&temp_dir)?;
let port = allocate_e2e_port();
let bind_address = format!("127.0.0.1:{}", port);
let addr: SocketAddr = bind_address.parse()?;
let config = create_e2e_server_config(&temp_dir, port, engine);
let (server_handle, shutdown_coordinator) = start_test_server(config).await?;
let client_addr = format!("http://{}", bind_address);
let client_config = ClientConfig::new(client_addr)
.with_connect_timeout(Duration::from_secs(5))
.with_request_timeout(Duration::from_secs(120));
let mut retry_delay = Duration::from_millis(50);
let max_retries = 10;
let mut client = None;
for attempt in 0..max_retries {
match AmateRSClient::connect_with_config(client_config.clone()).await {
Ok(c) => {
client = Some(c);
break;
}
Err(e) if attempt < max_retries - 1 => {
sleep(retry_delay).await;
retry_delay = retry_delay.saturating_mul(2).min(Duration::from_secs(2));
}
Err(e) => {
return Err(
format!("Failed to connect after {} attempts: {}", max_retries, e).into(),
);
}
}
}
let client = client.ok_or("Failed to establish client connection")?;
Ok(Self {
server_handle,
client,
addr,
temp_dir,
shutdown_coordinator,
storage_engine: engine.to_string(),
})
}
pub async fn create_additional_client(
&self,
) -> Result<AmateRSClient, Box<dyn std::error::Error>> {
let client_addr = format!("http://{}", self.addr);
let client_config = ClientConfig::new(client_addr)
.with_connect_timeout(Duration::from_secs(5))
.with_request_timeout(Duration::from_secs(120));
Ok(AmateRSClient::connect_with_config(client_config).await?)
}
pub async fn restart_server(&mut self) -> Result<(), Box<dyn std::error::Error>> {
self.shutdown_coordinator.shutdown();
let _ = tokio::time::timeout(Duration::from_secs(10), &mut self.server_handle).await;
sleep(Duration::from_millis(200)).await;
let port = self.addr.port() as u32;
let config = create_e2e_server_config(&self.temp_dir, port, &self.storage_engine);
let (server_handle, shutdown_coordinator) = start_test_server(config).await?;
self.server_handle = server_handle;
self.shutdown_coordinator = shutdown_coordinator;
sleep(Duration::from_millis(300)).await;
let client_addr = format!("http://{}", self.addr);
let client_config = ClientConfig::new(client_addr)
.with_connect_timeout(Duration::from_secs(5))
.with_request_timeout(Duration::from_secs(120));
let mut retry_delay = Duration::from_millis(100);
for attempt in 0..10 {
match AmateRSClient::connect_with_config(client_config.clone()).await {
Ok(c) => {
self.client = c;
return Ok(());
}
Err(_) if attempt < 9 => {
sleep(retry_delay).await;
retry_delay = retry_delay.saturating_mul(2).min(Duration::from_secs(2));
}
Err(e) => {
return Err(format!("Failed to reconnect after restart: {}", e).into());
}
}
}
Ok(())
}
pub async fn cleanup(self) {
self.shutdown_coordinator.shutdown();
let _ = tokio::time::timeout(Duration::from_secs(10), self.server_handle).await;
sleep(Duration::from_millis(100)).await;
if self.temp_dir.exists() {
std::fs::remove_dir_all(&self.temp_dir).ok();
}
}
}
pub async fn start_test_server(
config: ServerConfig,
) -> Result<(JoinHandle<()>, ShutdownCoordinator), Box<dyn std::error::Error>> {
let mut server = Server::new(config);
let shutdown_coordinator = server.shutdown_coordinator().clone();
server
.initialize()
.await
.map_err(|e| format!("Failed to initialize server: {}", e))?;
let handle = tokio::spawn(async move {
if let Err(e) = server.start().await {
eprintln!("Server error: {}", e);
}
if let Err(e) = server.shutdown().await {
eprintln!("Server shutdown error: {}", e);
}
});
Ok((handle, shutdown_coordinator))
}
pub fn create_e2e_server_config(temp_dir: &Path, port: u32, engine: &str) -> ServerConfig {
ServerConfig {
server: ServerSettings {
bind_address: format!("127.0.0.1:{}", port),
data_dir: temp_dir.to_path_buf(),
pid_file: temp_dir.join("test.pid"),
max_connections: 100,
shutdown_timeout_secs: 5,
},
storage: StorageSettings {
engine: engine.to_string(),
wal: WalSettings {
enabled: true,
dir: PathBuf::from("wal"),
segment_size_mb: 64,
sync_mode: "interval".to_string(),
},
memtable_size_mb: 16,
block_cache_size_mb: 32,
compaction: CompactionSettings {
strategy: "leveled".to_string(),
num_levels: 7,
level_multiplier: 10,
max_concurrent: 2,
},
},
network: NetworkSettings {
tls_enabled: false,
tls_cert: None,
tls_key: None,
tls_ca: None,
require_client_cert: false,
connection_timeout_secs: 5,
keepalive_interval_secs: 10,
},
cluster: None,
logging: LoggingSettings {
level: "info".to_string(),
format: "compact".to_string(),
file_enabled: false,
file_path: None,
rotation: Default::default(),
},
metrics: MetricsSettings {
enabled: false,
bind_address: format!("127.0.0.1:{}", port + 1000),
export_interval_secs: 60,
},
auth: AuthSettings {
enabled: false,
methods: vec![],
mtls: Default::default(),
jwt: Default::default(),
api_key: Default::default(),
reject_unauthenticated: false,
},
authz: AuthorizationSettings {
enabled: false,
default_role: "admin".to_string(),
roles_file: None,
policies_file: None,
collection_permissions: false,
default_mode: "allow-by-default".to_string(),
audit_enabled: false,
audit_log_path: None,
},
resource_limits: Default::default(),
circuit_cache: Default::default(),
timeouts: Default::default(),
}
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn test_e2e_context_creation() {
let ctx = E2eTestContext::new().await;
assert!(ctx.is_ok());
if let Ok(ctx) = ctx {
ctx.cleanup().await;
}
}
#[test]
fn test_port_allocation() {
let port1 = allocate_e2e_port();
let port2 = allocate_e2e_port();
assert_ne!(port1, port2);
assert!(port1 > 1024);
assert!(port2 > 1024);
}
}