1use byteorder::{BigEndian, WriteBytesExt};
2use log::debug;
3use std::future::Future;
4use std::pin::Pin;
5use std::task::{Context, Poll};
6use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
7
/// A single length-prefixed message exchanged over a byte stream.
///
/// `Default` yields an empty frame: `id == 0`, `ok == 0` and no payload.
#[derive(Debug, Clone, Default, Eq, PartialEq)]
pub struct Frame {
    /// Identifier carried in the frame header.
    pub id: u64,
    /// One-byte flag carried in the frame header.
    // NOTE(review): presumably a success/status indicator; its exact meaning
    // is defined by the callers, not by this type — confirm.
    pub ok: u8,
    /// Raw payload bytes of the frame.
    pub data: Vec<u8>,
}
27
28impl Frame {
29 pub fn new() -> Self {
30 Self {
31 id: 0,
32 ok: 0,
33 data: vec![],
34 }
35 }
36
37 pub async fn decode_from<R: AsyncRead + Unpin>(r: &mut R) -> std::io::Result<Self> {
39 let id = r.read_u64().await?;
40 debug!("decode id = {:?}", id);
41
42 let ok = r.read_u8().await?;
43 debug!("decode ok = {:?}", ok);
44
45 let len = r.read_u64().await?;
46 debug!("decode len = {:?}", len);
47 let mut datas = Vec::with_capacity(len as usize);
48 unsafe { datas.set_len(len as usize) }; r.read_exact(&mut datas).await?;
50 Ok(Frame {
51 id,
52 ok,
53 data: datas,
54 })
55 }
56
57 pub fn get_payload(&self) -> &[u8] {
60 &self.data
61 }
62
63 pub fn finish(self, id: u64) -> Vec<u8> {
65 let len = self.data.len() as u64;
66 let mut buf = Vec::with_capacity((17 + len) as usize);
67 let _ = WriteBytesExt::write_u64::<BigEndian>(&mut buf, id);
68 let _ = WriteBytesExt::write_u8(&mut buf, self.ok as u8);
69 let _ = WriteBytesExt::write_u64::<BigEndian>(&mut buf, len);
70 buf.extend(self.data);
71 buf
72 }
73}
74
75impl AsyncWrite for Frame {
76 fn poll_write(
77 mut self: Pin<&mut Self>,
78 cx: &mut Context<'_>,
79 buf: &[u8],
80 ) -> Poll<Result<usize, std::io::Error>> {
81 let mut p = Box::pin(AsyncWriteExt::write(&mut self.data, buf));
82 loop {
83 match Pin::new(&mut p).poll(cx) {
84 Poll::Ready(v) => match v {
85 Ok(v) => {
86 return Poll::Ready(Ok(v));
87 }
88 Err(e) => {
89 return Poll::Ready(Err(e));
90 }
91 },
92 Poll::Pending => {}
93 }
94 }
95 }
96
97 fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), std::io::Error>> {
98 Poll::Ready(Ok(()))
99 }
100
101 fn poll_shutdown(
102 self: Pin<&mut Self>,
103 _cx: &mut Context<'_>,
104 ) -> Poll<Result<(), std::io::Error>> {
105 Poll::Ready(Ok(()))
106 }
107}