#![cfg(feature = "tokio")]
use std::fmt;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::time::{Duration, Instant};
use tokio::sync::{Mutex, Notify};
use tokio::time::timeout;
use crate::async_node::{AsyncNode, async_block_for_result_with_timeout};
use crate::{NodeOpt, Opts, Value, VoltError, VoltTable};
// Trace-level logging for the async pool.
// With the `tracing` feature enabled every token is forwarded unchanged to `tracing::trace!`.
#[cfg(feature = "tracing")]
macro_rules! async_pool_trace {
($($arg:tt)*) => { tracing::trace!($($arg)*) };
}
// Without the `tracing` feature the macro expands to nothing, so its arguments
// are discarded at compile time and never evaluated.
#[cfg(not(feature = "tracing"))]
macro_rules! async_pool_trace {
($($arg:tt)*) => {};
}
// Debug-level logging for the async pool.
// With the `tracing` feature enabled every token is forwarded unchanged to `tracing::debug!`.
#[cfg(feature = "tracing")]
macro_rules! async_pool_debug {
($($arg:tt)*) => { tracing::debug!($($arg)*) };
}
// Without the `tracing` feature the macro expands to nothing, so its arguments
// are discarded at compile time and never evaluated.
#[cfg(not(feature = "tracing"))]
macro_rules! async_pool_debug {
($($arg:tt)*) => {};
}
// Info-level logging for the async pool.
// With the `tracing` feature enabled every token is forwarded unchanged to `tracing::info!`.
#[cfg(feature = "tracing")]
macro_rules! async_pool_info {
($($arg:tt)*) => { tracing::info!($($arg)*) };
}
// Without the `tracing` feature the macro expands to nothing, so its arguments
// are discarded at compile time and never evaluated.
#[cfg(not(feature = "tracing"))]
macro_rules! async_pool_info {
($($arg:tt)*) => {};
}
// Warn-level logging for the async pool.
// With the `tracing` feature enabled every token is forwarded unchanged to `tracing::warn!`.
#[cfg(feature = "tracing")]
macro_rules! async_pool_warn {
($($arg:tt)*) => { tracing::warn!($($arg)*) };
}
// Without the `tracing` feature the macro expands to nothing, so its arguments
// are discarded at compile time and never evaluated.
#[cfg(not(feature = "tracing"))]
macro_rules! async_pool_warn {
($($arg:tt)*) => {};
}
// Error-level logging for the async pool.
// With the `tracing` feature enabled every token is forwarded unchanged to `tracing::error!`.
#[cfg(feature = "tracing")]
macro_rules! async_pool_error {
($($arg:tt)*) => { tracing::error!($($arg)*) };
}
// Without the `tracing` feature the macro expands to nothing, so its arguments
// are discarded at compile time and never evaluated. Bindings that are only
// used inside the macro call are therefore unused in that configuration.
#[cfg(not(feature = "tracing"))]
macro_rules! async_pool_error {
($($arg:tt)*) => {};
}
/// Metric helpers for the async pool, compiled when the `metrics` feature is enabled.
///
/// Each function wraps one `metrics` crate gauge or counter so that call sites
/// do not need their own feature gates.
#[cfg(feature = "metrics")]
mod async_pool_metrics {
use metrics::{counter, gauge};
/// Sets the `voltdb_async_pool_connections_total` gauge to `count`.
pub fn set_connections_total(count: usize) {
gauge!("voltdb_async_pool_connections_total").set(count as f64);
}
/// Sets the `voltdb_async_pool_connections_healthy` gauge to `count`.
pub fn set_connections_healthy(count: usize) {
gauge!("voltdb_async_pool_connections_healthy").set(count as f64);
}
/// Adds one to the `voltdb_async_pool_reconnect_total` counter.
pub fn inc_reconnect_total() {
counter!("voltdb_async_pool_reconnect_total").increment(1);
}
/// Adds one to the `voltdb_async_pool_circuit_open_total` counter.
pub fn inc_circuit_open_total() {
counter!("voltdb_async_pool_circuit_open_total").increment(1);
}
/// Adds one to the `voltdb_async_pool_requests_failed_total` counter.
pub fn inc_requests_failed_total() {
counter!("voltdb_async_pool_requests_failed_total").increment(1);
}
/// Adds one to the `voltdb_async_pool_requests_total` counter.
pub fn inc_requests_total() {
counter!("voltdb_async_pool_requests_total").increment(1);
}
}
/// No-op stand-ins for the metric helpers, compiled when the `metrics` feature is disabled.
///
/// The function names and signatures mirror the feature-enabled module so that
/// call sites compile unchanged; every body is empty.
#[cfg(not(feature = "metrics"))]
mod async_pool_metrics {
/// Does nothing; `_count` is ignored.
pub fn set_connections_total(_count: usize) {}
/// Does nothing; `_count` is ignored.
pub fn set_connections_healthy(_count: usize) {}
/// Does nothing.
pub fn inc_reconnect_total() {}
/// Does nothing.
pub fn inc_circuit_open_total() {}
/// Does nothing.
pub fn inc_requests_failed_total() {}
/// Does nothing.
pub fn inc_requests_total() {}
}
/// Health of a single pool slot's connection.
#[derive(Debug, Clone)]
pub enum ConnState {
    /// The connection is considered usable.
    Healthy,
    /// The connection is considered broken; `since` is when it was marked so.
    Unhealthy { since: Instant },
    /// A reconnection attempt for this slot is currently in progress.
    Reconnecting,
}

