#![deny(missing_docs)]
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::Instant;
use crossbeam_queue::{ArrayQueue, PushError};
/// A connection value of type `T` paired with a creation timestamp.
#[derive(Debug)]
pub struct Live<T>
where
    T: Send,
{
    /// The wrapped connection.
    pub conn: T,
    /// Instant associated with this connection; [`Live::new`] sets it to the
    /// moment of construction.
    pub live_since: Instant,
}

impl<T> Live<T>
where
    T: Send,
{
    /// Wraps `conn` and stamps it with the current [`Instant`].
    pub fn new(conn: T) -> Self {
        let live_since = Instant::now();
        Self { conn, live_since }
    }
}
/// A `Live` connection paired with the instant at which it was wrapped as
/// idle.
#[derive(Debug)]
pub(crate) struct Idle<T>
where
    T: Send,
{
    /// The live connection being held.
    conn: Live<T>,
    /// Set to the current instant by `Idle::new`.
    idle_since: Instant,
}

impl<T> Idle<T>
where
    T: Send,
{
    /// Wraps `conn`, recording the current [`Instant`] as `idle_since`.
    fn new(conn: Live<T>) -> Self {
        let idle_since = Instant::now();
        Self { conn, idle_since }
    }
}
/// Bounded buffer of idle connections plus two atomic counters.
///
/// `total_count` is maintained separately from the idle buffer: it is changed
/// only by `new_conn`, `increment`, `decrement` and `safe_increment`, while
/// `store` and `get` leave it untouched.
#[derive(Debug)]
pub struct Queue<C>
where
    C: Send,
{
    /// Fixed-capacity queue holding the idle connections.
    idle: ArrayQueue<Idle<C>>,
    /// Counter adjusted by the increment/decrement family of methods.
    total_count: AtomicUsize,
    /// Number of times pushing onto `idle` was rejected.
    idle_push_error_count: AtomicUsize,
}

