use std::sync::Arc;
use tokio::sync::OwnedSemaphorePermit;
use super::{client_pool::ClientPoolInner, slot::PoolSlot};
use crate::{
app::Packet,
client::ClientError,
message::{DecodeWith, EncodeWith},
serializer::Serializer,
};
/// A lease on one slot of a client pool.
///
/// Every operation exposed by the lease checks a connection out of the
/// leased slot, runs the operation on it, and marks the connection broken
/// when the resulting error asks for the connection to be recycled.
///
/// Field order matters: once `Drop::drop` has run, Rust drops the fields in
/// declaration order, so `slot` is released before `_permit`, and
/// `release_inner` goes last.
pub struct PooledClientLease<S, P, C>
where
S: Serializer + Clone + Send + Sync + 'static,
P: bincode::Encode + Clone + Send + Sync + 'static,
C: Send + 'static,
{
/// Shared handle to the leased slot; `checkout()` is called on it for
/// every operation.
slot: Arc<PoolSlot<S, P, C>>,
/// Semaphore permit owned for the lifetime of the lease. It is never read;
/// it is held only so that it is released when the lease is dropped.
_permit: OwnedSemaphorePermit,
/// When `Some`, the pool state whose scheduler is told that capacity is
/// available as the lease is dropped. When `None`, dropping the lease
/// sends no notification.
release_inner: Option<Arc<ClientPoolInner<S, P, C>>>,
}
// Runs one asynchronous operation against a connection checked out of the
// lease's slot.
//
// Usage: `dispatch_on_connection!(self, |conn| conn.some_op(args))`.
//
// Expansion, in order:
//   1. `self.slot.checkout().await?` - a checkout failure is propagated out of
//      the enclosing function via `?` before the operation runs.
//   2. The operation expression is awaited with the checked-out connection
//      bound (mutably) to the caller-chosen identifier.
//   3. If the operation failed and `Self::should_recycle` accepts the error,
//      the connection is flagged with `mark_broken()`.
//   4. The operation's result is the value of the whole expression.
macro_rules! dispatch_on_connection {
    ($self:ident, | $conn:ident | $op:expr) => {{
        let mut $conn = $self.slot.checkout().await?;
        let outcome = $op.await;
        match &outcome {
            // Only errors that request recycling poison the connection.
            Err(failure) if Self::should_recycle(failure) => {
                $conn.mark_broken();
            }
            _ => {}
        }
        outcome
    }};
}
impl<S, P, C> PooledClientLease<S, P, C>
where
S: Serializer + Clone + Send + Sync + 'static,
P: bincode::Encode + Clone + Send + Sync + 'static,
C: Send + 'static,
{
pub(crate) fn new(
slot: Arc<PoolSlot<S, P, C>>,
permit: OwnedSemaphorePermit,
release_inner: Option<Arc<ClientPoolInner<S, P, C>>>,
) -> Self {
Self {
slot,
_permit: permit,
release_inner,
}
}
fn should_recycle(err: &ClientError) -> bool { err.should_recycle_connection() }
pub async fn send<M: EncodeWith<S>>(&self, message: &M) -> Result<(), ClientError> {
dispatch_on_connection!(self, |conn| conn.send(message))
}
pub async fn receive<M: DecodeWith<S>>(&self) -> Result<M, ClientError> {
dispatch_on_connection!(self, |conn| conn.receive())
}
pub async fn call<Req, Resp>(&self, request: &Req) -> Result<Resp, ClientError>
where
Req: EncodeWith<S>,
Resp: DecodeWith<S>,
{
dispatch_on_connection!(self, |conn| conn.call(request))
}
pub async fn send_envelope<M>(&self, envelope: M) -> Result<u64, ClientError>
where
M: Packet + EncodeWith<S>,
{
dispatch_on_connection!(self, |conn| conn.send_envelope(envelope))
}
pub async fn receive_envelope<M>(&self) -> Result<M, ClientError>
where
M: Packet + DecodeWith<S>,
{
dispatch_on_connection!(self, |conn| conn.receive_envelope())
}
pub async fn call_correlated<M>(&self, request: M) -> Result<M, ClientError>
where
M: Packet + EncodeWith<S> + DecodeWith<S>,
{
dispatch_on_connection!(self, |conn| conn.call_correlated(request))
}
}
/// Tells the pool's scheduler that capacity is available when a lease that
/// carries a pool handle is dropped.
impl<S, P, C> Drop for PooledClientLease<S, P, C>
where
    S: Serializer + Clone + Send + Sync + 'static,
    P: bincode::Encode + Clone + Send + Sync + 'static,
    C: Send + 'static,
{
    fn drop(&mut self) {
        // Leases built without a pool handle have nobody to notify.
        let Some(pool) = self.release_inner.as_ref() else {
            return;
        };
        // NOTE(review): this body runs before the lease's fields (including
        // the semaphore permit) are dropped - confirm the scheduler does not
        // rely on the permit already being released when it is notified.
        pool.scheduler.notify_capacity_available(Arc::clone(pool));
    }
}