cogo_redis/
simple.rs

1use std::cell::RefCell;
2use std::io;
3use std::io::{Read, Write};
4use cogo::net::TcpStream;
5use crate::bytes::{BufMut, BytesMut, ByteString};
6use crate::codec::{Decoder, Encoder};
7use crate::codec_redis::{Codec, Request, Response};
8use crate::errors::Error;
9
10use super::cmd::Command;
11use super::errors::{CommandError};
12
13/// Redis client
14pub struct SimpleClient {
15    pub codec: Codec,
16    pub io: RefCell<Option<TcpStream>>,
17}
18
19unsafe impl Send for SimpleClient {}
20
21unsafe impl Sync for SimpleClient {}
22
23impl SimpleClient {
24    /// Create new simple client
25    pub fn new(io: TcpStream) -> Self {
26        SimpleClient { codec: Codec {}, io: RefCell::new(Some(io)) }
27    }
28
29    /// Execute redis command
30    pub fn exec<U>(&self, cmd: U) -> Result<U::Output, CommandError>
31        where
32            U: Command,
33    {
34        let buf = self.encode(cmd)?;
35        let resp = self.send(&buf)?;
36        self.decode::<U>(resp)
37    }
38
39    pub fn encode<U: Command>(&self, cmd: U) -> Result<BytesMut, Error> {
40        let mut buf_in = BytesMut::new();
41        let mut req = cmd.to_request();
42        self.codec.encode(req, &mut buf_in)?;
43        Ok(buf_in)
44    }
45
46    pub fn encode_req(&self, req: Request, buf: &mut BytesMut) -> Result<(), Error> {
47        self.codec.encode(req, buf)?;
48        Ok(())
49    }
50
51    pub fn send(&self, arg: &BytesMut) -> Result<Response, CommandError> {
52        let mut io = self.io.borrow_mut();
53        if io.is_none() {
54            return Err(CommandError::Protocol(Error::PeerGone(None)));
55        }
56        let io = io.as_mut().unwrap();
57        io.write_all(arg)?;
58        io.flush();
59        let mut buffer = BytesMut::with_capacity(64);
60        loop {
61            let mut buf = BytesMut::with_capacity(64);
62            buf.put(&[0; 64][..]);
63            io.read(&mut buf)?;
64            buffer.extend(buf);
65            match self.codec.decode(&mut buffer)? {
66                None => {
67                    continue;
68                }
69                Some(item) => {
70                    return Ok(item);
71                }
72            }
73        }
74    }
75
76    pub fn decode<U>(&self, resp: Response) -> Result<U::Output, CommandError>
77        where
78            U: Command, {
79        return U::to_output(resp.into_result().map_err(CommandError::Error)?);
80    }
81
82    pub fn is_closed(&self) -> bool {
83        self.io.borrow().is_none()
84    }
85}
86
87
88
89