impl<C> Queue<C>
where
    C: Send,
{
    /// Creates an empty queue whose idle buffer is built with `capacity`;
    /// both counters start at zero.
    ///
    /// NOTE(review): `ArrayQueue::new` is documented by crossbeam to panic on
    /// a zero capacity — confirm callers never pass `0`.
    pub(crate) fn new(capacity: usize) -> Self {
        Self {
            idle: ArrayQueue::new(capacity),
            total_count: AtomicUsize::new(0),
            idle_push_error_count: AtomicUsize::new(0),
        }
    }

    /// Returns the number of connections currently sitting in the idle buffer.
    #[inline(always)]
    pub(crate) fn idle(&self) -> usize {
        self.idle.len()
    }

    /// Returns the current value of the total counter.
    #[inline(always)]
    pub(crate) fn total(&self) -> usize {
        self.total_count.load(Ordering::SeqCst)
    }

    /// Returns how many pushes onto the idle buffer have been rejected so far.
    pub(crate) fn idle_push_error_count(&self) -> usize {
        self.idle_push_error_count.load(Ordering::Relaxed)
    }

    /// Stores `conn` as idle and, only if that succeeds, bumps the total
    /// counter by one.
    ///
    /// When the idle buffer rejects the push, the connection is dropped and
    /// the total counter is left unchanged (`store` has already recorded the
    /// rejection).
    pub(crate) fn new_conn(&self, conn: Live<C>) {
        match self.store(conn) {
            Ok(()) => self.increment(),
            Err(_) => {}
        }
    }

    /// Pushes `conn` onto the idle buffer without touching the total counter.
    ///
    /// # Errors
    ///
    /// Returns the push error from the underlying queue when the push is
    /// rejected; the rejection counter is incremented in that case.
    pub(crate) fn store(&self, conn: Live<C>) -> Result<(), PushError<Idle<C>>> {
        self.idle.push(Idle::new(conn)).map_err(|rejected| {
            self.idle_push_error_count.fetch_add(1, Ordering::Relaxed);
            rejected
        })
    }

    /// Pops one idle connection, discarding its idle timestamp.
    ///
    /// Returns `None` when the pop fails. The total counter is not modified.
    pub(crate) fn get(&self) -> Option<Live<C>> {
        let Idle { conn, .. } = self.idle.pop().ok()?;
        Some(conn)
    }

    /// Unconditionally adds one to the total counter.
    #[inline(always)]
    pub(crate) fn increment(&self) {
        self.total_count.fetch_add(1, Ordering::SeqCst);
    }

    /// Unconditionally subtracts one from the total counter.
    #[inline(always)]
    pub(crate) fn decrement(&self) {
        self.total_count.fetch_sub(1, Ordering::SeqCst);
    }

    /// Adds one to the total counter only while it is strictly below `max`.
    ///
    /// Returns `Some(())` when this call performed the increment, or `None`
    /// once the counter is observed at or above `max`.
    pub(crate) fn safe_increment(&self, max: usize) -> Option<()> {
        let mut observed = self.total();
        loop {
            if observed >= max {
                return None;
            }
            let swapped = self.total_count.compare_exchange(
                observed,
                observed + 1,
                Ordering::SeqCst,
                Ordering::SeqCst,
            );
            match swapped {
                Ok(_) => return Some(()),
                // The counter changed between the load and the swap:
                // re-read it and re-check the bound before trying again.
                Err(_) => observed = self.total(),
            }
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    // `new_conn` both stores the connection and counts it.
    #[test]
    fn new_conn() {
        let conns = Queue::new(1);
        assert_eq!(conns.idle(), 0);
        assert_eq!(conns.total(), 0);
        conns.new_conn(Live::new(()));
        assert_eq!(conns.idle(), 1);
        assert_eq!(conns.total(), 1);
    }

    // `store` adds to the idle buffer but leaves the total counter alone.
    #[test]
    fn store() -> Result<(), crossbeam_queue::PushError<Idle<()>>> {
        let conns = Queue::new(1);
        assert_eq!(conns.idle(), 0);
        assert_eq!(conns.total(), 0);
        conns.store(Live::new(()))?;
        assert_eq!(conns.idle(), 1);
        assert_eq!(conns.total(), 0);
        Ok(())
    }

    // `get` removes from the idle buffer but leaves the total counter alone.
    #[test]
    fn get() {
        let conns = Queue::new(1);
        assert!(conns.get().is_none());
        conns.new_conn(Live::new(()));
        assert!(conns.get().is_some());
        assert_eq!(conns.idle(), 0);
        assert_eq!(conns.total(), 1);
    }

    // The counter helpers never touch the idle buffer.
    #[test]
    fn increment_and_decrement() {
        let conns: Queue<()> = Queue::new(1);
        assert_eq!(conns.total(), 0);
        assert_eq!(conns.idle(), 0);
        conns.increment();
        assert_eq!(conns.total(), 1);
        assert_eq!(conns.idle(), 0);
        conns.decrement();
        assert_eq!(conns.total(), 0);
        assert_eq!(conns.idle(), 0);
    }

    // `safe_increment` only bumps the counter while it is strictly below `max`.
    #[test]
    fn safe_increment() {
        let conns: Queue<()> = Queue::new(1);
        // A bound of zero can never be satisfied.
        assert_eq!(conns.safe_increment(0), None);
        assert_eq!(conns.total(), 0);
        // Below the bound: the increment happens.
        assert_eq!(conns.safe_increment(1), Some(()));
        assert_eq!(conns.total(), 1);
        // At the bound: refused, counter unchanged.
        assert_eq!(conns.safe_increment(1), None);
        assert_eq!(conns.total(), 1);
        // A larger bound allows another increment.
        assert_eq!(conns.safe_increment(2), Some(()));
        assert_eq!(conns.total(), 2);
        // The idle buffer is never affected.
        assert_eq!(conns.idle(), 0);
    }

    // A push beyond capacity is rejected: nothing is counted as a new
    // connection, and the rejection counter goes up by one.
    #[test]
    fn store_queue_more_than_the_capacity() {
        let conns: Queue<()> = Queue::new(1);
        conns.new_conn(Live::new(()));
        assert_eq!(1, conns.total());
        assert_eq!(1, conns.idle());
        assert_eq!(0, conns.idle_push_error_count());
        conns.new_conn(Live::new(()));
        assert_eq!(1, conns.total());
        assert_eq!(1, conns.idle());
        assert_eq!(1, conns.idle_push_error_count());
    }
}