use crate::WorkerState;
use crate::BoxFuture;
use crate::SessionResult;
use std::fmt;
use std::collections::VecDeque;
use tokio::sync::mpsc::error::{TrySendError, SendTimeoutError};
use tokio::sync::{mpsc, oneshot};
use crate::TIMEOUT;
/// A pooled value of type `T` together with the channel used to notify the
/// pool manager about its fate.
///
/// Dropping a `Resource` whose `recycled` flag is still `false` sends
/// `Request::Recycle` to the manager (see the `Drop` impl).
#[derive(Debug)]
pub struct Resource<T> {
/// The wrapped value handed out to users through `get`.
resource: T,
/// Sender half of the pool manager's request channel.
manager_sender: mpsc::Sender<Request<T>>,
/// `true` once the manager has been told (or has itself decided) that this
/// resource is gone, so that `Drop` does not notify it a second time.
recycled: bool
}
impl<T> Drop for Resource<T> {
    /// Tells the pool manager that this resource is gone, unless that has
    /// already been done (`recycled == true`).
    ///
    /// `Drop` cannot `.await`, so the notification is sent with `try_send`
    /// and retried until it is either accepted or the channel is closed.
    fn drop(&mut self) {
        if self.recycled {
            return;
        }
        loop {
            match self.manager_sender.try_send(Request::Recycle) {
                // Delivered, or nobody is left to listen: either way we are done.
                Ok(()) | Err(TrySendError::Closed(_)) => break,
                // The queue is full. Give other threads (in particular the one
                // draining the channel) a chance to run instead of hot-spinning
                // on the CPU, then try again.
                Err(TrySendError::Full(_)) => std::thread::yield_now(),
            }
        }
    }
}
impl<T> Resource<T> {
    /// Wraps `resource` so that it reports back to the manager reachable
    /// through `manager_sender`. A fresh resource is not yet recycled.
    fn new(resource: T, manager_sender: mpsc::Sender<Request<T>>) -> Self {
        Self {
            recycled: false,
            manager_sender,
            resource,
        }
    }

    /// Gives mutable access to the wrapped value.
    pub fn get(&mut self) -> &mut T {
        &mut self.resource
    }

