1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
use std::{
    io::{Read, Write},
    marker::PhantomData,
};

use bytes::{Buf, BufMut, Bytes, BytesMut};
use serde::{de::DeserializeOwned, Serialize};

use crate::{
    error::Error,
    rpc::{RequestId, RpcFrame},
};

pub trait RpcSerializer: Send + Sync + 'static {
    fn serialize_to<T: Serialize, W: Write>(writer: W, value: T) -> Result<(), Error>;
    fn deserialize_from<T: DeserializeOwned, R: Read>(reader: R) -> Result<T, Error>;

    fn serialize_size_hint<T: Serialize>(_value: &T) -> usize {
        120
    }
}

/// [`RpcSerializer`] backed by the `bincode` crate.
///
/// Declared as an empty enum, so it cannot be instantiated and is only usable as a
/// type parameter.
pub enum BincodeSerializer {}

impl RpcSerializer for BincodeSerializer {
    /// Writes `value` to `writer` with `bincode::serialize_into`.
    fn serialize_to<T: Serialize, W: Write>(writer: W, value: T) -> Result<(), Error> {
        bincode::serialize_into(writer, &value)?;
        Ok(())
    }

    /// Decodes a `T` from `reader` with `bincode::deserialize_from`.
    fn deserialize_from<T: DeserializeOwned, R: Read>(reader: R) -> Result<T, Error> {
        let decoded = bincode::deserialize_from(reader)?;
        Ok(decoded)
    }

    /// Returns the size reported by `bincode::serialized_size`, or `0` when that
    /// computation fails.
    fn serialize_size_hint<T: Serialize>(value: &T) -> usize {
        match bincode::serialized_size(value) {
            Ok(bytes) => bytes as usize,
            Err(_) => 0,
        }
    }
}

/// [`RpcSerializer`] backed by the `serde_json` crate.
///
/// Declared as an empty enum, so it cannot be instantiated and is only usable as a
/// type parameter. It does not override `serialize_size_hint`, so the trait's default
/// estimate applies.
pub enum JsonSerializer {}

impl RpcSerializer for JsonSerializer {
    /// Writes `value` to `writer` as JSON with `serde_json::to_writer`.
    ///
    /// A `serde_json` error is boxed before being converted into [`Error`].
    fn serialize_to<T: Serialize, W: Write>(writer: W, value: T) -> Result<(), Error> {
        serde_json::to_writer(writer, &value).map_err(|err| Box::new(err).into())
    }

    /// Decodes a `T` from JSON read out of `reader` with `serde_json::from_reader`.
    ///
    /// A `serde_json` error is boxed before being converted into [`Error`].
    fn deserialize_from<T: DeserializeOwned, R: Read>(reader: R) -> Result<T, Error> {
        serde_json::from_reader(reader).map_err(|err| Box::new(err).into())
    }
}

pub struct SerializedFrame<S: RpcSerializer>(Bytes, PhantomData<S>);

impl<S: RpcSerializer> SerializedFrame<S> {
    pub fn new(buf: impl Into<Bytes>) -> Self {
        Self(buf.into(), PhantomData)
    }

    pub fn inner(&self) -> Bytes {
        self.0.clone()
    }
}

impl<S: RpcSerializer> From<Bytes> for SerializedFrame<S> {
    fn from(value: Bytes) -> Self {
        Self::new(value)
    }
}

impl<S: RpcSerializer> From<BytesMut> for SerializedFrame<S> {
    fn from(value: BytesMut) -> Self {
        Self::new(value)
    }
}

impl<S: RpcSerializer> From<SerializedFrame<S>> for Bytes {
    fn from(value: SerializedFrame<S>) -> Self {
        value.0
    }
}

/// Number of bytes occupied by the request id written at the start of every frame.
const FRAME_ID_LEN: usize = 8;

/// Frame layout: the request id as a `u64` (written with `put_u64`), followed by the
/// payload encoded by the serializer `S`.
impl<T: Serialize + DeserializeOwned, S: RpcSerializer> RpcFrame<T> for SerializedFrame<S> {
    /// Builds a frame from a request id and a payload.
    ///
    /// The buffer is pre-sized to the id length plus `S::serialize_size_hint(&data)`;
    /// the hint only affects the initial allocation, not the resulting bytes.
    ///
    /// # Errors
    /// Propagates any error returned by `S::serialize_to`.
    fn from_parts(id: RequestId, data: T) -> Result<Self, Error> {
        let cap = FRAME_ID_LEN + S::serialize_size_hint(&data);
        trace!("init with cap = {}", cap);
        let mut buf = BytesMut::with_capacity(cap);
        // Header first: the id occupies the leading `FRAME_ID_LEN` bytes.
        buf.put_u64(id.0);
        // The payload is appended directly behind the id through an `io::Write` adapter.
        let mut writer = buf.writer();
        S::serialize_to(&mut writer, &data)?;
        let buf = writer.into_inner();
        trace!("fini with len = {}, cap = {}", buf.len(), buf.capacity());
        Ok(Self(buf.freeze(), PhantomData))
    }

    /// Reads the request id from the first `FRAME_ID_LEN` bytes of the frame.
    ///
    /// # Panics
    /// Panics if the frame holds fewer than `FRAME_ID_LEN` bytes, because the
    /// signature has no way to report an error.
    fn get_id(&self) -> RequestId {
        // Read through a borrowed slice cursor; the frame itself is left untouched
        // and no extra `Bytes` handle has to be created just to peek at the header.
        let mut header: &[u8] = &self.0;
        RequestId(header.get_u64())
    }

    /// Decodes the payload that follows the request id.
    ///
    /// A frame shorter than the id prefix is treated as having an empty payload, so
    /// the failure is left to the serializer instead of panicking on an out-of-range
    /// slice index. NOTE(review): a serializer that accepts empty input for `T`
    /// would still succeed on such a truncated frame.
    ///
    /// # Errors
    /// Propagates any error returned by `S::deserialize_from`.
    fn get_data(self) -> Result<T, Error> {
        let payload: &[u8] = self.0.get(FRAME_ID_LEN..).unwrap_or(&[]);
        S::deserialize_from(payload)
    }
}