1use bytes::{Bytes, BytesMut};
7use tokio_util::codec::{Decoder, Encoder, LengthDelimitedCodec};
8
9use crate::error::{Error, MAX_FRAME_SIZE, Result};
10use crate::message::RpcMessage;
11
12pub struct FrameCodec {
20 inner: LengthDelimitedCodec,
21}
22
23impl FrameCodec {
24 pub fn new() -> Self {
27 let inner = LengthDelimitedCodec::builder()
28 .length_field_offset(0)
29 .length_field_length(4)
30 .length_adjustment(0)
31 .big_endian()
32 .max_frame_length(MAX_FRAME_SIZE)
33 .new_codec();
34 Self { inner }
35 }
36}
37
38impl Default for FrameCodec {
39 fn default() -> Self {
40 Self::new()
41 }
42}
43
44impl Decoder for FrameCodec {
45 type Item = RpcMessage;
46 type Error = Error;
47
48 fn decode(&mut self, src: &mut BytesMut) -> Result<Option<RpcMessage>> {
49 let frame: Option<BytesMut> = self.inner.decode(src).map_err(Error::from)?;
50 match frame {
51 Some(payload) => {
52 let msg: RpcMessage = rmp_serde::from_slice(&payload)?;
53 Ok(Some(msg))
54 },
55 None => Ok(None),
56 }
57 }
58}
59
60impl Encoder<RpcMessage> for FrameCodec {
61 type Error = Error;
62
63 fn encode(&mut self, item: RpcMessage, dst: &mut BytesMut) -> Result<()> {
64 let payload: Vec<u8> = rmp_serde::to_vec(&item)?;
65 let bytes = Bytes::from(payload);
66 self.inner.encode(bytes, dst).map_err(Error::from)?;
67 Ok(())
68 }
69}