#![allow(dead_code)]
use std::time::Duration;
use testcontainers::runners::AsyncRunner;
use testcontainers::{ContainerAsync, ImageExt};
/// A single Redis test container plus the host port it can be reached on.
///
/// Built by `RedisContainer::start`, which launches the container and looks up
/// the host port mapped to the container's port 6379.
pub struct RedisContainer {
/// Handle to the started container; owned here so it lives as long as this value.
/// NOTE(review): testcontainers is expected to tear the container down when this
/// handle is dropped — confirm against the crate version in use.
container: ContainerAsync<testcontainers_modules::redis::Redis>,
/// Host-side IPv4 port mapped to the container's port 6379.
port: u16,
}
impl RedisContainer {
    /// Starts a Redis container (image tag `7-alpine`) and records the host port
    /// mapped to the container's port 6379.
    ///
    /// This does not wait for Redis to accept connections; call
    /// [`RedisContainer::wait_ready`] for that.
    ///
    /// # Errors
    ///
    /// Returns a message string if the container cannot be started or the mapped
    /// port cannot be looked up.
    pub async fn start() -> Result<Self, String> {
        let redis = testcontainers_modules::redis::Redis::default()
            .with_tag("7-alpine")
            .start()
            .await
            .map_err(|e| format!("启动 Redis 容器失败: {}", e))?;
        let port = redis
            .get_host_port_ipv4(6379)
            .await
            .map_err(|e| format!("获取端口失败: {}", e))?;
        Ok(Self { container: redis, port })
    }

    /// Returns the connection URL, `redis://127.0.0.1:<mapped port>`.
    pub fn url(&self) -> String {
        format!("redis://127.0.0.1:{}", self.port)
    }

    /// Returns the host port mapped to the container's port 6379.
    pub fn port(&self) -> u16 {
        self.port
    }

    /// Polls the container until a Redis connection can be established.
    ///
    /// A connection is attempted every 100 ms. The whole polling loop, including
    /// any in-flight connection attempt, is bounded by a 30 second deadline, so a
    /// connect call that hangs cannot keep this function waiting indefinitely.
    ///
    /// # Errors
    ///
    /// Returns a message string if the client cannot be created from the URL, or
    /// if no connection succeeded before the deadline. In the latter case the
    /// most recent connection error (if any) is appended to the message.
    pub async fn wait_ready(&self) -> Result<(), String> {
        const READY_TIMEOUT: Duration = Duration::from_secs(30);
        const POLL_INTERVAL: Duration = Duration::from_millis(100);

        let url = self.url();
        let client = redis::Client::open(url.as_str())
            .map_err(|e| format!("创建客户端失败: {}", e))?;

        // Remember why the latest attempt failed so a timeout is diagnosable.
        let mut last_error: Option<String> = None;

        // The deadline wraps the entire loop rather than being checked between
        // attempts, so it also cuts off a connection attempt that never returns.
        let outcome = tokio::time::timeout(READY_TIMEOUT, async {
            loop {
                match client.get_multiplexed_async_connection().await {
                    Ok(_) => return,
                    Err(e) => {
                        last_error = Some(e.to_string());
                        tokio::time::sleep(POLL_INTERVAL).await;
                    }
                }
            }
        })
        .await;

        match (outcome, last_error) {
            (Ok(()), _) => Ok(()),
            (Err(_), Some(cause)) => Err(format!("等待 Redis 就绪超时: {}", cause)),
            (Err(_), None) => Err("等待 Redis 就绪超时".to_string()),
        }
    }
}
/// Owns a group of Redis test containers and the host ports they are mapped to.
///
/// NOTE(review): nothing visible in this file enables cluster mode or joins the
/// nodes together; as written these look like independent Redis instances —
/// confirm whether cluster formation happens elsewhere.
pub struct RedisClusterManager {
/// Handles to the started containers, in start order.
nodes: Vec<ContainerAsync<testcontainers_modules::redis::Redis>>,
/// Host-side IPv4 ports mapped to each node's port 6379, index-aligned with `nodes`.
ports: Vec<u16>,
}
impl RedisClusterManager {
    /// Starts six Redis containers (image tag `7-alpine`) one after another and
    /// records the host port mapped to each node's port 6379.
    ///
    /// This function only starts the containers; it issues no cluster
    /// configuration commands itself.
    ///
    /// # Errors
    ///
    /// Returns a message string as soon as any node fails to start or its mapped
    /// port cannot be looked up. Nodes started before the failure are dropped.
    pub async fn start_cluster() -> Result<Self, String> {
        const NODE_COUNT: usize = 6;

        let mut manager = Self {
            nodes: Vec::with_capacity(NODE_COUNT),
            ports: Vec::with_capacity(NODE_COUNT),
        };

        for index in 0..NODE_COUNT {
            let node = testcontainers_modules::redis::Redis::default()
                .with_tag("7-alpine")
                .start()
                .await
                .map_err(|e| format!("启动 Redis Cluster 节点 {} 失败: {}", index, e))?;
            let mapped_port = node
                .get_host_port_ipv4(6379)
                .await
                .map_err(|e| format!("获取端口失败: {}", e))?;

            // Keep the two vectors index-aligned: push both or neither.
            manager.nodes.push(node);
            manager.ports.push(mapped_port);
        }

        Ok(manager)
    }