impl ConnState {
    /// Returns `true` only for [`ConnState::Healthy`].
    fn is_healthy(&self) -> bool {
        match self {
            Self::Healthy => true,
            Self::Unhealthy { .. } | Self::Reconnecting => false,
        }
    }

    /// Returns `true` only for [`ConnState::Reconnecting`].
    fn is_reconnecting(&self) -> bool {
        match self {
            Self::Reconnecting => true,
            Self::Healthy | Self::Unhealthy { .. } => false,
        }
    }
}
/// Circuit-breaker state attached to a pool slot.
#[derive(Debug, Clone)]
pub enum Circuit {
    /// Requests are allowed.
    Closed,
    /// Requests are rejected until the instant `until` has been reached.
    Open { until: Instant },
    /// Requests are allowed; the variant is only entered through `half_open`.
    HalfOpen,
}

impl Circuit {
    /// Reports whether a request may pass through the breaker right now.
    ///
    /// An open circuit allows requests again once its `until` instant has been
    /// reached; this method never changes the stored state.
    fn should_allow(&self) -> bool {
        match self {
            Self::Closed | Self::HalfOpen => true,
            Self::Open { until } => Instant::now() >= *until,
        }
    }

    /// Opens the circuit for `duration` starting now, bumps the
    /// circuit-open metric and logs a warning.
    fn open(&mut self, duration: Duration) {
        let until = Instant::now() + duration;
        *self = Self::Open { until };
        async_pool_metrics::inc_circuit_open_total();
        async_pool_warn!("circuit breaker opened");
    }

    /// Moves the circuit to [`Circuit::HalfOpen`] and logs at debug level.
    #[allow(dead_code)]
    fn half_open(&mut self) {
        *self = Self::HalfOpen;
        async_pool_debug!("circuit breaker half-open");
    }

    /// Moves the circuit to [`Circuit::Closed`] and logs at info level.
    fn close(&mut self) {
        *self = Self::Closed;
        async_pool_info!("circuit breaker closed");
    }
}
/// What `get_conn` does when no slot is currently available.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum ExhaustionPolicy {
    /// Return an error immediately.
    #[default]
    FailFast,
    /// Wait for a slot to become available, for at most `timeout`.
    Block { timeout: Duration },
}

/// How pool construction reacts to a connection that cannot be established.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum ValidationMode {
    /// Abort pool creation and return the connection error.
    #[default]
    FailFast,
    /// Keep going and register the slot as unhealthy.
    BestEffort,
}

/// Tunables for the async connection pool, built with chained setters.
#[derive(Debug, Clone)]
pub struct AsyncPoolConfig {
    /// Number of connection slots in the pool.
    pub size: usize,
    /// Minimum time between two reconnection attempts for the same slot.
    pub reconnect_backoff: Duration,
    /// How long a slot's circuit stays open once the failure threshold is hit.
    pub circuit_open_duration: Duration,
    /// Behaviour of `get_conn` when no slot is available.
    pub exhaustion_policy: ExhaustionPolicy,
    /// Behaviour of pool creation when a connection attempt fails.
    pub validation_mode: ValidationMode,
    /// Number of consecutive failures after which a slot's circuit opens.
    pub circuit_failure_threshold: u32,
    /// Shutdown time budget.
    /// NOTE(review): no code in the visible part of this file reads this field — confirm where it is applied.
    pub shutdown_timeout: Duration,
    /// Time limit applied while waiting for the response to a request.
    pub request_timeout: Duration,
}

impl Default for AsyncPoolConfig {
    /// Ten slots, 5 s reconnect backoff, 30 s circuit-open / shutdown /
    /// request timeouts, a failure threshold of 3 and fail-fast policies.
    fn default() -> Self {
        let half_minute = Duration::from_secs(30);
        Self {
            size: 10,
            reconnect_backoff: Duration::from_secs(5),
            circuit_open_duration: half_minute,
            exhaustion_policy: ExhaustionPolicy::default(),
            validation_mode: ValidationMode::default(),
            circuit_failure_threshold: 3,
            shutdown_timeout: half_minute,
            request_timeout: half_minute,
        }
    }
}

