ntex_redis/
client.rs

1use std::collections::VecDeque;
2use std::{cell::RefCell, fmt, future::poll_fn, rc::Rc, task::Poll};
3
4use ntex::io::{IoBoxed, IoRef, OnDisconnect, RecvError};
5use ntex::util::ready;
6use ntex::{channel::pool, service::Service, service::ServiceCtx};
7
8use super::cmd::Command;
9use super::codec::{Codec, Request, Response};
10use super::errors::{CommandError, Error};
11
12type Queue = Rc<RefCell<VecDeque<pool::Sender<Result<Response, Error>>>>>;
13
14#[derive(Clone)]
15/// Shared redis client
16pub struct Client {
17    io: IoRef,
18    queue: Queue,
19    disconnect: OnDisconnect,
20    pool: pool::Pool<Result<Response, Error>>,
21}
22
23impl Client {
24    pub(crate) fn new(io: IoBoxed) -> Self {
25        let queue: Queue = Rc::new(RefCell::new(VecDeque::new()));
26
27        // read redis response task
28        let io_ref = io.get_ref();
29        let queue2 = queue.clone();
30        ntex::rt::spawn(async move {
31            poll_fn(|cx| loop {
32                match ready!(io.poll_recv(&Codec, cx)) {
33                    Ok(item) => {
34                        if let Some(tx) = queue2.borrow_mut().pop_front() {
35                            let _ = tx.send(Ok(item));
36                        } else {
37                            log::error!("Unexpected redis response: {:?}", item);
38                        }
39                        continue;
40                    }
41                    Err(RecvError::KeepAlive) | Err(RecvError::Stop) => {
42                        unreachable!()
43                    }
44                    Err(RecvError::WriteBackpressure) => {
45                        if ready!(io.poll_flush(cx, false)).is_err() {
46                            return Poll::Ready(());
47                        } else {
48                            continue;
49                        }
50                    }
51                    Err(RecvError::Decoder(e)) => {
52                        if let Some(tx) = queue2.borrow_mut().pop_front() {
53                            let _ = tx.send(Err(e));
54                        }
55                        queue2.borrow_mut().clear();
56                        let _ = ready!(io.poll_shutdown(cx));
57                        return Poll::Ready(());
58                    }
59                    Err(RecvError::PeerGone(e)) => {
60                        log::info!("Redis connection is dropped: {:?}", e);
61                        queue2.borrow_mut().clear();
62                        return Poll::Ready(());
63                    }
64                }
65            })
66            .await
67        });
68
69        let disconnect = io_ref.on_disconnect();
70
71        Client {
72            queue,
73            disconnect,
74            io: io_ref,
75            pool: pool::new(),
76        }
77    }
78
79    /// Execute redis command
80    pub async fn exec<T>(&self, cmd: T) -> Result<T::Output, CommandError>
81    where
82        T: Command,
83    {
84        if self.io.is_closed() {
85            Err(CommandError::Protocol(Error::PeerGone(None)))
86        } else {
87            self._call(cmd.to_request())
88                .await
89                .map_err(CommandError::Protocol)
90                .and_then(|res| T::to_output(res.into_result().map_err(CommandError::Error)?))
91        }
92    }
93
94    /// Delete all the keys of the currently selected DB.
95    pub async fn flushdb(&self) -> Result<(), Error> {
96        self._call("FLUSHDB".into()).await?;
97        Ok(())
98    }
99
100    /// Returns true if underlying transport is connected to redis
101    pub fn is_connected(&self) -> bool {
102        !self.io.is_closed()
103    }
104
105    async fn _call(&self, req: Request) -> Result<Response, Error> {
106        if let Err(e) = self.io.encode(req, &Codec) {
107            Err(e)
108        } else {
109            let (tx, rx) = self.pool.channel();
110            self.queue.borrow_mut().push_back(tx);
111            poll_fn(|cx| rx.poll_recv(cx))
112                .await
113                .map_err(|_| Error::PeerGone(None))
114                .and_then(|v| v)
115        }
116    }
117}
118
119impl Service<Request> for Client {
120    type Response = Response;
121    type Error = Error;
122
123    async fn ready(&self, _: ServiceCtx<'_, Self>) -> Result<(), Self::Error> {
124        poll_fn(|cx| {
125            if self.disconnect.poll_ready(cx).is_ready() {
126                Poll::Ready(Err(Error::PeerGone(None)))
127            } else {
128                Poll::Ready(Ok(()))
129            }
130        })
131        .await
132    }
133
134    async fn call(&self, req: Request, _: ServiceCtx<'_, Self>) -> Result<Response, Error> {
135        self._call(req).await
136    }
137}
138
139impl fmt::Debug for Client {
140    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
141        f.debug_struct("Client")
142            .field("connected", &!self.io.is_closed())
143            .finish()
144    }
145}