#![cfg(feature = "async")]
use std::collections::{hash_map, HashMap, VecDeque};
use std::sync::{Arc, Mutex};
use crate::connection::AsyncConnection;
use crate::request::{OwnedConnectionParams as ConnectionKey, ParsedRequest};
use crate::{Error, Request, Response};
#[derive(Clone)]
pub struct Client {
r#async: Arc<Mutex<ClientImpl<AsyncConnection>>>,
}
struct ClientImpl<T> {
connections: HashMap<ConnectionKey, Arc<T>>,
lru_order: VecDeque<ConnectionKey>,
capacity: usize,
}
impl Client {
pub fn new(capacity: usize) -> Self {
Client {
r#async: Arc::new(Mutex::new(ClientImpl {
connections: HashMap::new(),
lru_order: VecDeque::new(),
capacity,
})),
}
}
pub async fn send_async(&self, request: Request) -> Result<Response, Error> {
let parsed_request = ParsedRequest::new(request)?;
let key = parsed_request.connection_params();
let owned_key = key.into();
let conn_opt = {
let state = self.r#async.lock().unwrap();
if let Some(conn) = state.connections.get(&owned_key) {
Some(Arc::clone(conn))
} else {
None
}
};
let conn = if let Some(conn) = conn_opt {
conn
} else {
let connection = AsyncConnection::new(key, parsed_request.timeout_at).await?;
let connection = Arc::new(connection);
let mut state = self.r#async.lock().unwrap();
if let hash_map::Entry::Vacant(entry) = state.connections.entry(owned_key) {
entry.insert(Arc::clone(&connection));
state.lru_order.push_back(key.into());
if state.connections.len() > state.capacity {
if let Some(oldest_key) = state.lru_order.pop_front() {
state.connections.remove(&oldest_key);
}
}
}
connection
};
conn.send(parsed_request).await
}
}
pub trait RequestExt {
fn send_async_with_client(
self,
client: &Client,
) -> impl std::future::Future<Output = Result<Response, Error>>;
}
impl RequestExt for Request {
fn send_async_with_client(
self,
client: &Client,
) -> impl std::future::Future<Output = Result<Response, Error>> {
client.send_async(self)
}
}