use bytes::BytesMut;
use serde::{Deserialize, Serialize};
use tokio_codec::{BytesCodec, Decoder, Encoder, FramedRead, FramedWrite};
use tokio_io::AsyncRead;
use tokio_io::io::{ReadHalf, WriteHalf};
use tokio_serde_bincode::{ReadBincode, WriteBincode};
use tokio_uds::UnixStream;
use super::*;
/// Newtype around `BytesCodec` whose `Decoder` and `Encoder` implementations
/// use `failure::Error` as their error type.
#[derive(Copy, Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub struct FailureBytesCodec(BytesCodec);

impl FailureBytesCodec {
    /// Creates a codec wrapping a freshly constructed `BytesCodec`.
    pub fn new() -> FailureBytesCodec {
        FailureBytesCodec(BytesCodec::new())
    }
}

impl Default for FailureBytesCodec {
    /// Equivalent to `FailureBytesCodec::new()`.
    ///
    /// Provided because a public argument-less `new` is conventionally paired
    /// with a `Default` implementation.
    fn default() -> Self {
        Self::new()
    }
}
impl Decoder for FailureBytesCodec {
type Item = <BytesCodec as Decoder>::Item;
type Error = failure::Error;
fn decode(
&mut self,
src: &mut BytesMut
) -> Result<Option<Self::Item>, Self::Error> {
Ok(self.0.decode(src)?)
}
}
impl Encoder for FailureBytesCodec {
type Item = <BytesCodec as Encoder>::Item;
type Error = failure::Error;
fn encode(
&mut self,
item: Self::Item,
dst: &mut BytesMut
) -> Result<(), Self::Error> {
Ok(self.0.encode(item, dst)?)
}
}
/// Write side of a connection: a `WriteBincode` layered over a `FramedWrite`
/// that writes to the `WriteHalf` of a `UnixStream` through a
/// `FailureBytesCodec`.
///
/// `M` is passed through as the second type parameter of `WriteBincode`
/// (presumably the message type being serialized — confirm against the
/// `tokio_serde_bincode` documentation).
pub type Serializer<M> = WriteBincode<FramedWrite<WriteHalf<UnixStream>, FailureBytesCodec>, M>;
/// Read side of a connection: a `ReadBincode` layered over a `FramedRead`
/// that reads from the `ReadHalf` of a `UnixStream` through a
/// `FailureBytesCodec`.
///
/// `M` is passed through as the second type parameter of `ReadBincode`
/// (presumably the message type being deserialized — confirm against the
/// `tokio_serde_bincode` documentation).
pub type Deserializer<M> = ReadBincode<FramedRead<ReadHalf<UnixStream>, FailureBytesCodec>, M>;
/// Splits `conn` into its read and write halves and wraps each in a bincode
/// adapter.
///
/// The write half is wrapped in a `FramedWrite` and then a `WriteBincode`
/// parameterized by `S`; the read half is wrapped in a `FramedRead` and then a
/// `ReadBincode` parameterized by `D`. Both framed layers use a fresh
/// `FailureBytesCodec`.
///
/// Returns `(serializer, deserializer)` in that order.
///
/// NOTE(review): the underlying `BytesCodec` appears to perform no message
/// delimiting — confirm that message boundaries are preserved some other way,
/// otherwise bincode values may be split or merged across reads on a stream
/// socket.
pub fn split<S, D>(conn: UnixStream) -> (Serializer<S>, Deserializer<D>)
where
    S: Serialize,
    D: for<'de> Deserialize<'de>,
{
    let (rx_half, tx_half) = conn.split();
    let serializer = WriteBincode::new(FramedWrite::new(tx_half, FailureBytesCodec::new()));
    let deserializer = ReadBincode::new(FramedRead::new(rx_half, FailureBytesCodec::new()));
    (serializer, deserializer)
}