impl AsyncPoolConfig {
    /// Starts from the default configuration.
    pub fn new() -> Self {
        Default::default()
    }

    /// Replaces the number of slots.
    pub fn size(self, size: usize) -> Self {
        Self { size, ..self }
    }

    /// Replaces the reconnect backoff.
    pub fn reconnect_backoff(self, duration: Duration) -> Self {
        Self {
            reconnect_backoff: duration,
            ..self
        }
    }

    /// Replaces the circuit-open duration.
    pub fn circuit_open_duration(self, duration: Duration) -> Self {
        Self {
            circuit_open_duration: duration,
            ..self
        }
    }

    /// Replaces the exhaustion policy.
    pub fn exhaustion_policy(self, policy: ExhaustionPolicy) -> Self {
        Self {
            exhaustion_policy: policy,
            ..self
        }
    }

    /// Replaces the validation mode.
    pub fn validation_mode(self, mode: ValidationMode) -> Self {
        Self {
            validation_mode: mode,
            ..self
        }
    }

    /// Replaces the consecutive-failure threshold.
    pub fn circuit_failure_threshold(self, threshold: u32) -> Self {
        Self {
            circuit_failure_threshold: threshold,
            ..self
        }
    }

    /// Replaces the shutdown timeout.
    pub fn shutdown_timeout(self, timeout: Duration) -> Self {
        Self {
            shutdown_timeout: timeout,
            ..self
        }
    }

    /// Replaces the per-request timeout.
    pub fn request_timeout(self, duration: Duration) -> Self {
        Self {
            request_timeout: duration,
            ..self
        }
    }
}
/// Lifecycle phase of the pool, stored inside `AsyncInnerPool`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum PoolPhase {
/// Initial phase; connections may be handed out.
Running,
/// Set by `AsyncPool::shutdown`; the connection-acquisition paths return
/// `VoltError::PoolShutdown` once they observe it.
Shutdown,
}
/// Per-slot bookkeeping: connection health, circuit breaker and reconnect history.
#[derive(Debug)]
struct SlotMeta {
    /// Current health of the slot's connection.
    state: ConnState,
    /// Circuit breaker guarding the slot.
    circuit: Circuit,
    /// Failures recorded since the last success (saturates at `u32::MAX`).
    consecutive_failures: u32,
    /// When the most recent reconnection attempt was started, if any.
    last_reconnect_attempt: Option<Instant>,
    /// Index into the configured host list this slot connects to.
    host_idx: usize,
}

impl SlotMeta {
    /// Metadata for a slot whose connection was just established.
    fn new_healthy(host_idx: usize) -> Self {
        Self {
            state: ConnState::Healthy,
            circuit: Circuit::Closed,
            consecutive_failures: 0,
            last_reconnect_attempt: None,
            host_idx,
        }
    }

    /// Metadata for a slot whose initial connection attempt failed.
    ///
    /// The slot starts unhealthy with one recorded failure and a circuit that
    /// stays open for a fixed five seconds.
    fn new_unhealthy(host_idx: usize) -> Self {
        // One clock read so `since` and `until` are derived from the same instant.
        let now = Instant::now();
        Self {
            state: ConnState::Unhealthy { since: now },
            circuit: Circuit::Open {
                until: now + Duration::from_secs(5),
            },
            consecutive_failures: 1,
            last_reconnect_attempt: None,
            host_idx,
        }
    }

    /// A slot can be handed out when it is healthy and its circuit allows traffic.
    fn is_available(&self) -> bool {
        self.state.is_healthy() && self.circuit.should_allow()
    }

    /// Reports whether a reconnection attempt should be started now.
    ///
    /// Healthy slots and slots that are already reconnecting never need one.
    /// Otherwise an attempt is due if none has been made yet or if at least
    /// `backoff` has elapsed since the previous attempt.
    fn needs_reconnect(&self, backoff: Duration) -> bool {
        if self.state.is_healthy() || self.state.is_reconnecting() {
            return false;
        }
        match self.last_reconnect_attempt {
            None => true,
            Some(last) => last.elapsed() >= backoff,
        }
    }

    /// Marks the slot healthy, clears the failure streak and closes the circuit.
    fn record_success(&mut self) {
        self.consecutive_failures = 0;
        self.state = ConnState::Healthy;
        self.circuit.close();
    }

