redis_asio/base/
connection.rs1use tokio_codec::Decoder;
2use tokio_tcp::TcpStream;
3use futures::{Future, Stream, Sink, Async, try_ready};
4use crate::{RedisValue, RedisCommand, RespInternalValue, RedisCodec, RedisError, RedisErrorKind};
5use std::net::SocketAddr;
6use core::marker::Send as SendMarker;
7use std::error::Error;
8
9
10pub struct RedisCoreConnection {
36 pub(crate) sender: Box<dyn Sink<SinkItem=RedisCommand, SinkError=RedisError> + SendMarker + 'static>,
37 pub(crate) receiver: Box<dyn Stream<Item=RespInternalValue, Error=RedisError> + SendMarker + 'static>,
38}
39
40impl RedisCoreConnection {
41 pub fn connect(addr: &SocketAddr) -> impl Future<Item=Self, Error=RedisError> {
44 TcpStream::connect(addr)
45 .map_err(|err| RedisError::new(RedisErrorKind::ConnectionError, err.description().to_string()))
46 .map(|stream| {
47 let codec = RedisCodec;
48 let (tx, rx) = codec.framed(stream).split();
49 Self::new(tx, rx)
50 })
51 }
52
53 pub(crate) fn new<S, R>(sender: S, receiver: R) -> RedisCoreConnection
54 where S: Sink<SinkItem=RedisCommand, SinkError=RedisError> + SendMarker + 'static,
55 R: Stream<Item=RespInternalValue, Error=RedisError> + SendMarker + 'static {
56 let sender = Box::new(sender);
57 let receiver = Box::new(receiver);
58 RedisCoreConnection { sender, receiver }
59 }
60
61 pub fn send(self, req: RedisCommand) -> Send {
64 Send::new(self, req)
65 }
66}
67
68pub struct Send {
70 sender: Option<Box<dyn Sink<SinkItem=RedisCommand, SinkError=RedisError> + SendMarker + 'static>>,
71 receiver: Option<Box<dyn Stream<Item=RespInternalValue, Error=RedisError> + SendMarker + 'static>>,
72 request: Option<RedisCommand>,
73 is_sent: bool,
74}
75
76impl Send {
77 fn new(inner: RedisCoreConnection, request: RedisCommand) -> Send {
78 let sender = Some(inner.sender);
79 let receiver = Some(inner.receiver);
80 let request = Some(request);
81 let is_sent = false;
82 Send { sender, receiver, request, is_sent }
83 }
84}
85
86impl Future for Send {
87 type Item = (RedisCoreConnection, RedisValue);
88 type Error = RedisError;
89
90 fn poll(&mut self) -> Result<Async<Self::Item>, Self::Error> {
91 let sender = self.sender.as_mut().unwrap();
92 let receiver = self.receiver.as_mut().unwrap();
93
94 if let Some(req) = self.request.take() {
95 if sender.start_send(req)?.is_not_ready() {
96 return Ok(Async::NotReady);
97 }
98 }
99
100 if !self.is_sent {
101 try_ready!(sender.poll_complete());
102 self.is_sent = true;
103 }
104
105 match try_ready!(receiver.poll()) {
108 Some(response) => {
109 let redis_response = response.into_redis_value()?;
110 let con =
111 RedisCoreConnection::new(self.sender.take().unwrap(), self.receiver.take().unwrap());
112 Ok(Async::Ready((con, redis_response)))
113 }
114 _ => Err(RedisError::new(RedisErrorKind::ConnectionError,
115 "Connection has closed before an answer came".to_string()))
116 }
117 }
118}