1use std::cell::RefCell;
2use std::io;
3use std::io::{Read, Write};
4use cogo::net::TcpStream;
5use crate::bytes::{BufMut, BytesMut, ByteString};
6use crate::codec::{Decoder, Encoder};
7use crate::codec_redis::{Codec, Request, Response};
8use crate::errors::Error;
9
10use super::cmd::Command;
11use super::errors::{CommandError};
12
/// Client that exchanges requests and responses with a server over a single
/// `TcpStream`, using the Redis `Codec` for framing.
pub struct SimpleClient {
    /// Codec used to encode outgoing requests and decode incoming responses.
    pub codec: Codec,
    /// The underlying connection. `None` is treated as "connection closed":
    /// `send` fails with `Error::PeerGone` and `is_closed` returns `true`.
    pub io: RefCell<Option<TcpStream>>,
}
18
// NOTE(review): no safety justification is given in this file. `SimpleClient` holds a
// `Codec` and a `RefCell<Option<TcpStream>>`; this impl is only sound if both of those
// may be moved across threads — confirm against their definitions.
unsafe impl Send for SimpleClient {}
20
// NOTE(review): `RefCell` is not `Sync` (its borrow tracking is not thread-safe), so this
// impl relies on callers never using the same `SimpleClient` from two threads at the same
// time. Nothing in this file enforces that — confirm against the call sites.
unsafe impl Sync for SimpleClient {}
22
23impl SimpleClient {
24 pub fn new(io: TcpStream) -> Self {
26 SimpleClient { codec: Codec {}, io: RefCell::new(Some(io)) }
27 }
28
29 pub fn exec<U>(&self, cmd: U) -> Result<U::Output, CommandError>
31 where
32 U: Command,
33 {
34 let buf = self.encode(cmd)?;
35 let resp = self.send(&buf)?;
36 self.decode::<U>(resp)
37 }
38
39 pub fn encode<U: Command>(&self, cmd: U) -> Result<BytesMut, Error> {
40 let mut buf_in = BytesMut::new();
41 let mut req = cmd.to_request();
42 self.codec.encode(req, &mut buf_in)?;
43 Ok(buf_in)
44 }
45
46 pub fn encode_req(&self, req: Request, buf: &mut BytesMut) -> Result<(), Error> {
47 self.codec.encode(req, buf)?;
48 Ok(())
49 }
50
51 pub fn send(&self, arg: &BytesMut) -> Result<Response, CommandError> {
52 let mut io = self.io.borrow_mut();
53 if io.is_none() {
54 return Err(CommandError::Protocol(Error::PeerGone(None)));
55 }
56 let io = io.as_mut().unwrap();
57 io.write_all(arg)?;
58 io.flush();
59 let mut buffer = BytesMut::with_capacity(64);
60 loop {
61 let mut buf = BytesMut::with_capacity(64);
62 buf.put(&[0; 64][..]);
63 io.read(&mut buf)?;
64 buffer.extend(buf);
65 match self.codec.decode(&mut buffer)? {
66 None => {
67 continue;
68 }
69 Some(item) => {
70 return Ok(item);
71 }
72 }
73 }
74 }
75
76 pub fn decode<U>(&self, resp: Response) -> Result<U::Output, CommandError>
77 where
78 U: Command, {
79 return U::to_output(resp.into_result().map_err(CommandError::Error)?);
80 }
81
82 pub fn is_closed(&self) -> bool {
83 self.io.borrow().is_none()
84 }
85}
86
87
88
89