    /// Marks the slot unhealthy and extends the failure streak; once the streak
    /// reaches `config.circuit_failure_threshold` the circuit is (re)opened for
    /// `config.circuit_open_duration`.
    fn record_failure(&mut self, config: &AsyncPoolConfig) {
        // Saturate instead of `+= 1`: a plain increment panics on overflow in
        // debug builds and wraps to 0 in release builds, which would silently
        // drop the streak back below the threshold.
        self.consecutive_failures = self.consecutive_failures.saturating_add(1);
        self.state = ConnState::Unhealthy {
            since: Instant::now(),
        };
        if self.consecutive_failures >= config.circuit_failure_threshold {
            self.circuit.open(config.circuit_open_duration);
        }
    }
}
/// Mutable pool state that `AsyncPool` keeps behind a mutex.
struct AsyncInnerPool {
    /// Connection options (hosts, credentials, timeouts) used to build per-node options.
    opts: Opts,
    /// Pool configuration this state was created with.
    config: AsyncPoolConfig,
    /// Health metadata, one entry per slot.
    slots: Vec<SlotMeta>,
    /// Connection handles, index-aligned with `slots`; `None` means no live connection.
    nodes: Vec<Arc<Mutex<Option<AsyncNode>>>>,
    /// Current lifecycle phase.
    phase: PoolPhase,
}

impl fmt::Debug for AsyncInnerPool {
    /// Prints the configuration, the number of slots and the phase; `opts`
    /// and `nodes` are left out of the output.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let slots_count = self.slots.len();
        let mut out = f.debug_struct("AsyncInnerPool");
        out.field("config", &self.config);
        out.field("slots_count", &slots_count);
        out.field("phase", &self.phase);
        out.finish()
    }
}
impl AsyncInnerPool {
    /// Builds the connection options for the host at `host_idx`.
    ///
    /// # Errors
    /// Returns [`VoltError::InvalidConfig`] when `host_idx` is outside the
    /// configured host list.
    fn node_opt(&self, host_idx: usize) -> Result<NodeOpt, VoltError> {
        let ip_port = self
            .opts
            .0
            .ip_ports
            .get(host_idx)
            .cloned()
            .ok_or(VoltError::InvalidConfig)?;
        Ok(NodeOpt {
            ip_port,
            pass: self.opts.0.pass.clone(),
            user: self.opts.0.user.clone(),
            connect_timeout: self.opts.0.connect_timeout,
            read_timeout: self.opts.0.read_timeout,
        })
    }

    /// Opens `config.size` connections, assigning hosts round-robin.
    ///
    /// A failed connection either aborts construction
    /// ([`ValidationMode::FailFast`]) or leaves an unhealthy, empty slot
    /// behind ([`ValidationMode::BestEffort`]).
    ///
    /// # Errors
    /// * [`VoltError::InvalidConfig`] when the host list is empty or the
    ///   requested size is zero. Without this check an empty host list panics
    ///   on `i % 0`, and a zero-sized pool panics later when a connection is
    ///   requested.
    /// * The connection error of the first failing slot in fail-fast mode.
    async fn new(opts: Opts, config: AsyncPoolConfig) -> Result<Self, VoltError> {
        let num_hosts = opts.0.ip_ports.len();
        let size = config.size;
        if num_hosts == 0 || size == 0 {
            async_pool_error!(
                hosts = num_hosts,
                size = size,
                "invalid pool configuration"
            );
            return Err(VoltError::InvalidConfig);
        }

        // Copy out what the loop needs so `config` can be moved into the pool
        // without cloning it.
        let validation_mode = config.validation_mode;
        let mut inner = AsyncInnerPool {
            opts,
            config,
            slots: Vec::with_capacity(size),
            nodes: Vec::with_capacity(size),
            phase: PoolPhase::Running,
        };

        for i in 0..size {
            let host_idx = i % num_hosts;
            let node_opt = inner.node_opt(host_idx)?;
            async_pool_debug!(slot = i, host = host_idx, "creating connection");
            match AsyncNode::new(node_opt).await {
                Ok(node) => {
                    inner.slots.push(SlotMeta::new_healthy(host_idx));
                    inner.nodes.push(Arc::new(Mutex::new(Some(node))));
                    async_pool_info!(slot = i, "connection established");
                }
                Err(e) => match validation_mode {
                    ValidationMode::FailFast => {
                        async_pool_error!(slot = i, error = ?e, "connection failed, aborting pool creation");
                        return Err(e);
                    }
                    ValidationMode::BestEffort => {
                        async_pool_warn!(slot = i, error = ?e, "connection failed, marking unhealthy");
                        inner.slots.push(SlotMeta::new_unhealthy(host_idx));
                        inner.nodes.push(Arc::new(Mutex::new(None)));
                    }
                },
            }
        }

        inner.update_metrics();
        async_pool_info!(
            size = size,
            healthy = inner.healthy_count(),
            "async pool initialized"
        );
        Ok(inner)
    }

