use std::fmt;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{Arc, Condvar, Mutex};
use std::time::{Duration, Instant};
use crate::pool_core::{Circuit, ConnState, ExhaustionPolicy, PoolPhase, ValidationMode};
use crate::{Node, NodeOpt, Opts, Value, VoltError, VoltTable, block_for_result, node};
// Logging shims. With the `tracing` feature each macro forwards its tokens unchanged
// to the `tracing` macro of the same level; without it each macro expands to nothing,
// so the argument expressions are not evaluated at all.
#[cfg(feature = "tracing")]
macro_rules! pool_trace {
    ($($tokens:tt)*) => {
        tracing::trace!($($tokens)*)
    };
}
#[cfg(feature = "tracing")]
macro_rules! pool_debug {
    ($($tokens:tt)*) => {
        tracing::debug!($($tokens)*)
    };
}
#[cfg(feature = "tracing")]
macro_rules! pool_info {
    ($($tokens:tt)*) => {
        tracing::info!($($tokens)*)
    };
}
#[cfg(feature = "tracing")]
macro_rules! pool_warn {
    ($($tokens:tt)*) => {
        tracing::warn!($($tokens)*)
    };
}
#[cfg(feature = "tracing")]
macro_rules! pool_error {
    ($($tokens:tt)*) => {
        tracing::error!($($tokens)*)
    };
}

#[cfg(not(feature = "tracing"))]
macro_rules! pool_trace {
    ($($tokens:tt)*) => {};
}
#[cfg(not(feature = "tracing"))]
macro_rules! pool_debug {
    ($($tokens:tt)*) => {};
}
#[cfg(not(feature = "tracing"))]
macro_rules! pool_info {
    ($($tokens:tt)*) => {};
}
#[cfg(not(feature = "tracing"))]
macro_rules! pool_warn {
    ($($tokens:tt)*) => {};
}
#[cfg(not(feature = "tracing"))]
macro_rules! pool_error {
    ($($tokens:tt)*) => {};
}
/// Metric hooks used by the pool. With the `metrics` feature they record through the
/// `metrics` crate's `gauge!` / `counter!` macros; without it every hook is a no-op.
#[cfg(feature = "metrics")]
mod pool_metrics {
    use metrics::{counter, gauge};

    /// Sets the `voltdb_pool_connections_total` gauge to `count`.
    pub fn set_connections_total(count: usize) {
        let value = count as f64;
        gauge!("voltdb_pool_connections_total").set(value);
    }

    /// Sets the `voltdb_pool_connections_healthy` gauge to `count`.
    pub fn set_connections_healthy(count: usize) {
        let value = count as f64;
        gauge!("voltdb_pool_connections_healthy").set(value);
    }

    /// Adds one to the `voltdb_pool_reconnect_total` counter.
    pub fn inc_reconnect_total() {
        let reconnects = counter!("voltdb_pool_reconnect_total");
        reconnects.increment(1);
    }

    /// Adds one to the `voltdb_pool_circuit_open_total` counter.
    pub fn inc_circuit_open_total() {
        let openings = counter!("voltdb_pool_circuit_open_total");
        openings.increment(1);
    }

    /// Adds one to the `voltdb_pool_requests_failed_total` counter.
    pub fn inc_requests_failed_total() {
        let failures = counter!("voltdb_pool_requests_failed_total");
        failures.increment(1);
    }

    /// Adds one to the `voltdb_pool_requests_total` counter.
    pub fn inc_requests_total() {
        let requests = counter!("voltdb_pool_requests_total");
        requests.increment(1);
    }
}

/// No-op stand-ins used when the `metrics` feature is disabled.
#[cfg(not(feature = "metrics"))]
mod pool_metrics {
    pub fn set_connections_total(_count: usize) {}
    pub fn set_connections_healthy(_count: usize) {}
    pub fn inc_reconnect_total() {}
    pub fn inc_circuit_open_total() {}
    pub fn inc_requests_failed_total() {}
    pub fn inc_requests_total() {}
}
/// Tuning knobs for [`Pool`]. Start from [`PoolConfig::new`] (or `Default`) and chain
/// the setters below.
#[derive(Debug, Clone)]
pub struct PoolConfig {
    /// Number of connection slots opened when the pool is created.
    pub size: usize,
    /// Minimum delay between reconnection attempts for the same slot.
    pub reconnect_backoff: Duration,
    /// How long a slot's circuit stays open once it trips.
    pub circuit_open_duration: Duration,
    /// What `Pool::get_conn` does when no slot is available.
    pub exhaustion_policy: ExhaustionPolicy,
    /// How connection failures during pool creation are handled.
    pub validation_mode: ValidationMode,
    /// Consecutive failures after which a slot's circuit opens.
    pub circuit_failure_threshold: u32,
    /// Shutdown timeout. NOTE(review): not currently read by the pool code.
    pub shutdown_timeout: Duration,
}

