Skip to main content

folk_protocol/
codec.rs

1//! Tokio codec for length-prefixed `MessagePack`-RPC frames.
2//!
3//! Wraps `tokio_util::codec::LengthDelimitedCodec` for the framing layer and
4//! delegates payload encoding/decoding to `rmp_serde`.
5
6use bytes::{Bytes, BytesMut};
7use tokio_util::codec::{Decoder, Encoder, LengthDelimitedCodec};
8
9use crate::error::{Error, MAX_FRAME_SIZE, Result};
10use crate::message::RpcMessage;
11
12/// Tokio codec for [`RpcMessage`] over a length-prefixed stream.
13///
14/// Frame layout: `[4-byte BE length][msgpack payload]`. Maximum frame size
15/// is [`MAX_FRAME_SIZE`].
16///
17/// Use this for both the task channel and the control channel — they share
18/// the same wire format.
19pub struct FrameCodec {
20    inner: LengthDelimitedCodec,
21}
22
23impl FrameCodec {
24    /// Create a new codec with the standard 4-byte BE length prefix and
25    /// 16 MiB max frame size.
26    pub fn new() -> Self {
27        let inner = LengthDelimitedCodec::builder()
28            .length_field_offset(0)
29            .length_field_length(4)
30            .length_adjustment(0)
31            .big_endian()
32            .max_frame_length(MAX_FRAME_SIZE)
33            .new_codec();
34        Self { inner }
35    }
36}
37
38impl Default for FrameCodec {
39    fn default() -> Self {
40        Self::new()
41    }
42}
43
44impl Decoder for FrameCodec {
45    type Item = RpcMessage;
46    type Error = Error;
47
48    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<RpcMessage>> {
49        let frame: Option<BytesMut> = self.inner.decode(src).map_err(Error::from)?;
50        match frame {
51            Some(payload) => {
52                let msg: RpcMessage = rmp_serde::from_slice(&payload)?;
53                Ok(Some(msg))
54            },
55            None => Ok(None),
56        }
57    }
58}
59
60impl Encoder<RpcMessage> for FrameCodec {
61    type Error = Error;
62
63    fn encode(&mut self, item: RpcMessage, dst: &mut BytesMut) -> Result<()> {
64        let payload: Vec<u8> = rmp_serde::to_vec(&item)?;
65        let bytes = Bytes::from(payload);
66        self.inner.encode(bytes, dst).map_err(Error::from)?;
67        Ok(())
68    }
69}