use crate::remote::RemoteWrapper;
use actix::prelude::*;
use byteorder::{ByteOrder, NetworkEndian};
use bytes::{Buf, BufMut, BytesMut};
use futures::io::Error;
use serde::{Deserialize, Serialize};
use std::io;
use tokio_util::codec::{Decoder, Encoder};
const PREFIX: &[u8] = b"ACTIX/1.0\r\n";
const ENDIAN_LENGTH: usize = 4;
#[derive(Message, Deserialize, Serialize)]
#[rtype(result = "()")]
pub enum ClusterMessage {
Request(u16, bool),
Response,
Message(RemoteWrapper),
Decline,
}
impl ClusterMessage {
pub fn split(&self) -> (Vec<u8>, Vec<u8>) {
(
match &self {
Self::Message(wrapper) => wrapper.message_buffer.clone(),
_ => panic!("split should not be used if not ClusterMessage::Message"),
},
flexbuffers::to_vec(self).unwrap(),
)
}
pub fn set_buffer(&mut self, bytes: Vec<u8>) {
match self {
Self::Message(ref mut wrapper) => wrapper.message_buffer = bytes,
_ => panic!("set_buffer should not be used if not ClusterMessage::Message"),
}
}
}
pub struct ConnectCodec {
prefix: bool,
}
impl ConnectCodec {
pub fn new() -> ConnectCodec {
ConnectCodec { prefix: false }
}
}
impl Decoder for ConnectCodec {
type Item = ClusterMessage;
type Error = Error;
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
if !self.prefix {
if src.len() < 11 {
return Ok(None);
}
if &src[..11] == PREFIX {
let _s = src.split_to(11);
self.prefix = true;
} else {
return Err(io::Error::new(io::ErrorKind::Other, "Prefix mismatch"));
}
}
let size = {
if src.len() < ENDIAN_LENGTH {
return Ok(None);
}
NetworkEndian::read_u32(src.as_ref()) as usize
};
if src.len() >= size + (ENDIAN_LENGTH * 2) {
src.advance(ENDIAN_LENGTH);
let header_size = NetworkEndian::read_u32(src.as_ref()) as usize;
src.advance(ENDIAN_LENGTH);
if size > header_size {
let header = src.split_to(header_size);
let buf = src.split_to(size - header_size);
let mut cluster_message =
flexbuffers::from_slice::<ClusterMessage>(&header).unwrap();
cluster_message.set_buffer(buf.to_vec());
Ok(Some(cluster_message))
} else {
let buf = src.split_to(size);
Ok(Some(
flexbuffers::from_slice::<ClusterMessage>(&buf).unwrap(),
))
}
} else {
Ok(None)
}
}
}
impl Encoder<ClusterMessage> for ConnectCodec {
type Error = Error;
fn encode(&mut self, item: ClusterMessage, dst: &mut BytesMut) -> Result<(), Self::Error> {
match &item {
ClusterMessage::Request(_, _) => dst.extend_from_slice(PREFIX),
ClusterMessage::Response => dst.extend_from_slice(PREFIX),
ClusterMessage::Message(_) => {
let (buffer, header) = item.split();
let buffer_ref: &[u8] = buffer.as_ref();
let header_ref: &[u8] = header.as_ref();
dst.reserve(header_ref.len() + buffer_ref.len() + (ENDIAN_LENGTH * 2));
dst.put_u32((header_ref.len() + buffer_ref.len()) as u32);
dst.put_u32(header_ref.len() as u32);
dst.put(header_ref);
dst.put(buffer_ref);
return Ok(());
}
_ => {}
}
let msg = flexbuffers::to_vec(&item).unwrap();
let msg_ref: &[u8] = msg.as_ref();
dst.reserve(msg_ref.len() + (ENDIAN_LENGTH * 2));
dst.put_u32(msg_ref.len() as u32);
dst.put_u32(msg_ref.len() as u32);
dst.put(msg_ref);
Ok(())
}
}