    /// Returns one `redis://127.0.0.1:<port>` URL per node, in start order.
    pub fn urls(&self) -> Vec<String> {
        let mut urls = Vec::with_capacity(self.ports.len());
        for port in &self.ports {
            urls.push(format!("redis://127.0.0.1:{}", port));
        }
        urls
    }

    /// Returns the mapped host ports, in start order.
    pub fn ports(&self) -> &[u16] {
        self.ports.as_slice()
    }
}
/// Test fixture that starts a Redis container on first request and keeps it.
pub struct TestEnvironment {
/// The Redis container, or `None` until `start_redis` has succeeded once.
redis: Option<RedisContainer>,
}
impl TestEnvironment {
    /// Creates an environment with no Redis container started yet.
    pub fn new() -> Self {
        Self { redis: None }
    }

    /// Returns the environment's Redis container, starting it on first use.
    ///
    /// On the first call a container is started and polled until it accepts
    /// connections; later calls return the same container without starting
    /// another one.
    ///
    /// # Errors
    ///
    /// Propagates the message string from `RedisContainer::start` or
    /// `RedisContainer::wait_ready`. On failure no container is stored, so a
    /// later call tries again.
    pub async fn start_redis(&mut self) -> Result<&RedisContainer, String> {
        let container = match self.redis.take() {
            // Already running: hand the same container straight back.
            Some(existing) => existing,
            // The slot was empty, so an early `?` return leaves it empty.
            None => {
                let fresh = RedisContainer::start().await?;
                fresh.wait_ready().await?;
                fresh
            }
        };
        let stored: &RedisContainer = self.redis.insert(container);
        Ok(stored)
    }

    /// Returns the Redis URL, or `None` if no container has been started.
    pub fn redis_url(&self) -> Option<String> {
        let container = self.redis.as_ref()?;
        Some(container.url())
    }
}
impl Default for TestEnvironment {
fn default() -> Self {
Self::new()
}
}
/// Starts a Redis container, waits until it accepts connections, and returns it
/// together with its connection URL.
///
/// The returned container must be kept by the caller; it is the owning handle
/// for the started container.
///
/// # Errors
///
/// Propagates the message string from `RedisContainer::start` or
/// `RedisContainer::wait_ready`.
pub async fn start_redis_container() -> Result<(RedisContainer, String), String> {
    let started = RedisContainer::start().await?;
    started.wait_ready().await.map(|()| {
        let endpoint = started.url();
        (started, endpoint)
    })
}
/// Reports whether a Redis connection to `url` can be established within two seconds.
///
/// Returns `false` if `url` cannot be turned into a client, if the connection
/// attempt fails, or if it does not complete before the two-second timeout.
pub async fn is_redis_available(url: &str) -> bool {
    let Ok(client) = redis::Client::open(url) else {
        return false;
    };

    let connect_timeout = Duration::from_secs(2);
    let attempt =
        tokio::time::timeout(connect_timeout, client.get_multiplexed_async_connection()).await;

    // Outer `Err` = timed out; inner `Err` = connection failed.
    attempt.map_or(false, |connection| connection.is_ok())
}