1use std::collections::VecDeque;
2use std::{cell::RefCell, fmt, future::poll_fn, rc::Rc, task::Poll};
3
4use ntex::io::{IoBoxed, IoRef, OnDisconnect, RecvError};
5use ntex::util::ready;
6use ntex::{channel::pool, service::Service, service::ServiceCtx};
7
8use super::cmd::Command;
9use super::codec::{Codec, Request, Response};
10use super::errors::{CommandError, Error};
11
12type Queue = Rc<RefCell<VecDeque<pool::Sender<Result<Response, Error>>>>>;
13
14#[derive(Clone)]
15pub struct Client {
17 io: IoRef,
18 queue: Queue,
19 disconnect: OnDisconnect,
20 pool: pool::Pool<Result<Response, Error>>,
21}
22
23impl Client {
24 pub(crate) fn new(io: IoBoxed) -> Self {
25 let queue: Queue = Rc::new(RefCell::new(VecDeque::new()));
26
27 let io_ref = io.get_ref();
29 let queue2 = queue.clone();
30 ntex::rt::spawn(async move {
31 poll_fn(|cx| loop {
32 match ready!(io.poll_recv(&Codec, cx)) {
33 Ok(item) => {
34 if let Some(tx) = queue2.borrow_mut().pop_front() {
35 let _ = tx.send(Ok(item));
36 } else {
37 log::error!("Unexpected redis response: {:?}", item);
38 }
39 continue;
40 }
41 Err(RecvError::KeepAlive) | Err(RecvError::Stop) => {
42 unreachable!()
43 }
44 Err(RecvError::WriteBackpressure) => {
45 if ready!(io.poll_flush(cx, false)).is_err() {
46 return Poll::Ready(());
47 } else {
48 continue;
49 }
50 }
51 Err(RecvError::Decoder(e)) => {
52 if let Some(tx) = queue2.borrow_mut().pop_front() {
53 let _ = tx.send(Err(e));
54 }
55 queue2.borrow_mut().clear();
56 let _ = ready!(io.poll_shutdown(cx));
57 return Poll::Ready(());
58 }
59 Err(RecvError::PeerGone(e)) => {
60 log::info!("Redis connection is dropped: {:?}", e);
61 queue2.borrow_mut().clear();
62 return Poll::Ready(());
63 }
64 }
65 })
66 .await
67 });
68
69 let disconnect = io_ref.on_disconnect();
70
71 Client {
72 queue,
73 disconnect,
74 io: io_ref,
75 pool: pool::new(),
76 }
77 }
78
79 pub async fn exec<T>(&self, cmd: T) -> Result<T::Output, CommandError>
81 where
82 T: Command,
83 {
84 if self.io.is_closed() {
85 Err(CommandError::Protocol(Error::PeerGone(None)))
86 } else {
87 self._call(cmd.to_request())
88 .await
89 .map_err(CommandError::Protocol)
90 .and_then(|res| T::to_output(res.into_result().map_err(CommandError::Error)?))
91 }
92 }
93
94 pub async fn flushdb(&self) -> Result<(), Error> {
96 self._call("FLUSHDB".into()).await?;
97 Ok(())
98 }
99
100 pub fn is_connected(&self) -> bool {
102 !self.io.is_closed()
103 }
104
105 async fn _call(&self, req: Request) -> Result<Response, Error> {
106 if let Err(e) = self.io.encode(req, &Codec) {
107 Err(e)
108 } else {
109 let (tx, rx) = self.pool.channel();
110 self.queue.borrow_mut().push_back(tx);
111 poll_fn(|cx| rx.poll_recv(cx))
112 .await
113 .map_err(|_| Error::PeerGone(None))
114 .and_then(|v| v)
115 }
116 }
117}
118
119impl Service<Request> for Client {
120 type Response = Response;
121 type Error = Error;
122
123 async fn ready(&self, _: ServiceCtx<'_, Self>) -> Result<(), Self::Error> {
124 poll_fn(|cx| {
125 if self.disconnect.poll_ready(cx).is_ready() {
126 Poll::Ready(Err(Error::PeerGone(None)))
127 } else {
128 Poll::Ready(Ok(()))
129 }
130 })
131 .await
132 }
133
134 async fn call(&self, req: Request, _: ServiceCtx<'_, Self>) -> Result<Response, Error> {
135 self._call(req).await
136 }
137}
138
139impl fmt::Debug for Client {
140 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
141 f.debug_struct("Client")
142 .field("connected", &!self.io.is_closed())
143 .finish()
144 }
145}