Skip to main content

armour_rpc/
codec.rs

1use compio::buf::{IoBuf, IoBufMut};
2use compio::io::framed::codec::{Decoder, Encoder};
3
4use crate::error::RpcError;
5use crate::protocol::*;
6
/// Stateless client-side codec: encodes outgoing `Request` frames and decodes
/// incoming `Response` frames.
///
/// The type carries no data, so it is cheap to copy and can be created with
/// `ClientCodec` or `ClientCodec::default()`.
#[derive(Debug, Clone, Copy, Default)]
pub struct ClientCodec;
8
9/// Encodes a Request into the wire format expected by the server:
10/// `[op: u8][hashname: u64 BE][payload...]`
11impl<B: IoBufMut> Encoder<Request, B> for ClientCodec {
12    type Error = RpcError;
13
14    fn encode(&mut self, item: Request, buf: &mut B) -> Result<(), Self::Error> {
15        let mut tmp = Vec::new();
16        tmp.push(item.op as u8);
17        tmp.extend_from_slice(&item.hashname.to_be_bytes());
18        encode_request_payload(&mut tmp, item.payload);
19        buf.extend_from_slice(&tmp)
20            .map_err(|e| RpcError::Other(e.to_string()))?;
21        Ok(())
22    }
23}
24
25/// Decodes a Response from the wire format sent by the server:
26/// `[status: u8][payload...]`
27///
28/// The `Ok` variant contains the raw payload bytes; the caller decodes
29/// them according to which operation was sent.
30impl<B: IoBuf> Decoder<Response, B> for ClientCodec {
31    type Error = RpcError;
32
33    fn decode(&mut self, buf: &compio::buf::Slice<B>) -> Result<Response, Self::Error> {
34        let bytes: &[u8] = buf;
35        let mut pos = 0;
36        let status = read_u8(bytes, &mut pos)?;
37        match status {
38            0x00 => Ok(Response::Ok(bytes[pos..].to_vec())),
39            0x01 => {
40                let code = read_u16_be(bytes, &mut pos)?;
41                let msg_len = read_u16_be(bytes, &mut pos)? as usize;
42                let message = String::from_utf8_lossy(&bytes[pos..pos + msg_len]).into_owned();
43                Ok(Response::Err { code, message })
44            }
45            _ => Err(RpcError::Protocol(
46                "invalid response status byte".to_string(),
47            )),
48        }
49    }
50}
51
52fn encode_request_payload(buf: &mut Vec<u8>, payload: RequestPayload) {
53    match payload {
54        RequestPayload::Key(key) => {
55            write_bytes(buf, &key);
56        }
57        RequestPayload::Empty => {}
58        RequestPayload::Range { start, end } => {
59            write_bound(buf, &start);
60            write_bound(buf, &end);
61        }
62        RequestPayload::Upsert { key, flag, value } => {
63            write_upsert_key(buf, &key);
64            let flag_byte = match flag {
65                None => 0u8,
66                Some(true) => 1u8,
67                Some(false) => 2u8,
68            };
69            buf.push(flag_byte);
70            write_bytes(buf, &value);
71        }
72        RequestPayload::Remove { key, soft } => {
73            write_bytes(buf, &key);
74            buf.push(soft as u8);
75        }
76        RequestPayload::Take { key, soft } => {
77            write_bytes(buf, &key);
78            buf.push(soft as u8);
79        }
80        RequestPayload::Count { exact } => {
81            buf.push(exact as u8);
82        }
83        RequestPayload::ListCollections => {}
84        RequestPayload::Batch(items) => {
85            buf.extend_from_slice(&(items.len() as u32).to_be_bytes());
86            for (key, val) in items {
87                write_bytes(buf, &key);
88                match val {
89                    None => buf.push(0),
90                    Some(v) => {
91                        buf.push(1);
92                        write_bytes(buf, &v);
93                    }
94                }
95            }
96        }
97    }
98}