use timely_communication::{Serialize, Push};
use std::ops::{Deref, DerefMut};
use abomonation::{Abomonation, encode, decode};
/// A timestamped batch of records together with two `usize` bookkeeping fields.
///
/// Every field is public, so a `Message` can be built with a struct literal
/// as well as through `Message::new`.
#[derive(Clone)]
pub struct Message<T, D> {
    /// The time associated with the payload.
    pub time: T,
    /// The payload, held either as serialized bytes or as a typed vector.
    pub data: Content<D>,
    /// Presumably the index of the sender — confirm against callers.
    pub from: usize,
    /// Presumably a per-sender sequence number — confirm against callers.
    pub seq: usize,
}
impl<T, D> Message<T, D> {
    /// Builds a `Message` from its four parts.
    ///
    /// Each argument is stored unchanged in the field of the same name; no
    /// validation or conversion is performed.
    #[inline]
    pub fn new(time: T, data: Content<D>, from: usize, seq: usize) -> Message<T, D> {
        Message { time, data, from, seq }
    }
}
/// Serialization of a `Message` using `encode`/`decode` from `abomonation`.
///
/// The fields are written in the order `time`, `from`, `seq`, then the payload
/// vector; `from_bytes` reads them back in the same order.
impl<T: Abomonation+Clone, D: Abomonation> Serialize for Message<T, D> {
/// Encodes `time`, `from`, `seq` and finally the payload (viewed as a
/// `Vec<D>` through `Deref`) into `bytes`, in that order.
#[inline]
fn into_bytes(&mut self, bytes: &mut Vec<u8>) {
unsafe { encode(&self.time, bytes); }
unsafe { encode(&self.from, bytes); }
unsafe { encode(&self.seq, bytes); }
// Works for either `Content` variant: `Deref` yields a `&Vec<D>` in both cases.
let vec: &Vec<D> = self.data.deref();
unsafe { encode(vec, bytes); }
}
/// Rebuilds a `Message` from `bytes`, taking ownership of the buffer.
///
/// On return the caller's `bytes` is an empty `Vec`; the original buffer is
/// kept inside the returned message as `Content::Bytes`.
///
/// # Panics
///
/// Panics if any of the `decode` calls fails (each result is `unwrap`ped).
#[inline]
fn from_bytes(bytes: &mut Vec<u8>) -> Self {
// Steal the caller's buffer, leaving an empty vector behind.
let mut bytes = ::std::mem::replace(bytes, Vec::new());
let x_len = bytes.len();
// Decode the header inside its own scope so the borrows of `bytes` end
// before the buffer is moved into the message below.
let (time, from, seq, offset) = {
let (t,r) = unsafe { decode::<T>(&mut bytes) }.unwrap();
let (&f,r) = unsafe { decode::<usize>(r) }.unwrap();
let (&s,r) = unsafe { decode::<usize>(r) }.unwrap();
// Bytes consumed by the header = total length - bytes still remaining.
let o = x_len - r.len();
// `decode` hands back a reference into the buffer, so the time is cloned out.
((*t).clone(), f, s, o)
};
// Decode the payload at `offset` only to learn its element count; the
// payload itself stays in the buffer and is carried along as bytes.
let length = unsafe { decode::<Vec<D>>(&mut bytes[offset..]) }.unwrap().0.len();
Message::new(time, Content::Bytes(bytes, offset, length), from, seq)
}
}
/// A batch of records, held either as serialized bytes or as a typed vector.
///
/// NOTE(review): the derived `Clone` copies the `Bytes` buffer verbatim. If
/// the encoded form embeds pointers into its own buffer, a clone would still
/// refer to the original allocation — confirm against the serializer.
#[derive(Clone)]
pub enum Content<D> {
    /// Serialized form: the byte buffer, the offset within it at which the
    /// `Vec<D>` is read, and the number of elements in that vector.
    Bytes(Vec<u8>, usize, usize),
    /// Typed form: an ordinary owned vector of records.
    Typed(Vec<D>),
}
impl<D> Content<D> {
    /// Moves the current contents out, leaving an empty `Typed` vector in place.
    pub fn take(&mut self) -> Content<D> {
        let emptied = Content::Typed(Vec::new());
        ::std::mem::replace(self, emptied)
    }

    /// Capacity (in elements) that `push_at` uses for freshly allocated buffers.
    #[inline]
    pub fn default_length() -> usize {
        1024
    }

    /// Number of records in this batch.
    ///
    /// For `Bytes` this is the stored length field, so nothing is decoded.
    #[inline]
    pub fn len(&self) -> usize {
        match *self {
            Content::Typed(ref typed) => typed.len(),
            Content::Bytes(_, _, recorded) => recorded,
        }
    }

