drpc/
frame.rs

1use byteorder::{BigEndian, WriteBytesExt};
2use log::debug;
3use std::future::Future;
4use std::pin::Pin;
5use std::task::{Context, Poll};
6use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
7
8// Frame layout
9// id(u64) + ok(u8) + len(u64) + payload([u8; len])
10
11// req frame layout
12// id(u64) + ok(u8) + len(u64) + payload([u8; len])
13
14// rsp frame layout(ok=0,payload is string,ok=1,payload is data)
15// id(u64) + ok(u8) + len(u64) + payload/string ([u8; len])
16
17/// raw frame wrapper, low level protocol
18#[derive(Debug, Clone, Eq, PartialEq)]
19pub struct Frame {
20    /// frame id, req and rsp has the same id
21    pub id: u64,
22    /// is ok,false=0, true = 1
23    pub ok: u8,
24    /// payload data
25    pub data: Vec<u8>,
26}
27
28impl Frame {
29    pub fn new() -> Self {
30        Self {
31            id: 0,
32            ok: 0,
33            data: vec![],
34        }
35    }
36
37    /// Decode a frame from the reader.
38    pub async fn decode_from<R: AsyncRead + Unpin>(r: &mut R) -> std::io::Result<Self> {
39        let id = r.read_u64().await?;
40        debug!("decode id = {:?}", id);
41
42        let ok = r.read_u8().await?;
43        debug!("decode ok = {:?}", ok);
44
45        let len = r.read_u64().await?;
46        debug!("decode len = {:?}", len);
47        let mut datas = Vec::with_capacity(len as usize);
48        unsafe { datas.set_len(len as usize) }; // it's safety,avoid one memset
49        r.read_exact(&mut datas).await?;
50        Ok(Frame {
51            id,
52            ok,
53            data: datas,
54        })
55    }
56
57    /// Decode a request/response from the frame. This would return the request raw buffer.
58    /// You need to deserialized from it into the real type.
59    pub fn get_payload(&self) -> &[u8] {
60        &self.data
61    }
62
63    /// Convert self into raw buf that can be send as a frame
64    pub fn finish(self, id: u64) -> Vec<u8> {
65        let len = self.data.len() as u64;
66        let mut buf = Vec::with_capacity((17 + len) as usize);
67        let _ = WriteBytesExt::write_u64::<BigEndian>(&mut buf, id);
68        let _ = WriteBytesExt::write_u8(&mut buf, self.ok as u8);
69        let _ = WriteBytesExt::write_u64::<BigEndian>(&mut buf, len);
70        buf.extend(self.data);
71        buf
72    }
73}
74
75impl AsyncWrite for Frame {
76    fn poll_write(
77        mut self: Pin<&mut Self>,
78        cx: &mut Context<'_>,
79        buf: &[u8],
80    ) -> Poll<Result<usize, std::io::Error>> {
81        let mut p = Box::pin(AsyncWriteExt::write(&mut self.data, buf));
82        loop {
83            match Pin::new(&mut p).poll(cx) {
84                Poll::Ready(v) => match v {
85                    Ok(v) => {
86                        return Poll::Ready(Ok(v));
87                    }
88                    Err(e) => {
89                        return Poll::Ready(Err(e));
90                    }
91                },
92                Poll::Pending => {}
93            }
94        }
95    }
96
97    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), std::io::Error>> {
98        Poll::Ready(Ok(()))
99    }
100
101    fn poll_shutdown(
102        self: Pin<&mut Self>,
103        _cx: &mut Context<'_>,
104    ) -> Poll<Result<(), std::io::Error>> {
105        Poll::Ready(Ok(()))
106    }
107}