impl Default for PoolConfig {
    /// 10 slots, 5 s reconnect backoff, circuit opened for 30 s after 3 consecutive
    /// failures, fail-fast on exhaustion and on startup errors, 30 s shutdown timeout.
    fn default() -> Self {
        let thirty_secs = Duration::from_secs(30);
        PoolConfig {
            size: 10,
            reconnect_backoff: Duration::from_secs(5),
            circuit_open_duration: thirty_secs,
            circuit_failure_threshold: 3,
            exhaustion_policy: ExhaustionPolicy::FailFast,
            validation_mode: ValidationMode::FailFast,
            shutdown_timeout: thirty_secs,
        }
    }
}

impl PoolConfig {
    /// Equivalent to [`PoolConfig::default`].
    pub fn new() -> Self {
        Default::default()
    }

    /// Sets [`PoolConfig::size`].
    pub fn size(self, size: usize) -> Self {
        PoolConfig { size, ..self }
    }

    /// Sets [`PoolConfig::reconnect_backoff`].
    pub fn reconnect_backoff(self, duration: Duration) -> Self {
        PoolConfig {
            reconnect_backoff: duration,
            ..self
        }
    }

    /// Sets [`PoolConfig::circuit_open_duration`].
    pub fn circuit_open_duration(self, duration: Duration) -> Self {
        PoolConfig {
            circuit_open_duration: duration,
            ..self
        }
    }

    /// Sets [`PoolConfig::exhaustion_policy`].
    pub fn exhaustion_policy(self, policy: ExhaustionPolicy) -> Self {
        PoolConfig {
            exhaustion_policy: policy,
            ..self
        }
    }

    /// Sets [`PoolConfig::validation_mode`].
    pub fn validation_mode(self, mode: ValidationMode) -> Self {
        PoolConfig {
            validation_mode: mode,
            ..self
        }
    }

    /// Sets [`PoolConfig::circuit_failure_threshold`].
    pub fn circuit_failure_threshold(self, threshold: u32) -> Self {
        PoolConfig {
            circuit_failure_threshold: threshold,
            ..self
        }
    }

    /// Sets [`PoolConfig::shutdown_timeout`].
    pub fn shutdown_timeout(self, timeout: Duration) -> Self {
        PoolConfig {
            shutdown_timeout: timeout,
            ..self
        }
    }
}
/// Per-slot health bookkeeping; the pool keeps one entry per slot inside its
/// mutex-guarded state.
#[derive(Debug)]
struct SlotMeta {
    /// Current health state of the slot's connection.
    state: ConnState,
    /// Circuit breaker consulted (together with `state`) before handing the slot out.
    circuit: Circuit,
    /// Failures since the last success; compared against `circuit_failure_threshold`.
    consecutive_failures: u32,
    /// When a reconnect was last started for this slot; drives the reconnect backoff.
    last_reconnect_attempt: Option<Instant>,
    /// Index into `opts.ip_ports` of the host this slot connects to.
    host_idx: usize,
}
impl SlotMeta {
    /// Metadata for a slot whose connection is up: healthy, circuit closed, no recorded
    /// failures, and no reconnect ever attempted.
    fn new_healthy(host_idx: usize) -> Self {
        Self {
            state: ConnState::Healthy,
            circuit: Circuit::Closed,
            consecutive_failures: 0,
            last_reconnect_attempt: None,
            host_idx,
        }
    }

    /// Metadata for a slot whose initial connection failed.
    ///
    /// The slot starts `Unhealthy` with one recorded failure and its circuit open for a
    /// fixed 5 seconds (this does not consult `PoolConfig::circuit_open_duration`). No
    /// reconnect has been attempted yet, so [`SlotMeta::needs_reconnect`] is immediately
    /// true.
    fn new_unhealthy(host_idx: usize) -> Self {
        let now = Instant::now();
        Self {
            state: ConnState::Unhealthy { since: now },
            circuit: Circuit::Open {
                until: now + Duration::from_secs(5),
            },
            consecutive_failures: 1,
            last_reconnect_attempt: None,
            host_idx,
        }
    }

    /// `true` when the slot may be handed out: it is healthy and its circuit allows traffic.
    fn is_available(&self) -> bool {
        self.state.is_healthy() && self.circuit.should_allow()
    }

    /// `true` when the slot is down (neither healthy nor already reconnecting) and either
    /// no reconnect has been tried yet or at least `backoff` has passed since the last try.
    fn needs_reconnect(&self, backoff: Duration) -> bool {
        if self.state.is_healthy() || self.state.is_reconnecting() {
            return false;
        }
        match self.last_reconnect_attempt {
            None => true,
            Some(last) => Instant::now().duration_since(last) >= backoff,
        }
    }