    /// Consumes the resource and asks the manager to recycle it, waiting at
    /// most `TIMEOUT` for room in the request channel.
    ///
    /// If the request could not be delivered the flag stays `false`, so the
    /// `Drop` impl that runs when this function returns still notifies the
    /// manager.
    pub async fn recycle(mut self) {
        if self.recycled {
            return;
        }
        let outcome = self
            .manager_sender
            .send_timeout(Request::Recycle, TIMEOUT)
            .await;
        // `recycled` is known to be false here, so this only ever flips it to
        // true on success.
        self.recycled = outcome.is_ok();
    }
}
/// Messages understood by the pool's service task (see `Pool::handle_recv`).
pub enum Request<T> {
/// Return a resource to the pool. It is handed to the oldest waiting
/// blocking checkout if there is one, otherwise checked back in.
Checkin(Resource<T>),
/// Ask for a resource. If none can be provided the reply sender is queued
/// until one is returned. The reply is `None` once shutdown was requested.
BlockCheckout(oneshot::Sender<Option<Resource<T>>>),
/// Ask for a resource without queuing: the reply is `None` when none can
/// be provided right now or shutdown was requested.
NonBlockCheckout(oneshot::Sender<Option<Resource<T>>>),
/// A resource was destroyed or dropped; sent by `Resource::recycle` and by
/// `Resource`'s `Drop` impl.
Recycle,
/// Print the pool's `Debug` representation to stdout.
PrintStatistics,
/// Set the pool's shutdown flag.
ShutdownSafe,
}
/// State of the resource pool. After `run_service` it is owned by a spawned
/// task and only reachable through `Request` messages.
pub struct Pool<T, F>
where
F: Fn() -> BoxFuture<T> + Send + 'static,
T: Send + 'static
{
/// Receiving end of the request channel, read by the service loop.
recv: mpsc::Receiver<Request<T>>,
/// Sender for the same channel; cloned into every `Resource` and `Session`.
self_sender: mpsc::Sender<Request<T>>,
/// Called to build each new resource.
factory: F,
/// Resources currently idle in the pool.
resources: VecDeque<Resource<T>>,
/// Reply channels of blocking checkouts that could not be served yet.
waiting: VecDeque<oneshot::Sender<Option<Resource<T>>>>,
/// The `pool_size` passed to `new` (number of resources created up front).
size: usize,
/// Upper bound for `overflow_worker`.
max_overflow: usize,
/// Number of overflow resources currently accounted for; incremented by
/// `factory_overflow`, decremented on check-in and on `Recycle`.
overflow_worker: usize,
/// Set by `Request::ShutdownSafe`.
shutdown: bool,
}
impl<T, F> fmt::Debug for Pool<T, F>
where
    F: Fn() -> BoxFuture<T> + Send + 'static,
    T: Send + 'static,
{
    /// Formats the pool's counters. `resources` is reported as the number of
    /// idle resources, not their contents.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let idle = self.resources.len();
        let mut out = f.debug_struct("Pool");
        out.field("size", &self.size);
        out.field("max_overflow", &self.max_overflow);
        out.field("overflow_worker", &self.overflow_worker);
        out.field("resources", &idle);
        out.finish()
    }
}
impl<T, F> Pool<T, F>
where
F: Fn() -> BoxFuture<T> + Send + 'static,
T: fmt::Debug + Send + 'static
{
pub async fn new (pool_size: usize,
max_overflow: usize,
factory: F) -> Self {
let (self_sender, recv) = mpsc::channel(50);
let mut pool = Pool {
recv,
self_sender,
factory,
resources: VecDeque::new(),
waiting: VecDeque::new(),
size: pool_size,
max_overflow,
overflow_worker: 0,
shutdown: false
};
for _ in 0..pool_size {
pool.factory_resource().await;
}
pool
}
pub fn run_service(mut self) -> Session<T> {
let session = Session::new(self.self_sender.clone());
tokio::spawn(async move {
loop {
let res = self.recv.recv().await;
if let WorkerState::Disconnected = self.handle_recv(res).await {
return ();
}
}
});
return session
}
#[inline]
async fn handle_recv(&mut self, res: Option<Request<T>>) -> WorkerState {
match res {
Some(req) => {
match req {
Request::Checkin(resource) => {
let mut resource = Some(resource);
while let Some(resp) = self.waiting.pop_front() {
match resp.send(resource) {
Ok(_) => {
return WorkerState::Continue
}
Err(res) => {
resource = res;
}
}
}
if self.shutdown {
return WorkerState::Disconnected
}
self.checkin(resource.unwrap());
return WorkerState::Continue
}
Request::BlockCheckout(resp) => {
if self.shutdown {
let _ = resp.send(None);
return WorkerState::Continue
}
match self.checkout().await {
Some(resource) => {
let resource = Some(resource);
match resp.send(resource) {
Ok(_) => (),
Err(mut resource) => {
while let Some(resp) = self.waiting.pop_front() {
match resp.send(resource) {
Ok(_) => {
return WorkerState::Continue
}
Err(res) => {
resource = res;
}
}
}
self.checkin(resource.unwrap());
}
}
}
None => {
self.waiting.push_back(resp);
}
}
return WorkerState::Continue
}
Request::NonBlockCheckout(resp) => {
if self.shutdown {
let _ = resp.send(None);
return WorkerState::Continue;
}
match self.checkout().await {
Some(resource) => {
let resource = Some(resource);
if let Err(mut resource) = resp.send(resource) {
while let Some(resp) = self.waiting.pop_front() {
match resp.send(resource) {
Ok(_) => {
return WorkerState::Continue
}
Err(res) => {
resource = res;
}
}
}
self.checkin(resource.unwrap());
}
}
None => {
let _ = resp.send(None);
}
}
return WorkerState::Continue
}
Request::ShutdownSafe => {
if !self.shutdown {
self.shutdown = true;
}
return WorkerState::Continue
}
Request::Recycle => {
if self.overflow_worker > 0 {
self.overflow_worker -= 1;
} else {
self.factory_resource().await;
}
let resource = self.checkout().await.unwrap();
let mut resource = Some(resource);
while let Some(resp) = self.waiting.pop_front() {
match resp.send(resource) {
Ok(_) => {
return WorkerState::Continue
}
Err(res) => {
resource = res;
}
}
}
if self.shutdown {
return WorkerState::Disconnected
}
self.checkin(resource.unwrap());
return WorkerState::Continue
}
Request::PrintStatistics => {
println!("==> {:?}", &self);
return WorkerState::Continue
}
}
}
None => WorkerState::Disconnected
}
}
#[inline]
fn checkin(&mut self, mut resource: Resource<T>) {
if resource.recycled {
return;
}
if self.overflow_worker > 0 {
resource.recycled = true;
self.overflow_worker -= 1;
return;
}
self.resources.push_back(resource);
}
#[inline]
async fn checkout(&mut self) -> Option<Resource<T>> {
if self.resources.len() == 0 {
return self.factory_overflow().await
}
return self.resources.pop_front()
}
#[inline]
async fn factory_resource(&mut self) {
let resource = (self.factory)();
self.resources.push_back(Resource::new(resource.await, self.self_sender.clone()));
}
#[inline]
async fn factory_overflow(&mut self) -> Option<Resource<T>> {
if self.overflow_worker < self.max_overflow {
let resource = (self.factory)();
self.overflow_worker += 1;
let res = Resource::new(resource.await, self.self_sender.clone());
return Some(res)
}
return None
}
}
/// Client-side handle to a running pool; every operation is sent as a
/// `Request` over the channel held here.
pub struct Session<T> {
/// Sender half of the pool's request channel.
sender: mpsc::Sender<Request<T>>
}
impl<T> Session<T>
where
T: Send + 'static
{
fn new(sender: mpsc::Sender<Request<T>>) -> Self {
Session {
sender
}
}
pub fn clone(&self) -> Self {
let sender = self.sender.clone();
Session {
sender
}
}
pub async fn checkin(&self, resource: Resource<T>) -> Result<(), SessionResult> {
let res = self.sender.send_timeout(Request::Checkin(resource), TIMEOUT).await;
match res {
Ok(_) => Ok(()),
Err(e) => {
match e {
SendTimeoutError::Timeout(_) => Err(SessionResult::Timeout),
SendTimeoutError::Closed(_) => Err(SessionResult::Closed),
}
}
}
}
pub async fn block_checkout(&self) -> Result<Resource<T>, SessionResult> {
let (ask, resp) = oneshot::channel();
let req = Request::BlockCheckout(ask);
let res = self.sender.send_timeout(req, TIMEOUT).await;
match res {
Err(SendTimeoutError::Closed(_req)) => {
return Err(SessionResult::Closed)
}
Err(SendTimeoutError::Timeout(_req)) => {
return Err(SessionResult::Timeout)
}
Ok(_) => {
match resp.await {
Ok(oresource) => {
let resource = unsafe {
oresource.unwrap_unchecked()
};
Ok(resource)
}
Err(_) => {
return Err(SessionResult::NoResponse)
}
}
}
}
}
pub async fn nonblock_checkout(&self) -> Result<Resource<T>, SessionResult> {
let (ask, resp) = oneshot::channel();
let req = Request::NonBlockCheckout(ask);
let res = self.sender.send_timeout(req, TIMEOUT).await;
match res {
Err(SendTimeoutError::Closed(_req)) => {
return Err(SessionResult::Closed)
}
Err(SendTimeoutError::Timeout(_req)) => {
return Err(SessionResult::Timeout)
}
Ok(_) => {
match resp.await {
Ok(oresource) => {
match oresource {
Some(r) => Ok(r),
None => Err(SessionResult::Full),
}
}
Err(_) => {
return Err(SessionResult::NoResponse)
}
}
}
}
}
pub async fn destroyed(&mut self, resource: Resource<T>) {
resource.recycle().await;
}
pub async fn print_statistics(&self) {
let _ = self.sender.send_timeout(Request::PrintStatistics, TIMEOUT).await;
}
pub async fn safe_shutdown(&self) {
let _ = self.sender.send(Request::ShutdownSafe).await;
}
}