    /// Number of slots whose state is currently healthy.
    fn healthy_count(&self) -> usize {
        self.slots.iter().filter(|s| s.state.is_healthy()).count()
    }

    /// Publishes the total and healthy slot counts to the metrics gauges.
    fn update_metrics(&self) {
        async_pool_metrics::set_connections_total(self.slots.len());
        async_pool_metrics::set_connections_healthy(self.healthy_count());
    }
}
/// Asynchronous connection pool with per-slot health tracking.
pub struct AsyncPool {
    /// Slot metadata and connection handles, guarded by an async mutex.
    inner: Arc<Mutex<AsyncInnerPool>>,
    /// Signalled whenever slot health changes and on shutdown.
    notify: Arc<Notify>,
    /// Incremented once per `get_conn` call; used to pick the preferred slot
    /// and reported as `total_requests` in the stats.
    counter: AtomicUsize,
    /// Set once `shutdown` has been called.
    shutdown_flag: AtomicBool,
    /// Configuration the pool was created with.
    config: AsyncPoolConfig,
}

impl fmt::Debug for AsyncPool {
    /// Prints the request counter, the shutdown flag and the configuration.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let counter = self.counter.load(Ordering::Relaxed);
        let shutdown = self.shutdown_flag.load(Ordering::Relaxed);
        let mut out = f.debug_struct("AsyncPool");
        out.field("counter", &counter);
        out.field("shutdown", &shutdown);
        out.field("config", &self.config);
        out.finish()
    }
}
impl AsyncPool {
pub async fn new<T: Into<Opts>>(opts: T) -> Result<AsyncPool, VoltError> {
AsyncPool::with_config(opts, AsyncPoolConfig::default()).await
}
pub async fn new_manual<T: Into<Opts>>(size: usize, opts: T) -> Result<AsyncPool, VoltError> {
AsyncPool::with_config(opts, AsyncPoolConfig::new().size(size)).await
}
pub async fn with_config<T: Into<Opts>>(
opts: T,
config: AsyncPoolConfig,
) -> Result<AsyncPool, VoltError> {
let inner = AsyncInnerPool::new(opts.into(), config.clone()).await?;
Ok(AsyncPool {
inner: Arc::new(Mutex::new(inner)),
notify: Arc::new(Notify::new()),
counter: AtomicUsize::new(0),
shutdown_flag: AtomicBool::new(false),
config,
})
}
pub async fn get_conn(&self) -> Result<AsyncPooledConn<'_>, VoltError> {
if self.shutdown_flag.load(Ordering::Relaxed) {
async_pool_warn!("get_conn called on shutdown pool");
return Err(VoltError::PoolShutdown);
}
async_pool_metrics::inc_requests_total();
let preferred_idx = self.counter.fetch_add(1, Ordering::Relaxed) % self.config.size;
match self.config.exhaustion_policy {
ExhaustionPolicy::FailFast => self.get_conn_failfast(preferred_idx).await,
ExhaustionPolicy::Block {
timeout: wait_timeout,
} => self.get_conn_blocking(preferred_idx, wait_timeout).await,
}
}
async fn get_conn_failfast(
&self,
preferred_idx: usize,
) -> Result<AsyncPooledConn<'_>, VoltError> {
let inner = self.inner.lock().await;
if inner.phase != PoolPhase::Running {
return Err(VoltError::PoolShutdown);
}
if inner.slots[preferred_idx].is_available() {
return self.checkout_slot(&inner, preferred_idx).await;
}
for i in 1..self.config.size {
let idx = (preferred_idx + i) % self.config.size;
if inner.slots[idx].is_available() {
async_pool_debug!(
preferred = preferred_idx,
actual = idx,
"using alternate connection"
);
return self.checkout_slot(&inner, idx).await;
}
}
async_pool_warn!("no healthy connections available");
async_pool_metrics::inc_requests_failed_total();
Err(VoltError::PoolExhausted)
}
async fn get_conn_blocking(
&self,
preferred_idx: usize,
wait_timeout: Duration,
) -> Result<AsyncPooledConn<'_>, VoltError> {
let deadline = Instant::now() + wait_timeout;
loop {
let inner = self.inner.lock().await;
if inner.phase != PoolPhase::Running {
return Err(VoltError::PoolShutdown);
}
if inner.slots[preferred_idx].is_available() {
return self.checkout_slot(&inner, preferred_idx).await;
}
for i in 1..self.config.size {
let idx = (preferred_idx + i) % self.config.size;
if inner.slots[idx].is_available() {
return self.checkout_slot(&inner, idx).await;
}
}
let remaining = deadline.saturating_duration_since(Instant::now());
if remaining.is_zero() {
async_pool_warn!(timeout = ?wait_timeout, "connection wait timed out");
async_pool_metrics::inc_requests_failed_total();
return Err(VoltError::Timeout);
}
async_pool_trace!("waiting for available connection");
drop(inner);
let _ = timeout(remaining, self.notify.notified()).await;
}
}
async fn checkout_slot(
&self,
inner: &AsyncInnerPool,
idx: usize,
) -> Result<AsyncPooledConn<'_>, VoltError> {
let node = Arc::clone(&inner.nodes[idx]);
let config = inner.config.clone();
let host_idx = inner.slots[idx].host_idx;
async_pool_trace!(slot = idx, "connection acquired");
Ok(AsyncPooledConn {
pool: self,
idx,
node,
config,
host_idx,
})
}
async fn report_fatal_error(&self, idx: usize) {
#[allow(clippy::type_complexity)]
let reconnect_info: Option<(
Arc<Mutex<Option<AsyncNode>>>,
NodeOpt,
AsyncPoolConfig,
)>;
{
let mut inner = self.inner.lock().await;
let config = inner.config.clone();
inner.slots[idx].record_failure(&config);
async_pool_debug!(slot = idx, "fatal error reported");
inner.update_metrics();
self.notify.notify_waiters();
if !self.shutdown_flag.load(Ordering::Relaxed) {
let backoff = inner.config.reconnect_backoff;
if inner.slots[idx].needs_reconnect(backoff) {
let node_arc = Arc::clone(&inner.nodes[idx]);
let host_idx = inner.slots[idx].host_idx;
if let Ok(node_opt) = inner.node_opt(host_idx) {
inner.slots[idx].state = ConnState::Reconnecting;
inner.slots[idx].last_reconnect_attempt = Some(Instant::now());
reconnect_info = Some((node_arc, node_opt, config));
} else {
reconnect_info = None;
}
} else {
reconnect_info = None;
}
} else {
reconnect_info = None;
}
}
if let Some((node_arc, node_opt, config)) = reconnect_info {
self.do_reconnect(idx, node_arc, node_opt, config).await;
}
}
async fn do_reconnect(
&self,
idx: usize,
node_arc: Arc<Mutex<Option<AsyncNode>>>,
node_opt: NodeOpt,
config: AsyncPoolConfig,
) {
async_pool_info!(slot = idx, "attempting reconnection");
async_pool_metrics::inc_reconnect_total();
match AsyncNode::new(node_opt).await {
Ok(new_node) => {
{
let mut node_guard = node_arc.lock().await;
*node_guard = Some(new_node);
}
{
let mut inner = self.inner.lock().await;
inner.slots[idx].record_success();
inner.update_metrics();
}
self.notify.notify_waiters();
async_pool_info!(slot = idx, "reconnection successful");
}
Err(_e) => {
{
let mut inner = self.inner.lock().await;
inner.slots[idx].record_failure(&config);
inner.update_metrics();
}
async_pool_error!(slot = idx, error = ?_e, "reconnection failed");
}
}
}
async fn mark_success(&self, idx: usize) {
let mut inner = self.inner.lock().await;
inner.slots[idx].record_success();
inner.update_metrics();
}
pub async fn shutdown(&self) {
async_pool_info!("initiating pool shutdown");
self.shutdown_flag.store(true, Ordering::Relaxed);
let nodes: Vec<Arc<Mutex<Option<AsyncNode>>>>;
{
let mut inner = self.inner.lock().await;
inner.phase = PoolPhase::Shutdown;
async_pool_info!("entering shutdown phase");
for slot in &mut inner.slots {
slot.state = ConnState::Unhealthy {
since: Instant::now(),
};
}
nodes = inner.nodes.iter().map(Arc::clone).collect();
}
for node_arc in &nodes {
let mut node_guard = node_arc.lock().await;
if let Some(ref node) = *node_guard {
let _ = node.shutdown().await;
}
*node_guard = None;
}
{
let inner = self.inner.lock().await;
inner.update_metrics();
}
self.notify.notify_waiters();
async_pool_info!("pool shutdown complete");
}
pub fn is_shutdown(&self) -> bool {
self.shutdown_flag.load(Ordering::Relaxed)
}
pub async fn stats(&self) -> AsyncPoolStats {
let inner = self.inner.lock().await;
AsyncPoolStats {
size: self.config.size,
healthy: inner.healthy_count(),
total_requests: self.counter.load(Ordering::Relaxed),
is_shutdown: self.shutdown_flag.load(Ordering::Relaxed),
}
}
}
/// Point-in-time snapshot of the pool, as returned by `AsyncPool::stats`.
#[derive(Debug, Clone)]
pub struct AsyncPoolStats {
/// Configured number of slots.
pub size: usize,
/// Number of slots whose state was healthy when the snapshot was taken.
pub healthy: usize,
/// Value of the pool's request counter when the snapshot was taken.
pub total_requests: usize,
/// Whether the shutdown flag was set when the snapshot was taken.
pub is_shutdown: bool,
}
pub struct AsyncPooledConn<'a> {
pool: &'a AsyncPool,
idx: usize,
node: Arc<Mutex<Option<AsyncNode>>>,
#[allow(dead_code)]
config: AsyncPoolConfig,
#[allow(dead_code)]
host_idx: usize,
}
impl fmt::Debug for AsyncPooledConn<'_> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("AsyncPooledConn")
.field("idx", &self.idx)
.field("host_idx", &self.host_idx)
.finish()
}
}
impl AsyncPooledConn<'_> {
pub async fn query(&self, sql: &str) -> Result<VoltTable, VoltError> {
async_pool_trace!(slot = self.idx, sql = sql, "executing query");
let mut node_guard = self.node.lock().await;
let node = node_guard
.as_mut()
.ok_or(VoltError::ConnectionNotAvailable)?;
let mut rx = node.query(sql).await?;
drop(node_guard);
let result =
async_block_for_result_with_timeout(&mut rx, self.config.request_timeout).await;
self.handle_result(&result).await;
result
}
pub async fn list_procedures(&mut self) -> Result<VoltTable, VoltError> {
async_pool_trace!(slot = self.idx, "listing procedures");
let mut node_guard = self.node.lock().await;
let node = node_guard
.as_mut()
.ok_or(VoltError::ConnectionNotAvailable)?;
let mut rx = node.list_procedures().await?;
drop(node_guard);
let result =
async_block_for_result_with_timeout(&mut rx, self.config.request_timeout).await;
self.handle_result(&result).await;
result
}
pub async fn call_sp(
&mut self,
proc: &str,
params: Vec<&dyn Value>,
) -> Result<VoltTable, VoltError> {
async_pool_trace!(
slot = self.idx,
procedure = proc,
"calling stored procedure"
);
let mut node_guard = self.node.lock().await;
let node = node_guard
.as_mut()
.ok_or(VoltError::ConnectionNotAvailable)?;
let mut rx = node.call_sp(proc, params).await?;
drop(node_guard);
let result =
async_block_for_result_with_timeout(&mut rx, self.config.request_timeout).await;
self.handle_result(&result).await;
result
}
pub async fn upload_jar(&self, bs: Vec<u8>) -> Result<VoltTable, VoltError> {
async_pool_trace!(slot = self.idx, size = bs.len(), "uploading jar");
let mut node_guard = self.node.lock().await;
let node = node_guard
.as_mut()
.ok_or(VoltError::ConnectionNotAvailable)?;
let mut rx = node.upload_jar(bs).await?;
drop(node_guard);
let result =
async_block_for_result_with_timeout(&mut rx, self.config.request_timeout).await;
self.handle_result(&result).await;
result
}
async fn handle_result<T>(&self, result: &Result<T, VoltError>) {
match result {
Ok(_) => {
self.pool.mark_success(self.idx).await;
}
Err(e) if e.is_connection_fatal() => {
async_pool_error!(slot = self.idx, error = ?e, "fatal connection error detected");
{
let mut guard = self.node.lock().await;
*guard = None;
}
self.pool.report_fatal_error(self.idx).await;
}
Err(_) => {
}
}
}
pub fn slot_index(&self) -> usize {
self.idx
}
}
// Unit tests for the pure state types (connection state, circuit breaker,
// configuration builder, slot metadata). None of them opens a connection.
#[cfg(test)]
mod tests {
use super::*;
// Only the `Healthy` variant reports itself as healthy.
#[test]
fn test_conn_state_is_healthy() {
assert!(ConnState::Healthy.is_healthy());
assert!(
!ConnState::Unhealthy {
since: Instant::now()
}
.is_healthy()
);
assert!(!ConnState::Reconnecting.is_healthy());
}
// A closed circuit lets requests through.
#[test]
fn test_circuit_should_allow_closed() {
let circuit = Circuit::Closed;
assert!(circuit.should_allow());
}
// An open circuit whose deadline lies 60 s in the future rejects requests.
#[test]
fn test_circuit_should_allow_open_not_expired() {
let circuit = Circuit::Open {
until: Instant::now() + Duration::from_secs(60),
};
assert!(!circuit.should_allow());
}
// An open circuit whose deadline already passed lets requests through again.
#[test]
fn test_circuit_should_allow_open_expired() {
let circuit = Circuit::Open {
until: Instant::now() - Duration::from_secs(1),
};
assert!(circuit.should_allow());
}
// A half-open circuit lets requests through.
#[test]
fn test_circuit_should_allow_half_open() {
let circuit = Circuit::HalfOpen;
assert!(circuit.should_allow());
}
// open -> half_open -> close walks through the three variants in order.
#[test]
fn test_circuit_transitions() {
let mut circuit = Circuit::Closed;
circuit.open(Duration::from_secs(30));
assert!(matches!(circuit, Circuit::Open { .. }));
circuit.half_open();
assert!(matches!(circuit, Circuit::HalfOpen));
circuit.close();
assert!(matches!(circuit, Circuit::Closed));
}
// Every builder setter stores its argument in the matching field.
#[test]
fn test_pool_config_builder() {
let config = AsyncPoolConfig::new()
.size(20)
.reconnect_backoff(Duration::from_secs(10))
.circuit_open_duration(Duration::from_secs(60))
.exhaustion_policy(ExhaustionPolicy::Block {
timeout: Duration::from_secs(5),
})
.validation_mode(ValidationMode::BestEffort)
.circuit_failure_threshold(5)
.shutdown_timeout(Duration::from_secs(60))
.request_timeout(Duration::from_secs(15));
assert_eq!(config.size, 20);
assert_eq!(config.reconnect_backoff, Duration::from_secs(10));
assert_eq!(config.circuit_open_duration, Duration::from_secs(60));
assert_eq!(
config.exhaustion_policy,
ExhaustionPolicy::Block {
timeout: Duration::from_secs(5)
}
);
assert_eq!(config.validation_mode, ValidationMode::BestEffort);
assert_eq!(config.circuit_failure_threshold, 5);
assert_eq!(config.shutdown_timeout, Duration::from_secs(60));
assert_eq!(config.request_timeout, Duration::from_secs(15));
}
// Spot-checks the documented defaults.
#[test]
fn test_pool_config_default() {
let config = AsyncPoolConfig::default();
assert_eq!(config.size, 10);
assert_eq!(config.exhaustion_policy, ExhaustionPolicy::FailFast);
assert_eq!(config.validation_mode, ValidationMode::FailFast);
assert_eq!(config.request_timeout, Duration::from_secs(30));
}
// A slot is available only when it is healthy AND its circuit allows traffic.
#[test]
fn test_slot_meta_is_available() {
let slot = SlotMeta::new_healthy(0);
assert!(slot.is_available());
let slot = SlotMeta::new_unhealthy(0);
assert!(!slot.is_available());
// Healthy state but an open circuit: still not available.
let mut slot = SlotMeta::new_healthy(0);
slot.circuit = Circuit::Open {
until: Instant::now() + Duration::from_secs(60),
};
assert!(!slot.is_available());
}
// Only an unhealthy slot (with no recent attempt) asks for a reconnect;
// reconnecting and healthy slots never do.
#[test]
fn test_slot_meta_needs_reconnect() {
let mut slot = SlotMeta::new_unhealthy(0);
let backoff = Duration::from_millis(100);
assert!(slot.needs_reconnect(backoff));
slot.state = ConnState::Reconnecting;
assert!(!slot.needs_reconnect(backoff));
slot.state = ConnState::Healthy;
assert!(!slot.needs_reconnect(backoff));
}
// A success resets the failure streak, the state and the circuit.
#[test]
fn test_slot_meta_record_success() {
let mut slot = SlotMeta::new_unhealthy(0);
slot.consecutive_failures = 5;
slot.record_success();
assert_eq!(slot.consecutive_failures, 0);
assert!(matches!(slot.state, ConnState::Healthy));
assert!(matches!(slot.circuit, Circuit::Closed));
}
// The failure that brings the streak up to the threshold opens the circuit.
#[test]
fn test_slot_meta_record_failure_opens_circuit() {
let mut slot = SlotMeta::new_healthy(0);
slot.consecutive_failures = 2;
let config = AsyncPoolConfig::default().circuit_failure_threshold(3);
slot.record_failure(&config);
assert_eq!(slot.consecutive_failures, 3);
assert!(matches!(slot.circuit, Circuit::Open { .. }));
}
// Sanity check of the derived equality on `PoolPhase`.
#[test]
fn test_pool_phase() {
assert_eq!(PoolPhase::Running, PoolPhase::Running);
assert_ne!(PoolPhase::Running, PoolPhase::Shutdown);
}
}