    /// Records a successful use or reconnect: clears the failure count, marks the slot
    /// healthy and closes its circuit.
    fn record_success(&mut self) {
        self.consecutive_failures = 0;
        self.state = ConnState::Healthy;
        self.circuit.close();
    }

    /// Records a failed use or reconnect of this slot.
    ///
    /// Marks the slot `Unhealthy`. Once `consecutive_failures` reaches
    /// `config.circuit_failure_threshold`, (re)opens the circuit for
    /// `config.circuit_open_duration`.
    fn record_failure(&mut self, config: &PoolConfig) {
        // Saturate so a slot that keeps failing indefinitely cannot overflow the counter.
        self.consecutive_failures = self.consecutive_failures.saturating_add(1);
        self.state = ConnState::Unhealthy {
            since: Instant::now(),
        };
        if self.consecutive_failures >= config.circuit_failure_threshold {
            // Only a transition from "allowing traffic" to open is a new trip. Re-opening
            // a circuit that is still open just extends it and must not inflate the metric.
            let was_allowing = self.circuit.should_allow();
            self.circuit.open(config.circuit_open_duration);
            if was_allowing {
                pool_metrics::inc_circuit_open_total();
            }
        }
    }
}
/// Mutable pool state; `Pool` keeps it behind a `Mutex` paired with a `Condvar`.
struct InnerPool {
    /// Connection options shared by all slots (host list, credentials, timeouts).
    opts: Opts,
    /// Configuration the pool was built with.
    config: PoolConfig,
    /// Health metadata, one entry per slot; indexed in parallel with `nodes`.
    slots: Vec<SlotMeta>,
    /// Connection handle per slot, each behind its own mutex; `None` while the slot has
    /// no live connection.
    nodes: Vec<Arc<Mutex<Option<Node>>>>,
    /// `Running` until `Pool::shutdown` switches it to `Shutdown`.
    phase: PoolPhase,
}
impl fmt::Debug for InnerPool {
    // Summarizes the state (config, slot count, phase) without dumping connection handles.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let slot_count = self.slots.len();
        let mut summary = f.debug_struct("InnerPool");
        summary.field("config", &self.config);
        summary.field("slots_count", &slot_count);
        summary.field("phase", &self.phase);
        summary.finish()
    }
}
impl InnerPool {
fn node_opt(&self, host_idx: usize) -> Result<NodeOpt, VoltError> {
let ip_port = self
.opts
.0
.ip_ports
.get(host_idx)
.cloned()
.ok_or(VoltError::InvalidConfig)?;
Ok(NodeOpt {
ip_port,
pass: self.opts.0.pass.clone(),
user: self.opts.0.user.clone(),
connect_timeout: self.opts.0.connect_timeout,
read_timeout: self.opts.0.read_timeout,
})
}
fn new(opts: Opts, config: PoolConfig) -> Result<Self, VoltError> {
let num_hosts = opts.0.ip_ports.len();
let mut inner = InnerPool {
opts,
config: config.clone(),
slots: Vec::with_capacity(config.size),
nodes: Vec::with_capacity(config.size),
phase: PoolPhase::Running,
};
for i in 0..config.size {
let host_idx = i % num_hosts;
let node_opt = inner.node_opt(host_idx)?;
pool_debug!(slot = i, host = host_idx, "creating connection");
match node::Node::new(node_opt) {
Ok(node) => {
inner.slots.push(SlotMeta::new_healthy(host_idx));
inner.nodes.push(Arc::new(Mutex::new(Some(node))));
pool_info!(slot = i, "connection established");
}
Err(e) => match config.validation_mode {
ValidationMode::FailFast => {
pool_error!(slot = i, error = ?e, "connection failed, aborting pool creation");
return Err(e);
}
ValidationMode::BestEffort => {
pool_warn!(slot = i, error = ?e, "connection failed, marking unhealthy");
inner.slots.push(SlotMeta::new_unhealthy(host_idx));
inner.nodes.push(Arc::new(Mutex::new(None)));
}
},
}
}
inner.update_metrics();
pool_info!(
size = config.size,
healthy = inner.healthy_count(),
"pool initialized"
);
Ok(inner)
}
fn healthy_count(&self) -> usize {
self.slots.iter().filter(|s| s.state.is_healthy()).count()
}
fn update_metrics(&self) {
pool_metrics::set_connections_total(self.slots.len());
pool_metrics::set_connections_healthy(self.healthy_count());
}
}
/// A fixed-size set of VoltDB connections, handed out round-robin via `get_conn`.
pub struct Pool {
    // Pool state plus a condvar that is notified when slot health changes or the pool
    // shuts down; the blocking exhaustion policy waits on it.
    inner: Arc<(Mutex<InnerPool>, Condvar)>,
    // Rotating start index for slot selection; also reported as `PoolStats::total_requests`.
    counter: AtomicUsize,
    // Set by `shutdown`; checked by `get_conn` before taking the pool lock.
    shutdown_flag: AtomicBool,
    // Copy of the configuration, readable without taking the pool lock.
    config: PoolConfig,
}
impl fmt::Debug for Pool {
    // Shows counters, the healthy-slot count and the config. A poisoned pool lock is
    // reported as zero healthy slots rather than failing the formatting.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let healthy = match self.inner.0.lock() {
            Ok(state) => state.healthy_count(),
            Err(_) => 0,
        };
        let mut summary = f.debug_struct("Pool");
        summary.field("counter", &self.counter.load(Ordering::Relaxed));
        summary.field("shutdown", &self.shutdown_flag.load(Ordering::Relaxed));
        summary.field("healthy", &healthy);
        summary.field("config", &self.config);
        summary.finish()
    }
}
impl Pool {
    /// Creates a pool using [`PoolConfig::default`].
    ///
    /// # Errors
    /// Same as [`Pool::with_config`].
    pub fn new<T: Into<Opts>>(opts: T) -> Result<Pool, VoltError> {
        Pool::with_config(opts, PoolConfig::default())
    }

