1use std::pin::Pin;
2use std::{future::poll_fn, task::Context, task::Poll};
3
4use super::cmd::{commands::PubSubCommand, commands::SubscribeOutputCommand, Command};
5use super::codec::Codec;
6use super::errors::{CommandError, Error};
7use ntex::{io::IoBoxed, io::RecvError, util::ready, util::Stream};
8
9pub struct SimpleClient {
11 io: IoBoxed,
12}
13
14impl SimpleClient {
15 pub(crate) fn new(io: IoBoxed) -> Self {
17 SimpleClient { io }
18 }
19
20 pub async fn exec<U>(&self, cmd: U) -> Result<U::Output, CommandError>
22 where
23 U: Command,
24 {
25 self.send(cmd)?;
26 loop {
27 if let Some(result) = self.recv::<U>().await {
28 return result;
29 }
30 }
31 }
32
33 pub fn send<U>(&self, cmd: U) -> Result<(), CommandError>
35 where
36 U: Command,
37 {
38 self.io.encode(cmd.to_request(), &Codec)?;
39 Ok(())
40 }
41
42 pub fn subscribe(
44 self,
45 cmd: SubscribeOutputCommand,
46 ) -> Result<SubscriptionClient<SubscribeOutputCommand>, CommandError> {
47 self.send(cmd)?;
48 Ok(SubscriptionClient {
49 client: self,
50 _cmd: std::marker::PhantomData,
51 })
52 }
53
54 pub(crate) fn into_inner(self) -> IoBoxed {
55 self.io
56 }
57
58 async fn recv<U: Command>(&self) -> Option<Result<U::Output, CommandError>> {
59 poll_fn(|cx| self.poll_recv::<U>(cx)).await
60 }
61
62 fn poll_recv<U: Command>(
63 &self,
64 cx: &mut Context<'_>,
65 ) -> Poll<Option<Result<U::Output, CommandError>>> {
66 match ready!(self.io.poll_recv(&Codec, cx)) {
67 Ok(item) => match item.into_result() {
68 Ok(result) => Poll::Ready(Some(U::to_output(result))),
69 Err(err) => Poll::Ready(Some(Err(CommandError::Error(err)))),
70 },
71 Err(RecvError::KeepAlive) | Err(RecvError::Stop) => {
72 unreachable!()
73 }
74 Err(RecvError::WriteBackpressure) => {
75 if let Err(err) = ready!(self.io.poll_flush(cx, false))
76 .map_err(|e| CommandError::Protocol(Error::PeerGone(Some(e))))
77 {
78 Poll::Ready(Some(Err(err)))
79 } else {
80 Poll::Pending
81 }
82 }
83 Err(RecvError::Decoder(err)) => Poll::Ready(Some(Err(CommandError::Protocol(err)))),
84 Err(RecvError::PeerGone(err)) => {
85 Poll::Ready(Some(Err(CommandError::Protocol(Error::PeerGone(err)))))
86 }
87 }
88 }
89}
90
91pub struct SubscriptionClient<U: Command + PubSubCommand> {
93 client: SimpleClient,
94 _cmd: std::marker::PhantomData<U>,
95}
96
97impl<U: Command + PubSubCommand> Stream for SubscriptionClient<U> {
98 type Item = Result<U::Output, CommandError>;
99
100 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
101 self.poll_recv(cx)
102 }
103}
104
105impl<U: Command + PubSubCommand> SubscriptionClient<U> {
106 pub fn into_client(self) -> SimpleClient {
130 self.client
131 }
132
133 pub fn send<T: Command + PubSubCommand>(&self, cmd: T) -> Result<(), CommandError> {
135 self.client.send(cmd)
136 }
137
138 pub async fn recv(&self) -> Option<Result<U::Output, CommandError>> {
140 poll_fn(|cx| self.client.poll_recv::<U>(cx)).await
141 }
142
143 pub fn poll_recv(
147 &self,
148 cx: &mut Context<'_>,
149 ) -> Poll<Option<Result<U::Output, CommandError>>> {
150 self.client.poll_recv::<U>(cx)
151 }
152}