use std::time::Duration;
use crate::io::{IOError, SharedStreamPool, SocketAddress, StreamPool};
/// Errors surfaced by [`SocketClient`] operations.
#[derive(Debug)]
pub enum ClientError {
/// A transport-level failure reported by the `crate::io` layer.
IOError(IOError),
/// A borsh I/O error (`borsh::io::Error`).
BorshError(borsh::io::Error),
/// NOTE(review): presumably signals that a response did not correspond to
/// the request that was sent; nothing in this part of the file constructs
/// it, so confirm against the call sites.
ResponseMismatch,
}
impl From<IOError> for ClientError {
fn from(err: IOError) -> Self {
Self::IOError(err)
}
}
impl From<borsh::io::Error> for ClientError {
fn from(err: borsh::io::Error) -> Self {
Self::BorshError(err)
}
}
/// Client that sends raw byte requests over streams taken from a shared pool.
#[derive(Clone, Debug)]
pub struct SocketClient {
/// Shared stream pool; accessed through its async `read()` / `write()`
/// guards by the methods of this type.
pool: SharedStreamPool,
/// Upper bound applied by `call` to acquiring a stream and completing the
/// request/response exchange.
timeout: Duration,
}
impl SocketClient {
    /// Creates a client on top of an existing shared stream pool.
    ///
    /// `timeout` is the limit later applied by [`SocketClient::call`].
    #[must_use]
    pub fn new(pool: SharedStreamPool, timeout: Duration) -> Self {
        Self { pool, timeout }
    }

    /// Creates a client backed by a freshly built pool for `addr`.
    ///
    /// The pool is constructed with `StreamPool::new(addr, 1)` and then
    /// converted with `.shared()`.
    ///
    /// # Errors
    ///
    /// Returns the [`IOError`] produced when the pool cannot be created.
    pub fn single(
        addr: SocketAddress,
        timeout: Duration,
    ) -> Result<Self, IOError> {
        let single_pool = StreamPool::new(addr, 1)?;
        Ok(Self::new(single_pool.shared(), timeout))
    }

    /// Returns whatever the underlying pool reports from its `busy()` check,
    /// evaluated while holding the pool's read guard.
    pub async fn busy(&self) -> bool {
        let guard = self.pool.read().await;
        guard.busy().await
    }

    /// Sends `request` over a pooled stream and returns the raw response.
    ///
    /// The configured timeout covers both obtaining a stream from the pool
    /// and the request/response exchange. Waiting for the pool's read guard
    /// happens before the timer starts and is therefore not bounded by it.
    ///
    /// # Errors
    ///
    /// Returns `IOError::RecvTimeout` (converted into [`ClientError`]) when
    /// the timeout elapses, or the converted error from the stream's `call`.
    pub async fn call(&self, request: &[u8]) -> Result<Vec<u8>, ClientError> {
        let guard = self.pool.read().await;

        // Futures are lazy: nothing below runs until `timeout` polls it.
        let exchange = async {
            let mut conn = guard.get().await;
            conn.call(request).await
        };
        let outcome = tokio::time::timeout(self.timeout, exchange).await;

        match outcome {
            Ok(reply) => Ok(reply?),
            Err(_elapsed) => Err(IOError::RecvTimeout.into()),
        }
    }

    /// Replaces the timeout used by subsequent [`SocketClient::call`]s.
    pub fn set_timeout(&mut self, timeout: Duration) {
        self.timeout = timeout;
    }

    /// Forwards `pool_size` to the pool's `expand_to` while holding the
    /// pool's write guard.
    ///
    /// # Errors
    ///
    /// Returns the pool's error converted into [`ClientError`].
    pub async fn expand_to(
        &mut self,
        pool_size: u8,
    ) -> Result<(), ClientError> {
        self.pool
            .write()
            .await
            .expand_to(pool_size)?;
        Ok(())
    }

    /// Takes one stream from the pool and attempts to connect it.
    ///
    /// Unlike [`SocketClient::call`], no timeout is applied here.
    ///
    /// # Errors
    ///
    /// Returns the [`IOError`] reported by the stream's `connect`.
    pub async fn try_connect(&self) -> Result<(), IOError> {
        let guard = self.pool.read().await;
        let mut conn = guard.get().await;
        conn.connect().await
    }
}