    /// Creates a pool with `size` slots and otherwise default settings.
    ///
    /// # Errors
    /// Same as [`Pool::with_config`].
    pub fn new_manual<T: Into<Opts>>(size: usize, opts: T) -> Result<Pool, VoltError> {
        Pool::with_config(opts, PoolConfig::new().size(size))
    }

    /// Creates a pool with an explicit configuration, opening all `config.size`
    /// connections up front.
    ///
    /// # Errors
    /// Propagates the error from building the initial connections. With
    /// `ValidationMode::FailFast` that includes the first connection failure.
    pub fn with_config<T: Into<Opts>>(opts: T, config: PoolConfig) -> Result<Pool, VoltError> {
        let inner = InnerPool::new(opts.into(), config.clone())?;
        Ok(Pool {
            inner: Arc::new((Mutex::new(inner), Condvar::new())),
            counter: AtomicUsize::new(0),
            shutdown_flag: AtomicBool::new(false),
            config,
        })
    }

    /// Returns a handle to one of the pool's connections.
    ///
    /// The starting slot rotates on every call. If it is not available, the next
    /// available slot (wrapping around) is used. Handles are not exclusive: several
    /// `PooledConn`s may share one slot, and their calls are serialized by that slot's
    /// node mutex.
    ///
    /// If no slot is available, one slot whose reconnect backoff has elapsed is given a
    /// synchronous reconnect attempt. This may block for up to the connect timeout.
    ///
    /// # Errors
    /// - `VoltError::PoolShutdown` after [`Pool::shutdown`].
    /// - `VoltError::PoisonError` if the pool lock is poisoned.
    /// - `VoltError::PoolExhausted` if the pool has no slots, or (fail-fast policy) if
    ///   no slot is available.
    /// - `VoltError::Timeout` (blocking policy) if no slot becomes available in time.
    pub fn get_conn(&self) -> Result<PooledConn<'_>, VoltError> {
        if self.shutdown_flag.load(Ordering::Relaxed) {
            pool_warn!("get_conn called on shutdown pool");
            return Err(VoltError::PoolShutdown);
        }
        pool_metrics::inc_requests_total();
        let ticket = self.counter.fetch_add(1, Ordering::Relaxed);
        if self.config.size == 0 {
            // An empty pool can never serve a connection; also avoids `% 0` below.
            pool_warn!("get_conn called on a pool with no slots");
            pool_metrics::inc_requests_failed_total();
            return Err(VoltError::PoolExhausted);
        }
        let preferred_idx = ticket % self.config.size;
        match self.config.exhaustion_policy {
            ExhaustionPolicy::FailFast => self.get_conn_failfast(preferred_idx),
            ExhaustionPolicy::Block { timeout } => self.get_conn_blocking(preferred_idx, timeout),
        }
    }

    /// Fail-fast checkout: returns `PoolExhausted` when nothing is available, even after
    /// one opportunistic reconnect attempt.
    fn get_conn_failfast(&self, preferred_idx: usize) -> Result<PooledConn<'_>, VoltError> {
        let (lock, _cvar) = &*self.inner;
        let mut inner = lock
            .lock()
            .map_err(|_| VoltError::PoisonError("Pool lock poisoned".into()))?;
        if inner.phase != PoolPhase::Running {
            return Err(VoltError::PoolShutdown);
        }
        if let Some(idx) = self.find_available(&inner, preferred_idx) {
            return self.checkout_slot(&inner, idx);
        }
        // Nothing is usable. A slot whose reconnect failed is never checked out again, so
        // nothing else would ever retry it; give one due slot a chance before giving up.
        let job = self.claim_due_reconnect(&mut inner);
        if let Some((idx, node_arc, node_opt, config)) = job {
            // Connecting can block, so never do it while holding the pool lock.
            drop(inner);
            self.do_reconnect(idx, node_arc, node_opt, config);
            inner = lock
                .lock()
                .map_err(|_| VoltError::PoisonError("Pool lock poisoned".into()))?;
            if inner.phase != PoolPhase::Running {
                return Err(VoltError::PoolShutdown);
            }
            if let Some(idx) = self.find_available(&inner, preferred_idx) {
                return self.checkout_slot(&inner, idx);
            }
        }
        pool_warn!("no healthy connections available");
        pool_metrics::inc_requests_failed_total();
        Err(VoltError::PoolExhausted)
    }

    /// Blocking checkout: waits on the condvar until a slot becomes available, the pool
    /// shuts down, or `timeout` elapses. A reconnect attempt made while waiting may
    /// overrun the deadline by up to the connect timeout.
    fn get_conn_blocking(
        &self,
        preferred_idx: usize,
        timeout: Duration,
    ) -> Result<PooledConn<'_>, VoltError> {
        // `None` means the timeout is too large to represent as an `Instant`; treat it as
        // unbounded instead of panicking on overflow.
        let deadline = Instant::now().checked_add(timeout);
        let (lock, cvar) = &*self.inner;
        let mut inner = lock
            .lock()
            .map_err(|_| VoltError::PoisonError("Pool lock poisoned".into()))?;
        loop {
            if inner.phase != PoolPhase::Running {
                return Err(VoltError::PoolShutdown);
            }
            if let Some(idx) = self.find_available(&inner, preferred_idx) {
                return self.checkout_slot(&inner, idx);
            }
            let remaining = match deadline {
                Some(deadline) => deadline.saturating_duration_since(Instant::now()),
                None => timeout,
            };
            if remaining.is_zero() {
                pool_warn!(timeout = ?timeout, "connection wait timed out");
                pool_metrics::inc_requests_failed_total();
                return Err(VoltError::Timeout);
            }
            // Failed slots are only retried here, so try one due slot before sleeping.
            let job = self.claim_due_reconnect(&mut inner);
            if let Some((idx, node_arc, node_opt, config)) = job {
                drop(inner);
                self.do_reconnect(idx, node_arc, node_opt, config);
                inner = lock
                    .lock()
                    .map_err(|_| VoltError::PoisonError("Pool lock poisoned".into()))?;
                continue;
            }
            pool_trace!("waiting for available connection");
            let (guard, _timeout_result) = cvar
                .wait_timeout(inner, remaining)
                .map_err(|_| VoltError::PoisonError("Pool lock poisoned".into()))?;
            inner = guard;
        }
    }

    /// Returns the first available slot, scanning from `preferred_idx` and wrapping around.
    fn find_available(&self, inner: &InnerPool, preferred_idx: usize) -> Option<usize> {
        let size = self.config.size;
        let idx = (0..size)
            .map(|offset| (preferred_idx + offset) % size)
            .find(|&candidate| inner.slots[candidate].is_available())?;
        if idx != preferred_idx {
            pool_debug!(
                preferred = preferred_idx,
                actual = idx,
                "using alternate connection"
            );
        }
        Some(idx)
    }

    /// Builds a handle for slot `idx`. The slot is not reserved, and its node is shared.
    fn checkout_slot<'a>(
        &'a self,
        inner: &InnerPool,
        idx: usize,
    ) -> Result<PooledConn<'a>, VoltError> {
        let node = Arc::clone(&inner.nodes[idx]);
        let config = inner.config.clone();
        let host_idx = inner.slots[idx].host_idx;
        pool_trace!(slot = idx, "connection acquired");
        Ok(PooledConn {
            pool: self,
            idx,
            node,
            config,
            host_idx,
        })
    }

    /// If the pool is running and slot `idx` is due for a reconnect, marks it
    /// `Reconnecting`, stamps the attempt time, and returns what `do_reconnect` needs.
    /// Must be called with the pool lock held; the connect itself happens after release.
    #[allow(clippy::type_complexity)] // tuple mirrors `do_reconnect`'s parameters
    fn prepare_reconnect(
        &self,
        inner: &mut InnerPool,
        idx: usize,
    ) -> Option<(Arc<Mutex<Option<Node>>>, NodeOpt, PoolConfig)> {
        if self.shutdown_flag.load(Ordering::Relaxed) || inner.phase != PoolPhase::Running {
            return None;
        }
        if !inner.slots[idx].needs_reconnect(inner.config.reconnect_backoff) {
            return None;
        }
        let node_opt = inner.node_opt(inner.slots[idx].host_idx).ok()?;
        let slot = &mut inner.slots[idx];
        slot.state = ConnState::Reconnecting;
        slot.last_reconnect_attempt = Some(Instant::now());
        Some((Arc::clone(&inner.nodes[idx]), node_opt, inner.config.clone()))
    }

    /// Claims the first slot that is due for a reconnect, if any (see `prepare_reconnect`).
    #[allow(clippy::type_complexity)] // tuple mirrors `do_reconnect`'s parameters
    fn claim_due_reconnect(
        &self,
        inner: &mut InnerPool,
    ) -> Option<(usize, Arc<Mutex<Option<Node>>>, NodeOpt, PoolConfig)> {
        let backoff = inner.config.reconnect_backoff;
        let idx = inner
            .slots
            .iter()
            .position(|slot| slot.needs_reconnect(backoff))?;
        let (node_arc, node_opt, config) = self.prepare_reconnect(inner, idx)?;
        Some((idx, node_arc, node_opt, config))
    }

    /// Records a connection-fatal error on slot `idx`, wakes waiters, and reconnects the
    /// slot synchronously if its backoff allows.
    fn report_fatal_error(&self, idx: usize) {
        let (lock, cvar) = &*self.inner;
        let mut job = None;
        if let Ok(mut inner) = lock.lock() {
            let config = inner.config.clone();
            inner.slots[idx].record_failure(&config);
            pool_debug!(slot = idx, "fatal error reported");
            inner.update_metrics();
            cvar.notify_all();
            job = self.prepare_reconnect(&mut inner, idx);
        }
        if let Some((node_arc, node_opt, config)) = job {
            self.do_reconnect(idx, node_arc, node_opt, config);
        }
    }

    /// Opens a fresh connection for slot `idx` and installs it in `node_arc`.
    ///
    /// Must be called without the pool lock held, because connecting may block. On
    /// success the slot is marked healthy and waiters are woken. On failure the slot is
    /// marked failed again. Its `last_reconnect_attempt` was already stamped by the
    /// caller, so the backoff governs the next try. If the pool shut down while this
    /// connection was being opened, the new node is shut down instead of installed.
    fn do_reconnect(
        &self,
        idx: usize,
        node_arc: Arc<Mutex<Option<Node>>>,
        node_opt: NodeOpt,
        config: PoolConfig,
    ) {
        pool_info!(slot = idx, "attempting reconnection");
        pool_metrics::inc_reconnect_total();
        let (lock, cvar) = &*self.inner;
        match node::Node::new(node_opt) {
            Ok(mut new_node) => {
                // Install while holding the pool lock (pool-then-node order, same as
                // `shutdown`) so this cannot interleave with a concurrent shutdown.
                let mut inner = match lock.lock() {
                    Ok(inner) => inner,
                    Err(_) => {
                        let _ = new_node.shutdown();
                        pool_error!(slot = idx, "pool lock poisoned, discarding reconnected node");
                        return;
                    }
                };
                if self.shutdown_flag.load(Ordering::Relaxed) || inner.phase != PoolPhase::Running
                {
                    let _ = new_node.shutdown();
                    pool_info!(slot = idx, "pool shut down during reconnection, discarding node");
                    return;
                }
                match node_arc.lock() {
                    Ok(mut node_guard) => {
                        *node_guard = Some(new_node);
                    }
                    Err(_) => {
                        let _ = new_node.shutdown();
                        inner.slots[idx].record_failure(&config);
                        inner.update_metrics();
                        pool_error!(slot = idx, "node lock poisoned, discarding reconnected node");
                        return;
                    }
                }
                inner.slots[idx].record_success();
                inner.update_metrics();
                cvar.notify_all();
                pool_info!(slot = idx, "reconnection successful");
            }
            Err(_e) => {
                if let Ok(mut inner) = lock.lock() {
                    inner.slots[idx].record_failure(&config);
                    inner.update_metrics();
                }
                pool_error!(slot = idx, error = ?_e, "reconnection failed");
            }
        }
    }

    /// Records a successful call on slot `idx`. Ignored once the pool has left `Running`,
    /// so a call finishing after `shutdown` cannot flip its slot back to healthy.
    fn mark_success(&self, idx: usize) {
        let (lock, _) = &*self.inner;
        if let Ok(mut inner) = lock.lock() {
            if inner.phase != PoolPhase::Running {
                return;
            }
            inner.slots[idx].record_success();
            inner.update_metrics();
        }
    }

    /// Shuts the pool down. After this, `get_conn` rejects new requests, every slot is
    /// marked unhealthy, and every node is shut down and dropped. Callers blocked in
    /// `get_conn` are woken and receive `PoolShutdown`.
    ///
    /// Waits for any in-progress call on a slot to release that slot's node lock.
    pub fn shutdown(&self) {
        pool_info!("initiating pool shutdown");
        self.shutdown_flag.store(true, Ordering::Relaxed);
        let (lock, cvar) = &*self.inner;
        if let Ok(mut inner) = lock.lock() {
            inner.phase = PoolPhase::Shutdown;
            pool_info!("entering shutdown phase");
            for slot in &mut inner.slots {
                slot.state = ConnState::Unhealthy {
                    since: Instant::now(),
                };
            }
            for node_arc in &inner.nodes {
                if let Ok(mut node_guard) = node_arc.lock() {
                    if let Some(ref mut node) = *node_guard {
                        let _ = node.shutdown();
                    }
                    *node_guard = None;
                }
            }
            inner.update_metrics();
            cvar.notify_all();
        }
        pool_info!("pool shutdown complete");
    }

    /// `true` once [`Pool::shutdown`] has been called.
    pub fn is_shutdown(&self) -> bool {
        self.shutdown_flag.load(Ordering::Relaxed)
    }

    /// Returns a snapshot of pool statistics. `healthy` is 0 if the pool lock is poisoned.
    pub fn stats(&self) -> PoolStats {
        let (lock, _) = &*self.inner;
        let inner = lock.lock().ok();
        PoolStats {
            size: self.config.size,
            healthy: inner.as_ref().map(|i| i.healthy_count()).unwrap_or(0),
            total_requests: self.counter.load(Ordering::Relaxed),
            is_shutdown: self.shutdown_flag.load(Ordering::Relaxed),
        }
    }
}
/// Point-in-time snapshot returned by [`Pool::stats`].
#[derive(Debug, Clone)]
pub struct PoolStats {
    /// Configured number of slots.
    pub size: usize,
    /// Slots currently marked healthy (0 if the pool lock was poisoned).
    pub healthy: usize,
    /// Number of `get_conn` calls that passed the shutdown check, whether or not they
    /// obtained a connection.
    pub total_requests: usize,
    /// Whether `Pool::shutdown` has been called.
    pub is_shutdown: bool,
}
/// A handle to one pool slot's connection, borrowed from a [`Pool`].
///
/// Checkout does not reserve the slot. Several handles may share the same slot, and
/// their calls are serialized by the slot's node mutex.
pub struct PooledConn<'a> {
    /// Pool that health outcomes are reported back to.
    pool: &'a Pool,
    /// Slot index within the pool.
    idx: usize,
    /// Shared handle to the slot's connection; `None` while the slot has no live node.
    node: Arc<Mutex<Option<Node>>>,
    /// Copy of the pool configuration taken at checkout; not read at present.
    #[allow(dead_code)] config: PoolConfig,
    /// Index of the host this slot connects to; shown in the `Debug` output.
    #[allow(dead_code)]
    host_idx: usize,
}
impl fmt::Debug for PooledConn<'_> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("PooledConn")
.field("idx", &self.idx)
.field("host_idx", &self.host_idx)
.finish()
}
}
impl PooledConn<'_> {
pub fn query(&mut self, sql: &str) -> Result<VoltTable, VoltError> {
pool_trace!(slot = self.idx, sql = sql, "executing query");
let mut node_guard = self
.node
.lock()
.map_err(|_| VoltError::PoisonError("Node lock poisoned".to_string()))?;
let node = node_guard
.as_mut()
.ok_or(VoltError::ConnectionNotAvailable)?;
let result = node.query(sql).and_then(|r| block_for_result(&r));
drop(node_guard);
self.handle_result(&result);
result
}
pub fn list_procedures(&mut self) -> Result<VoltTable, VoltError> {
pool_trace!(slot = self.idx, "listing procedures");
let mut node_guard = self
.node
.lock()
.map_err(|_| VoltError::PoisonError("Node lock poisoned".to_string()))?;
let node = node_guard
.as_mut()
.ok_or(VoltError::ConnectionNotAvailable)?;
let result = node.list_procedures().and_then(|r| block_for_result(&r));
drop(node_guard);
self.handle_result(&result);
result
}
pub fn call_sp(&mut self, proc: &str, params: Vec<&dyn Value>) -> Result<VoltTable, VoltError> {
pool_trace!(
slot = self.idx,
procedure = proc,
"calling stored procedure"
);
let mut node_guard = self
.node
.lock()
.map_err(|_| VoltError::PoisonError("Node lock poisoned".to_string()))?;
let node = node_guard
.as_mut()
.ok_or(VoltError::ConnectionNotAvailable)?;
let result = node
.call_sp(proc, params)
.and_then(|r| block_for_result(&r));
drop(node_guard);
self.handle_result(&result);
result
}
pub fn upload_jar(&mut self, bs: Vec<u8>) -> Result<VoltTable, VoltError> {
pool_trace!(slot = self.idx, size = bs.len(), "uploading jar");
let mut node_guard = self
.node
.lock()
.map_err(|_| VoltError::PoisonError("Node lock poisoned".to_string()))?;
let node = node_guard
.as_mut()
.ok_or(VoltError::ConnectionNotAvailable)?;
let result = node.upload_jar(bs).and_then(|r| block_for_result(&r));
drop(node_guard);
self.handle_result(&result);
result
}
fn handle_result<T>(&self, result: &Result<T, VoltError>) {
match result {
Ok(_) => {
self.pool.mark_success(self.idx);
}
Err(e) if e.is_connection_fatal() => {
pool_error!(slot = self.idx, error = ?e, "fatal connection error detected");
if let Ok(mut guard) = self.node.lock() {
*guard = None;
}
self.pool.report_fatal_error(self.idx);
}
Err(_) => {
}
}
}
pub fn slot_index(&self) -> usize {
self.idx
}
}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_pool_config_builder() {
        let ten_secs = Duration::from_secs(10);
        let sixty_secs = Duration::from_secs(60);
        let config = PoolConfig::new()
            .size(20)
            .reconnect_backoff(ten_secs)
            .circuit_open_duration(sixty_secs)
            .exhaustion_policy(ExhaustionPolicy::Block {
                timeout: Duration::from_secs(5),
            })
            .validation_mode(ValidationMode::BestEffort)
            .circuit_failure_threshold(5)
            .shutdown_timeout(sixty_secs);

        assert_eq!(config.size, 20);
        assert_eq!(config.reconnect_backoff, ten_secs);
        assert_eq!(config.circuit_open_duration, sixty_secs);
        assert_eq!(
            config.exhaustion_policy,
            ExhaustionPolicy::Block {
                timeout: Duration::from_secs(5)
            }
        );
        assert_eq!(config.validation_mode, ValidationMode::BestEffort);
        assert_eq!(config.circuit_failure_threshold, 5);
        assert_eq!(config.shutdown_timeout, sixty_secs);
    }

    #[test]
    fn test_pool_config_default() {
        let PoolConfig {
            size,
            exhaustion_policy,
            validation_mode,
            ..
        } = PoolConfig::default();
        assert_eq!(size, 10);
        assert_eq!(exhaustion_policy, ExhaustionPolicy::FailFast);
        assert_eq!(validation_mode, ValidationMode::FailFast);
    }

    #[test]
    fn test_slot_meta_is_available() {
        assert!(SlotMeta::new_healthy(0).is_available());
        assert!(!SlotMeta::new_unhealthy(0).is_available());

        let mut tripped = SlotMeta::new_healthy(0);
        tripped.circuit = Circuit::Open {
            until: Instant::now() + Duration::from_secs(60),
        };
        assert!(!tripped.is_available());
    }

    #[test]
    fn test_slot_meta_needs_reconnect() {
        let backoff = Duration::from_millis(100);
        let mut slot = SlotMeta::new_unhealthy(0);
        assert!(slot.needs_reconnect(backoff));

        slot.state = ConnState::Reconnecting;
        assert!(!slot.needs_reconnect(backoff));

        slot.state = ConnState::Healthy;
        assert!(!slot.needs_reconnect(backoff));
    }

    #[test]
    fn test_slot_meta_record_success() {
        let mut slot = SlotMeta {
            consecutive_failures: 5,
            ..SlotMeta::new_unhealthy(0)
        };
        slot.record_success();
        assert_eq!(slot.consecutive_failures, 0);
        assert!(matches!(slot.state, ConnState::Healthy));
        assert!(matches!(slot.circuit, Circuit::Closed));
    }

    #[test]
    fn test_slot_meta_record_failure_opens_circuit() {
        let config = PoolConfig::default().circuit_failure_threshold(3);
        let mut slot = SlotMeta {
            consecutive_failures: 2,
            ..SlotMeta::new_healthy(0)
        };
        slot.record_failure(&config);
        assert_eq!(slot.consecutive_failures, 3);
        assert!(matches!(slot.circuit, Circuit::Open { .. }));
    }
}