use std::pin::Pin;
use std::{future::poll_fn, task::Context, task::Poll};
use super::cmd::{commands::PubSubCommand, commands::SubscribeOutputCommand, Command};
use super::codec::Codec;
use super::errors::{CommandError, Error};
use ntex::{io::IoBoxed, io::RecvError, util::ready, util::Stream};
pub struct SimpleClient {
io: IoBoxed,
}
impl SimpleClient {
pub(crate) fn new(io: IoBoxed) -> Self {
SimpleClient { io }
}
pub async fn exec<U>(&self, cmd: U) -> Result<U::Output, CommandError>
where
U: Command,
{
self.send(cmd)?;
loop {
if let Some(result) = self.recv::<U>().await {
return result;
}
}
}
pub fn send<U>(&self, cmd: U) -> Result<(), CommandError>
where
U: Command,
{
self.io.encode(cmd.to_request(), &Codec)?;
Ok(())
}
pub fn subscribe(
self,
cmd: SubscribeOutputCommand,
) -> Result<SubscriptionClient<SubscribeOutputCommand>, CommandError> {
self.send(cmd)?;
Ok(SubscriptionClient {
client: self,
_cmd: std::marker::PhantomData,
})
}
pub(crate) fn into_inner(self) -> IoBoxed {
self.io
}
async fn recv<U: Command>(&self) -> Option<Result<U::Output, CommandError>> {
poll_fn(|cx| self.poll_recv::<U>(cx)).await
}
fn poll_recv<U: Command>(
&self,
cx: &mut Context<'_>,
) -> Poll<Option<Result<U::Output, CommandError>>> {
match ready!(self.io.poll_recv(&Codec, cx)) {
Ok(item) => match item.into_result() {
Ok(result) => Poll::Ready(Some(U::to_output(result))),
Err(err) => Poll::Ready(Some(Err(CommandError::Error(err)))),
},
Err(RecvError::KeepAlive) | Err(RecvError::Stop) => {
unreachable!()
}
Err(RecvError::WriteBackpressure) => {
if let Err(err) = ready!(self.io.poll_flush(cx, false))
.map_err(|e| CommandError::Protocol(Error::PeerGone(Some(e))))
{
Poll::Ready(Some(Err(err)))
} else {
Poll::Pending
}
}
Err(RecvError::Decoder(err)) => Poll::Ready(Some(Err(CommandError::Protocol(err)))),
Err(RecvError::PeerGone(err)) => {
Poll::Ready(Some(Err(CommandError::Protocol(Error::PeerGone(err)))))
}
}
}
}
pub struct SubscriptionClient<U: Command + PubSubCommand> {
client: SimpleClient,
_cmd: std::marker::PhantomData<U>,
}
impl<U: Command + PubSubCommand> Stream for SubscriptionClient<U> {
type Item = Result<U::Output, CommandError>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.poll_recv(cx)
}
}
impl<U: Command + PubSubCommand> SubscriptionClient<U> {
pub fn into_client(self) -> SimpleClient {
self.client
}
pub fn send<T: Command + PubSubCommand>(&self, cmd: T) -> Result<(), CommandError> {
self.client.send(cmd)
}
pub async fn recv(&self) -> Option<Result<U::Output, CommandError>> {
poll_fn(|cx| self.client.poll_recv::<U>(cx)).await
}
pub fn poll_recv(
&self,
cx: &mut Context<'_>,
) -> Poll<Option<Result<U::Output, CommandError>>> {
self.client.poll_recv::<U>(cx)
}
}