1use compio::buf::{IoBuf, IoBufMut};
2use compio::io::framed::codec::{Decoder, Encoder};
3
4use crate::error::RpcError;
5use crate::protocol::*;
6
/// Stateless client-side codec: encodes outgoing `Request` frames and decodes
/// incoming `Response` frames.
///
/// The type carries no data, so it is cheap to copy and can be created with
/// `ClientCodec` or `ClientCodec::default()`.
#[derive(Debug, Clone, Copy, Default)]
pub struct ClientCodec;
8
9impl<B: IoBufMut> Encoder<Request, B> for ClientCodec {
12 type Error = RpcError;
13
14 fn encode(&mut self, item: Request, buf: &mut B) -> Result<(), Self::Error> {
15 let mut tmp = Vec::new();
16 tmp.push(item.op as u8);
17 tmp.extend_from_slice(&item.hashname.to_be_bytes());
18 encode_request_payload(&mut tmp, item.payload);
19 buf.extend_from_slice(&tmp)
20 .map_err(|e| RpcError::Other(e.to_string()))?;
21 Ok(())
22 }
23}
24
25impl<B: IoBuf> Decoder<Response, B> for ClientCodec {
31 type Error = RpcError;
32
33 fn decode(&mut self, buf: &compio::buf::Slice<B>) -> Result<Response, Self::Error> {
34 let bytes: &[u8] = buf;
35 let mut pos = 0;
36 let status = read_u8(bytes, &mut pos)?;
37 match status {
38 0x00 => Ok(Response::Ok(bytes[pos..].to_vec())),
39 0x01 => {
40 let code = read_u16_be(bytes, &mut pos)?;
41 let msg_len = read_u16_be(bytes, &mut pos)? as usize;
42 let message = String::from_utf8_lossy(&bytes[pos..pos + msg_len]).into_owned();
43 Ok(Response::Err { code, message })
44 }
45 _ => Err(RpcError::Protocol(
46 "invalid response status byte".to_string(),
47 )),
48 }
49 }
50}
51
52fn encode_request_payload(buf: &mut Vec<u8>, payload: RequestPayload) {
53 match payload {
54 RequestPayload::Key(key) => {
55 write_bytes(buf, &key);
56 }
57 RequestPayload::Empty => {}
58 RequestPayload::Range { start, end } => {
59 write_bound(buf, &start);
60 write_bound(buf, &end);
61 }
62 RequestPayload::Upsert { key, flag, value } => {
63 write_upsert_key(buf, &key);
64 let flag_byte = match flag {
65 None => 0u8,
66 Some(true) => 1u8,
67 Some(false) => 2u8,
68 };
69 buf.push(flag_byte);
70 write_bytes(buf, &value);
71 }
72 RequestPayload::Remove { key, soft } => {
73 write_bytes(buf, &key);
74 buf.push(soft as u8);
75 }
76 RequestPayload::Take { key, soft } => {
77 write_bytes(buf, &key);
78 buf.push(soft as u8);
79 }
80 RequestPayload::Count { exact } => {
81 buf.push(exact as u8);
82 }
83 RequestPayload::ListCollections => {}
84 RequestPayload::Batch(items) => {
85 buf.extend_from_slice(&(items.len() as u32).to_be_bytes());
86 for (key, val) in items {
87 write_bytes(buf, &key);
88 match val {
89 None => buf.push(0),
90 Some(v) => {
91 buf.push(1);
92 write_bytes(buf, &v);
93 }
94 }
95 }
96 }
97 }
98}