use std::future::Future;
use tokio::task::JoinHandle;
use crate::stream::{AnyStream, StartStream};
use crate::requestor::{AnyRequestor, OwnedSender};
use crate::StreamResponse;
/// A pool of API handles of type `T`, stored in an unbounded `deadqueue` queue.
///
/// The methods in this file take a handle out of the queue with `pop` and
/// return it (or a clone of it) with `push`.
pub struct ApiPool<T>(deadqueue::unlimited::Queue<T>);
impl<Api: Send + 'static> ApiPool<Api> {
pub fn new(api: Api) -> Self {
let q = deadqueue::unlimited::Queue::new();
q.push(api);
Self(q)
}
pub fn add(&self, api: Api) {
self.0.push(api);
}
pub async fn with_api<T, Fut: Future<Output=(impl Into<Api>, T)>, Fun: FnOnce(Api) -> Fut>(&self, fun: Fun) -> T where T: Send+Sized, Fut: Send{
let api = self.0.pop().await;
let (api, res) = fun(api).await;
self.0.push(api.into());
res
}
}
impl<T: Clone> ApiPool<T> {
    /// Hands out a handle while keeping the pool populated.
    ///
    /// A handle is popped from the queue, a clone of it is pushed back right
    /// away, and the popped handle itself is returned to the caller.
    pub async fn get(&self) -> T {
        let handle = self.0.pop().await;
        let replacement = handle.clone();
        self.0.push(replacement);
        handle
    }
}
/// Lets an `ApiPool` act as an `OwnedSender` by forwarding each request to
/// one of its pooled handles.
impl<Api, Req, Res> OwnedSender<Req, Res> for ApiPool<Api>
where
    Api: Send + 'static + OwnedSender<Req, Res>,
    Req: Send,
    Res: Send,
{
    /// Sends `req` via [`send`](Self::send) and returns the pool itself
    /// together with the outcome.
    ///
    /// A warning is logged as soon as this method is called (before the
    /// returned future is polled), pointing callers at `send` instead.
    fn send_and_back(self, req: Req) -> impl Future<Output = (Self, Result<Res, tonic::Status>)> {
        log::warn!("Don't use ApiPool::send_and_back! Please, use ApiPool::send");
        // Boxed and pinned so that the returned future is `Unpin`.
        Box::pin(async move {
            let outcome = self.send(req).await;
            (self, outcome)
        })
    }

    /// Sends `req` through a pooled handle.
    ///
    /// The handle is taken out of the pool via `with_api`, consumed by its own
    /// `send_and_back`, and the handle that call gives back is returned to the
    /// pool. The request's result is passed through unchanged.
    fn send(&self, req: Req) -> impl Future<Output = Result<Res, tonic::Status>> + Send {
        // `with_api` is generic over the future type and awaits it internally,
        // so the async block is passed as-is without a per-request heap allocation.
        self.with_api(move |api| async move { api.send_and_back(req).await })
    }
}
/// Lets an `ApiPool` act as a `StartStream` by delegating to one of its
/// pooled (cloneable) handles.
impl<Api, Req, T> StartStream<Req, T> for ApiPool<Api>
where
    Api: StartStream<Req, T> + Clone + Send,
    Req: Send,
{
    /// Starts a stream for `req` that feeds `sender`.
    ///
    /// A handle is popped from the pool and a clone of it is pushed back
    /// immediately, so the pool is not left without that handle while the
    /// stream is being started. The popped handle then performs the actual
    /// `start_stream` call, whose result is returned unchanged.
    fn start_stream<S>(
        &self,
        req: Req,
        sender: S,
    ) -> impl Future<Output = Result<JoinHandle<()>, tonic::Status>> + Send
    where
        S: futures::Sink<T> + Unpin + Send + 'static,
    {
        let stream_task = async move {
            let streamer = self.0.pop().await;
            let replacement = streamer.clone();
            self.0.push(replacement);
            streamer.start_stream(req, sender).await
        };
        Box::pin(stream_task)
    }
}
/// A pool is an `AnyRequestor` whenever its pooled type `T` is one and is
/// `'static`. The impl body is empty, so nothing is overridden here.
impl<T: AnyRequestor + 'static> AnyRequestor for ApiPool<T> {}
/// A pool is an `AnyStream<StreamResponse>` whenever its pooled type `T` is
/// one and is `Clone`. The impl body is empty, so nothing is overridden here.
impl<T: AnyStream<StreamResponse> + Clone> AnyStream<StreamResponse> for ApiPool<T> {}