    /// Wraps the contents of `typed` as `Content::Typed`, leaving `typed`
    /// as a fresh empty `Vec`.
    #[inline]
    pub fn from_typed(typed: &mut Vec<D>) -> Content<D> {
        let taken = ::std::mem::replace(typed, Vec::new());
        Content::Typed(taken)
    }

    /// Consumes the content and returns an *empty* vector.
    ///
    /// For `Typed` the vector is cleared and returned, keeping its allocation.
    /// For `Bytes` the data is discarded and a new empty `Vec` is returned.
    #[inline]
    pub fn into_typed(self) -> Vec<D> {
        if let Content::Typed(mut typed) = self {
            typed.clear();
            typed
        } else {
            Vec::new()
        }
    }

    /// Sends the contents of `buffer` to `pusher` at `time`, then makes sure
    /// `buffer` is an empty vector with the default capacity again.
    ///
    /// If the pusher leaves a `Typed` vector behind in the message slot, that
    /// vector is cleared and reused; otherwise a new one is allocated. Either
    /// way, a buffer whose capacity differs from `default_length()` is
    /// replaced by a fresh allocation.
    #[inline]
    pub fn push_at<T, P: Push<(T, Content<D>)>>(buffer: &mut Vec<D>, time: T, pusher: &mut P) {
        let payload = Content::from_typed(buffer);
        let mut slot = Some((time, payload));
        pusher.push(&mut slot);

        *buffer = match slot {
            // The pusher handed a typed allocation back: recycle it.
            Some((_, Content::Typed(mut returned))) => {
                returned.clear();
                returned
            }
            // Nothing reusable came back.
            _ => Vec::with_capacity(Content::<D>::default_length()),
        };

        // A recycled vector may have a different capacity; normalise it.
        let wanted = Content::<D>::default_length();
        if buffer.capacity() != wanted {
            *buffer = Vec::with_capacity(wanted);
        }
    }
}
impl<D: Clone+Abomonation> Content<D> {
    /// Installs `other` as the typed contents and returns the previous typed
    /// vector.
    ///
    /// Goes through `deref_mut`, so serialized contents are first converted
    /// to an owned `Vec<D>`, which is what gets returned.
    #[inline]
    pub fn replace_with(&mut self, other: Vec<D>) -> Vec<D> {
        let slot: &mut Vec<D> = self.deref_mut();
        let mut incoming = other;
        // After the swap `incoming` holds what used to be stored in `self`.
        ::std::mem::swap(slot, &mut incoming);
        incoming
    }
}
/// Read-only access to the records as a `&Vec<D>`, for either representation.
impl<D: Abomonation> Deref for Content<D> {
    type Target = Vec<D>;

    /// Returns the typed vector directly, or reinterprets the serialized
    /// bytes at the stored offset as a `Vec<D>` without copying.
    #[inline]
    fn deref(&self) -> &Vec<D> {
        match *self {
            Content::Typed(ref typed) => typed,
            Content::Bytes(ref raw, offset, _) => {
                // SAFETY (relied upon, not checked here): `offset` must be in
                // bounds of `raw`, and the bytes starting there must hold a
                // valid, suitably aligned `Vec<D>`. `from_bytes` builds this
                // variant only after a successful `decode::<Vec<D>>` at that
                // offset; other constructors must uphold the same contract.
                let view: &Vec<D> = unsafe { ::std::mem::transmute(raw.get_unchecked(offset)) };
                view
            }
        }
    }
}
/// Mutable access to the records, converting serialized contents on demand.
impl<D: Clone+Abomonation> DerefMut for Content<D> {
    /// Returns a mutable reference to the typed vector.
    ///
    /// If the content is currently `Bytes`, the serialized vector is cloned
    /// into an owned `Vec<D>` and `self` is switched to `Typed` first; the
    /// byte buffer is dropped at that point.
    #[inline]
    fn deref_mut(&mut self) -> &mut Vec<D> {
        // First pass: if we hold bytes, produce an owned copy of the records.
        let decoded = match *self {
            Content::Bytes(ref raw, offset, _) => {
                // SAFETY (relied upon, not checked here): same contract as in
                // `deref` — `offset` is in bounds and the bytes there hold a
                // valid `Vec<D>`.
                let view: &Vec<D> = unsafe { ::std::mem::transmute(raw.get_unchecked(offset)) };
                Some(Vec::clone(view))
            }
            Content::Typed(_) => None,
        };

        // Second pass: swap representation once the borrow above has ended.
        if let Some(typed) = decoded {
            *self = Content::Typed(typed);
        }

        match *self {
            Content::Typed(ref mut typed) => typed,
            // Any `Bytes` value was replaced by `Typed` just above.
            Content::Bytes(_, _, _) => unreachable!(),
        }
    }
}