ntex_redis/
simple.rs

1use std::pin::Pin;
2use std::{future::poll_fn, task::Context, task::Poll};
3
4use super::cmd::{commands::PubSubCommand, commands::SubscribeOutputCommand, Command};
5use super::codec::Codec;
6use super::errors::{CommandError, Error};
7use ntex::{io::IoBoxed, io::RecvError, util::ready, util::Stream};
8
9/// Redis client
10pub struct SimpleClient {
11    io: IoBoxed,
12}
13
14impl SimpleClient {
15    /// Create new simple client
16    pub(crate) fn new(io: IoBoxed) -> Self {
17        SimpleClient { io }
18    }
19
20    /// Execute redis command and wait result
21    pub async fn exec<U>(&self, cmd: U) -> Result<U::Output, CommandError>
22    where
23        U: Command,
24    {
25        self.send(cmd)?;
26        loop {
27            if let Some(result) = self.recv::<U>().await {
28                return result;
29            }
30        }
31    }
32
33    /// Send redis command
34    pub fn send<U>(&self, cmd: U) -> Result<(), CommandError>
35    where
36        U: Command,
37    {
38        self.io.encode(cmd.to_request(), &Codec)?;
39        Ok(())
40    }
41
42    /// Execute redis SUBSCRIBE command and act with output as stream
43    pub fn subscribe(
44        self,
45        cmd: SubscribeOutputCommand,
46    ) -> Result<SubscriptionClient<SubscribeOutputCommand>, CommandError> {
47        self.send(cmd)?;
48        Ok(SubscriptionClient {
49            client: self,
50            _cmd: std::marker::PhantomData,
51        })
52    }
53
54    pub(crate) fn into_inner(self) -> IoBoxed {
55        self.io
56    }
57
58    async fn recv<U: Command>(&self) -> Option<Result<U::Output, CommandError>> {
59        poll_fn(|cx| self.poll_recv::<U>(cx)).await
60    }
61
62    fn poll_recv<U: Command>(
63        &self,
64        cx: &mut Context<'_>,
65    ) -> Poll<Option<Result<U::Output, CommandError>>> {
66        match ready!(self.io.poll_recv(&Codec, cx)) {
67            Ok(item) => match item.into_result() {
68                Ok(result) => Poll::Ready(Some(U::to_output(result))),
69                Err(err) => Poll::Ready(Some(Err(CommandError::Error(err)))),
70            },
71            Err(RecvError::KeepAlive) | Err(RecvError::Stop) => {
72                unreachable!()
73            }
74            Err(RecvError::WriteBackpressure) => {
75                if let Err(err) = ready!(self.io.poll_flush(cx, false))
76                    .map_err(|e| CommandError::Protocol(Error::PeerGone(Some(e))))
77                {
78                    Poll::Ready(Some(Err(err)))
79                } else {
80                    Poll::Pending
81                }
82            }
83            Err(RecvError::Decoder(err)) => Poll::Ready(Some(Err(CommandError::Protocol(err)))),
84            Err(RecvError::PeerGone(err)) => {
85                Poll::Ready(Some(Err(CommandError::Protocol(Error::PeerGone(err)))))
86            }
87        }
88    }
89}
90
91/// Redis pubsub client to receive push messages
92pub struct SubscriptionClient<U: Command + PubSubCommand> {
93    client: SimpleClient,
94    _cmd: std::marker::PhantomData<U>,
95}
96
97impl<U: Command + PubSubCommand> Stream for SubscriptionClient<U> {
98    type Item = Result<U::Output, CommandError>;
99
100    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
101        self.poll_recv(cx)
102    }
103}
104
105impl<U: Command + PubSubCommand> SubscriptionClient<U> {
106    /// Get client back. Don't forget reset connection!
107    ///
108    /// ```rust
109    /// use ntex_redis::{cmd, RedisConnector};
110    ///
111    /// #[ntex::main]
112    /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
113    ///     let redis = RedisConnector::new("127.0.0.1:6379").connect_simple().await?;
114    ///    
115    ///     let subscriber = redis.subscribe(cmd::Subscribe(vec!["test"]))?;
116    ///     // do some work
117    ///
118    ///     // go back to normal client
119    ///     let redis = subscriber.into_client();
120    ///
121    ///     // and reset connection, client may receive pending subscription messages instead of valid RESET response
122    ///     if let Err(e) = redis.exec(cmd::Reset()).await {
123    ///         println!("Error on reset connection: {}", e);      
124    ///     };
125    ///
126    ///     Ok(())
127    /// }
128    /// ```
129    pub fn into_client(self) -> SimpleClient {
130        self.client
131    }
132
133    /// Send redis subscribe/unsubscribe command
134    pub fn send<T: Command + PubSubCommand>(&self, cmd: T) -> Result<(), CommandError> {
135        self.client.send(cmd)
136    }
137
138    /// Attempt to pull out the next value of this stream.
139    pub async fn recv(&self) -> Option<Result<U::Output, CommandError>> {
140        poll_fn(|cx| self.client.poll_recv::<U>(cx)).await
141    }
142
143    /// Attempt to pull out the next value of this stream, registering
144    /// the current task for wakeup if the value is not yet available,
145    /// and returning None if the payload is exhausted.
146    pub fn poll_recv(
147        &self,
148        cx: &mut Context<'_>,
149    ) -> Poll<Option<Result<U::Output, CommandError>>> {
150        self.client.poll_recv::<U>(cx)
151    }
152}