redis_asio/base/
connection.rs

1use tokio_codec::Decoder;
2use tokio_tcp::TcpStream;
3use futures::{Future, Stream, Sink, Async, try_ready};
4use crate::{RedisValue, RedisCommand, RespInternalValue, RedisCodec, RedisError, RedisErrorKind};
5use std::net::SocketAddr;
6use core::marker::Send as SendMarker;
7use std::error::Error;
8
9
10/// Actual Redis connection converts packets from `RESP` packets into `RedisValue`
11/// and from `RedisCommand` into `RESP` packets.
12///
13/// # Example
14/// ```rust,no_run
15/// use std::net::SocketAddr;
16/// use futures::Future;
17/// use redis_asio::{RedisCoreConnection, RedisValue, command, from_redis_value};
18///
19/// let address = &"127.0.0.1:6379".parse::<SocketAddr>().unwrap();
20///
21/// let set_req = command("SET").arg("foo").arg(123);
22/// let get_req = command("GET").arg("foo");
23///
24/// let future = RedisCoreConnection::connect(address)
25///     .and_then(move |con| con.send(set_req))
26///     .and_then(|(con, response)| {
27///         assert_eq!(RedisValue::Ok, response, "Expect Ok");
28///         con.send(get_req)
29///     })
30///     .map(move |(_, response)|
31///         assert_eq!(123, from_redis_value(&response).unwrap()))
32///     .map_err(|_| unreachable!());
33///  tokio::run(future);
34/// ```
35pub struct RedisCoreConnection {
36    pub(crate) sender: Box<dyn Sink<SinkItem=RedisCommand, SinkError=RedisError> + SendMarker + 'static>,
37    pub(crate) receiver: Box<dyn Stream<Item=RespInternalValue, Error=RedisError> + SendMarker + 'static>,
38}
39
40impl RedisCoreConnection {
41    /// Open a connection to Redis server and wrap it into `RedisCoreConnection`,
42    /// that will be available in the future.
43    pub fn connect(addr: &SocketAddr) -> impl Future<Item=Self, Error=RedisError> {
44        TcpStream::connect(addr)
45            .map_err(|err| RedisError::new(RedisErrorKind::ConnectionError, err.description().to_string()))
46            .map(|stream| {
47                let codec = RedisCodec;
48                let (tx, rx) = codec.framed(stream).split();
49                Self::new(tx, rx)
50            })
51    }
52
53    pub(crate) fn new<S, R>(sender: S, receiver: R) -> RedisCoreConnection
54        where S: Sink<SinkItem=RedisCommand, SinkError=RedisError> + SendMarker + 'static,
55              R: Stream<Item=RespInternalValue, Error=RedisError> + SendMarker + 'static {
56        let sender = Box::new(sender);
57        let receiver = Box::new(receiver);
58        RedisCoreConnection { sender, receiver }
59    }
60
61    /// Send request as a `RedisCommand` and return `Send` represents the future
62    /// `Future<Item=(RedisCoreConnection, RedisValue), Error=RedisError>`
63    pub fn send(self, req: RedisCommand) -> Send {
64        Send::new(self, req)
65    }
66}
67
68/// The `Future<Item=(RedisCoreConnection, RedisValue), Error=RedisError>` wrapper
69pub struct Send {
70    sender: Option<Box<dyn Sink<SinkItem=RedisCommand, SinkError=RedisError> + SendMarker + 'static>>,
71    receiver: Option<Box<dyn Stream<Item=RespInternalValue, Error=RedisError> + SendMarker + 'static>>,
72    request: Option<RedisCommand>,
73    is_sent: bool,
74}
75
76impl Send {
77    fn new(inner: RedisCoreConnection, request: RedisCommand) -> Send {
78        let sender = Some(inner.sender);
79        let receiver = Some(inner.receiver);
80        let request = Some(request);
81        let is_sent = false;
82        Send { sender, receiver, request, is_sent }
83    }
84}
85
86impl Future for Send {
87    type Item = (RedisCoreConnection, RedisValue);
88    type Error = RedisError;
89
90    fn poll(&mut self) -> Result<Async<Self::Item>, Self::Error> {
91        let sender = self.sender.as_mut().unwrap();
92        let receiver = self.receiver.as_mut().unwrap();
93
94        if let Some(req) = self.request.take() {
95            if sender.start_send(req)?.is_not_ready() {
96                return Ok(Async::NotReady);
97            }
98        }
99
100        if !self.is_sent {
101            try_ready!(sender.poll_complete());
102            self.is_sent = true;
103        }
104
105        // Request is sent already, lets read from receiver
106
107        match try_ready!(receiver.poll()) {
108            Some(response) => {
109                let redis_response = response.into_redis_value()?;
110                let con =
111                    RedisCoreConnection::new(self.sender.take().unwrap(), self.receiver.take().unwrap());
112                Ok(Async::Ready((con, redis_response)))
113            }
114            _ => Err(RedisError::new(RedisErrorKind::ConnectionError,
115                                     "Connection has closed before an answer came".to_string()))
116        }
117    }
118}