use std::{
ops::{Deref, DerefMut},
sync::Arc,
};
use log::*;
use tokio::sync::Mutex;
use crate::{
PeerConnection,
peer_manager::NodeId,
protocol::rpc::{
NamedProtocolService,
RpcClient,
RpcClientBuilder,
RpcError,
RpcHandshakeError,
error::HandshakeRejectReason,
},
};
const LOG_TARGET: &str = "comms::protocol::rpc::client_pool";
#[derive(Clone)]
pub struct RpcClientPool<T> {
pool: Arc<Mutex<LazyPool<T>>>,
}
impl<T> RpcClientPool<T>
where T: RpcPoolClient + From<RpcClient> + NamedProtocolService + Clone
{
pub(crate) fn new(peer_connection: PeerConnection, pool_size: usize, client_config: RpcClientBuilder<T>) -> Self {
let pool = LazyPool::new(peer_connection, pool_size, client_config);
Self {
pool: Arc::new(Mutex::new(pool)),
}
}
pub async fn get(&self) -> Result<RpcClientLease<T>, RpcClientPoolError> {
let mut pool = self.pool.lock().await;
pool.get_least_used_or_connect().await
}
pub async fn is_connected(&self) -> bool {
let pool = self.pool.lock().await;
pool.is_connected()
}
}
#[derive(Clone)]
pub(crate) struct LazyPool<T> {
connection: PeerConnection,
clients: Vec<RpcClientLease<T>>,
client_config: RpcClientBuilder<T>,
}
impl<T> LazyPool<T>
where T: RpcPoolClient + From<RpcClient> + NamedProtocolService + Clone
{
pub fn new(connection: PeerConnection, capacity: usize, client_config: RpcClientBuilder<T>) -> Self {
assert!(capacity > 0, "Pool capacity of 0 is invalid");
Self {
connection,
clients: Vec::with_capacity(capacity),
client_config,
}
}
pub async fn get_least_used_or_connect(&mut self) -> Result<RpcClientLease<T>, RpcClientPoolError> {
{
self.check_peer_connection()?;
let peer_node_id = self.connection.peer_node_id().clone();
let clients_capacity = self.clients.capacity();
let protocol_id = self.client_config.protocol_id.clone();
let client = match self.get_next_lease() {
Some(c) => {
trace!(
target: LOG_TARGET,
"Used existing RPC client session for connection '{}'",
self.connection.peer_node_id(),
);
c
},
None => match self.add_new_client_session().await {
Ok((c, len)) => {
trace!(
target: LOG_TARGET,
"Added new RPC client session for connection '{peer_node_id}' ({len} of {clients_capacity}, protocol: {protocol_id:?})"
);
c
},
Err(RpcClientPoolError::NoMoreRemoteServerRpcSessions(val)) => {
let c = self
.get_least_used()
.ok_or(RpcClientPoolError::NoMoreRemoteServerRpcSessions(val.clone()))?;
trace!(
target: LOG_TARGET,
"Used existing RPC client session for connection '{}', protocol: {:?} ({})",
peer_node_id, protocol_id, RpcClientPoolError::NoMoreRemoteServerRpcSessions(val),
);
c
},
Err(RpcClientPoolError::NoMoreRemoteClientRpcSessions(val)) => {
let c = self
.get_least_used()
.ok_or(RpcClientPoolError::NoMoreRemoteClientRpcSessions(val.clone()))?;
trace!(
target: LOG_TARGET,
"used existing RPC client session for connection '{}', protocol: {:?} ({})",
peer_node_id, protocol_id, RpcClientPoolError::NoMoreRemoteClientRpcSessions(val),
);
c
},
Err(err) => {
return Err(err);
},
},
};
if !client.is_connected() {
trace!(
target: LOG_TARGET,
"RPC client for connection '{peer_node_id}' is not connected, pruning"
);
self.prune();
return Err(RpcClientPoolError::CouldNotObtainRpcConnection);
}
Ok(client.clone())
}
}
pub fn is_connected(&self) -> bool {
self.connection.is_connected()
}
#[allow(dead_code)]
pub(super) fn refresh_num_active_connections(&mut self) -> usize {
self.prune();
self.clients.len()
}
fn check_peer_connection(&self) -> Result<(), RpcClientPoolError> {
if self.connection.is_connected() {
Ok(())
} else {
Err(RpcClientPoolError::PeerConnectionDropped {
peer: self.connection.peer_node_id().clone(),
})
}
}
fn get_next_lease(&self) -> Option<&RpcClientLease<T>> {
let client = self.get_least_used()?;
if self.is_full() {
return Some(client);
}
if client.lease_count() > 0 {
return None;
}
Some(client)
}
fn get_least_used(&self) -> Option<&RpcClientLease<T>> {
let mut min_count = usize::MAX;
let mut selected_client = None;
for client in &self.clients {
let lease_count = client.lease_count();
if lease_count == 0 {
return Some(client);
}
if min_count > lease_count {
selected_client = Some(client);
min_count = lease_count;
}
}
selected_client
}
pub fn is_full(&self) -> bool {
self.clients.len() == self.clients.capacity()
}
async fn add_new_client_session(&mut self) -> Result<(&RpcClientLease<T>, usize), RpcClientPoolError> {
debug_assert!(!self.is_full(), "add_new_client called when pool is full");
let client = self
.connection
.connect_rpc_using_builder(self.client_config.clone())
.await?;
let client = RpcClientLease::new(client);
self.clients.push(client);
Ok((self.clients.last().unwrap(), self.clients.len()))
}
fn prune(&mut self) {
let initial_len = self.clients.len();
let cap = self.clients.capacity();
self.clients = self.clients.drain(..).fold(Vec::with_capacity(cap), |mut vec, c| {
if c.is_connected() {
vec.push(c);
}
vec
});
debug_assert_eq!(self.clients.capacity(), cap);
debug!(
target: LOG_TARGET,
"Pruned {} client(s) (total connections: {})",
initial_len - self.clients.len(),
self.clients.len()
)
}
}
#[derive(Debug, Clone)]
pub struct RpcClientLease<T> {
inner: T,
rc: Arc<()>,
}
impl<T> RpcClientLease<T> {
pub fn new(inner: T) -> Self {
Self {
inner,
rc: Arc::new(()),
}
}
pub(super) fn lease_count(&self) -> usize {
Arc::strong_count(&self.rc) - 1
}
}
impl<T> Deref for RpcClientLease<T> {
type Target = T;
fn deref(&self) -> &Self::Target {
&self.inner
}
}
impl<T> DerefMut for RpcClientLease<T> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.inner
}
}
impl<T: RpcPoolClient> RpcPoolClient for RpcClientLease<T> {
fn is_connected(&self) -> bool {
self.inner.is_connected()
}
}
#[derive(Debug, thiserror::Error)]
pub enum RpcClientPoolError {
#[error("Peer connection to peer '{peer}' dropped")]
PeerConnectionDropped { peer: NodeId },
#[error("No peer RPC server sessions are available")]
NoMoreRemoteServerRpcSessions(String),
#[error("No peer RPC client sessions are available")]
NoMoreRemoteClientRpcSessions(String),
#[error("Failed to create client connection: {0}")]
FailedToConnect(RpcError),
#[error("Could not obtain RPC connection")]
CouldNotObtainRpcConnection,
}
impl From<RpcError> for RpcClientPoolError {
fn from(err: RpcError) -> Self {
match err {
RpcError::HandshakeError(RpcHandshakeError::Rejected(
HandshakeRejectReason::NoServerSessionsAvailable(val),
)) => RpcClientPoolError::NoMoreRemoteServerRpcSessions(val.to_string()),
RpcError::HandshakeError(RpcHandshakeError::Rejected(
HandshakeRejectReason::NoClientSessionsAvailable(val),
)) => RpcClientPoolError::NoMoreRemoteClientRpcSessions(val.to_string()),
err => RpcClientPoolError::FailedToConnect(err),
}
}
}
pub trait RpcPoolClient {
fn is_connected(&self) -> bool;
}