flatbuffers_tonic/
codec.rs

1use std::marker::PhantomData;
2
3use bytes::{Buf, BufMut};
4use tonic::{
5    Status,
6    codec::{BufferSettings, Codec, Decoder, EncodeBuf, Encoder},
7};
8
9use crate::OwnedFBCodecable;
10
11/// TODO: codec still has copy step due to tonic DecodeBuf and EncodeBuf implementation.
12#[derive(Debug, Clone)]
13pub struct FlatBuffersCodec<T, U> {
14    _pd: PhantomData<(T, U)>,
15}
16
17impl<T, U> FlatBuffersCodec<T, U> {
18    /// Configure a FlatBuffersCodec with encoder/decoder buffer settings. This is used to control
19    /// how memory is allocated and grows per RPC.
20    pub fn new() -> Self {
21        Self { _pd: PhantomData }
22    }
23}
24
25impl<T, U> Default for FlatBuffersCodec<T, U> {
26    fn default() -> Self {
27        Self::new()
28    }
29}
30
31impl<T, U> Codec for FlatBuffersCodec<T, U>
32where
33    T: OwnedFBCodecable + Send + 'static,
34    U: OwnedFBCodecable + Send + 'static,
35{
36    type Encode = T;
37    type Decode = U;
38
39    type Encoder = FlatBuffersEncoder<T>;
40    type Decoder = FlatBuffersDecoder<U>;
41
42    fn encoder(&mut self) -> Self::Encoder {
43        FlatBuffersEncoder {
44            _pd: PhantomData,
45            buffer_settings: BufferSettings::default(),
46        }
47    }
48
49    fn decoder(&mut self) -> Self::Decoder {
50        FlatBuffersDecoder {
51            _pd: PhantomData,
52            buffer_settings: BufferSettings::default(),
53        }
54    }
55}
56
57/// A [`Encoder`] that knows how to encode `T`.
58#[derive(Debug, Clone, Default)]
59pub struct FlatBuffersEncoder<T> {
60    _pd: PhantomData<T>,
61    buffer_settings: BufferSettings,
62}
63
64impl<T> Encoder for FlatBuffersEncoder<T>
65where
66    T: OwnedFBCodecable + Send + 'static,
67{
68    type Item = T;
69    type Error = Status;
70
71    fn encode(&mut self, item: Self::Item, buf: &mut EncodeBuf<'_>) -> Result<(), Self::Error> {
72        // First step is zero copy due to Bytes::from(vec)
73        let bytes = item.into_bytes();
74        buf.reserve(bytes.len());
75        // This still require copy due to BytesMut impl. Zero copy might not be possible.
76        buf.put(bytes);
77        Ok(())
78    }
79
80    fn buffer_settings(&self) -> BufferSettings {
81        self.buffer_settings
82    }
83}
84
85pub struct FlatBuffersDecoder<U> {
86    _pd: PhantomData<U>,
87    buffer_settings: BufferSettings,
88}
89
90impl<U: OwnedFBCodecable + Send + 'static> Decoder for FlatBuffersDecoder<U> {
91    type Item = U;
92
93    type Error = Status;
94
95    fn decode(
96        &mut self,
97        src: &mut tonic::codec::DecodeBuf<'_>,
98    ) -> Result<Option<Self::Item>, Self::Error> {
99        // First should be zero copy due to BytesMut impl.
100        let buf = src.copy_to_bytes(src.remaining());
101        // This is not zero copy because DecodeBuf is a shared BytesMut.
102        // Even if it is not shared, BytesMut may have multiple chunks.
103        // Flatbuffer need contiguous memory, so this will make a copy in most cases.
104        let owned_fb = U::new_from_bytes(buf)
105            .map_err(|e| Status::internal(format!("Failed to decode FlatBuffer: {}", e)))?;
106        Ok(Some(owned_fb))
107    }
108
109    fn buffer_settings(&self) -> BufferSettings {
110        self.buffer_settings
111    }
112}