use crate::{
actors::ActorPath,
messaging::{HeapOrSer, NetData, NetMessage, Serialised},
net::{
buffers::{BufferEncoder, ChunkLease, ChunkRef},
frames::{FrameHead, FrameType, FRAME_HEAD_LEN},
},
serialisation::*,
};
use bytes::{buf::BufMut, Bytes, BytesMut};
pub fn serialise_to_msg(
src: ActorPath,
dst: ActorPath,
msg: Box<dyn Serialisable>,
) -> Result<NetMessage, SerError> {
if let Some(size) = msg.size_hint() {
let mut buf = BytesMut::with_capacity(size);
match msg.serialise(&mut buf) {
Ok(_) => {
let envelope = NetMessage::with_bytes(msg.ser_id(), src, dst, buf.freeze());
Ok(envelope)
}
Err(ser_err) => Err(ser_err),
}
} else {
Err(SerError::Unknown("Unknown serialisation size".into()))
}
}
pub fn serialise_to_serialised<S>(ser: &S) -> Result<Serialised, SerError>
where
S: Serialisable + ?Sized,
{
if let Some(size) = ser.size_hint() {
let mut buf = BytesMut::with_capacity(size);
ser.serialise(&mut buf).map(|_| Serialised {
ser_id: ser.ser_id(),
data: buf.freeze(),
})
} else {
Err(SerError::Unknown("Unknown serialisation size".into()))
}
}
pub fn serialiser_to_serialised<T, S>(t: &T, ser: &S) -> Result<Serialised, SerError>
where
T: std::fmt::Debug,
S: Serialiser<T>,
{
if let Some(size) = ser.size_hint() {
let mut buf = BytesMut::with_capacity(size);
ser.serialise(t, &mut buf).map(|_| Serialised {
ser_id: ser.ser_id(),
data: buf.freeze(),
})
} else {
Err(SerError::Unknown("Unknown serialisation size".into()))
}
}
pub fn serialise_msg<B>(
src: &ActorPath,
dst: &ActorPath,
msg: &B,
buf: &mut BufferEncoder,
) -> Result<ChunkLease, SerError>
where
B: Serialisable + ?Sized,
{
let mut reserve_size = 0;
if let Some(hint) = msg.size_hint() {
reserve_size += hint;
}
if let Some(hint) = src.size_hint() {
reserve_size += hint;
}
if let Some(hint) = dst.size_hint() {
reserve_size += hint;
}
buf.try_reserve(reserve_size + FRAME_HEAD_LEN as usize)?;
buf.pad(FRAME_HEAD_LEN as usize);
src.serialise(buf)?; dst.serialise(buf)?; buf.put_ser_id(msg.ser_id()); Serialisable::serialise(msg, buf)?; buf.get_chunk_lease().map(|mut chunk_lease| {
let len = chunk_lease.capacity() - FRAME_HEAD_LEN as usize; chunk_lease.insert_head(FrameHead::new(FrameType::Data, len));
assert_eq!(
chunk_lease.capacity(),
len + FRAME_HEAD_LEN as usize,
"Serialized frame length faulty {:#?}",
chunk_lease
);
chunk_lease
})
}
pub fn preserialise_msg<B>(msg: &B, buf: &mut BufferEncoder) -> Result<ChunkRef, SerError>
where
B: Serialisable + ?Sized,
{
if let Some(hint) = msg.size_hint() {
buf.try_reserve(hint)?;
}
buf.put_ser_id(msg.ser_id()); Serialisable::serialise(msg, buf)?; buf.get_chunk_lease()
.map(|chunk_lease| chunk_lease.into_chunk_ref())
}
pub fn serialise_msg_with_preserialised(
src: &ActorPath,
dst: &ActorPath,
content: ChunkRef,
buf: &mut BufferEncoder,
) -> Result<ChunkRef, SerError> {
buf.pad(FRAME_HEAD_LEN as usize);
src.serialise(buf)?; dst.serialise(buf)?; buf.get_chunk_lease().map(|mut header| {
let len = header.capacity() + content.capacity() - FRAME_HEAD_LEN as usize;
header.insert_head(FrameHead::new(FrameType::Data, len));
header.into_chunk_ref_with_tail(content)
})
}
pub fn embed_msg(msg: NetMessage, buf: &mut BufferEncoder) -> Result<ChunkRef, SerError> {
buf.pad(FRAME_HEAD_LEN as usize);
msg.sender.serialise(buf)?; msg.receiver.serialise(buf)?; let NetData { ser_id, data } = msg.data;
buf.put_ser_id(ser_id); match data {
HeapOrSer::Boxed(b) => {
b.serialise(buf)?;
}
HeapOrSer::Serialised(bytes) => {
buf.put(bytes);
}
HeapOrSer::ChunkLease(chunk_lease) => {
return buf.get_chunk_lease().map(|mut header_lease| {
let frame_len =
header_lease.capacity() - (FRAME_HEAD_LEN as usize) + chunk_lease.capacity();
header_lease.insert_head(FrameHead::new(FrameType::Data, frame_len));
let chunk_ref = header_lease.into_chunk_ref_with_tail(chunk_lease.into_chunk_ref());
assert_eq!(
chunk_ref.capacity(),
frame_len + FRAME_HEAD_LEN as usize,
"Serialized frame sizing failed"
);
chunk_ref
})
}
HeapOrSer::ChunkRef(chunk_ref) => {
return buf.get_chunk_lease().map(|mut header_lease| {
let frame_len =
header_lease.capacity() - (FRAME_HEAD_LEN as usize) + chunk_ref.capacity();
header_lease.insert_head(FrameHead::new(FrameType::Data, frame_len));
let chunk_ref = header_lease.into_chunk_ref_with_tail(chunk_ref);
assert_eq!(
chunk_ref.capacity(),
frame_len + FRAME_HEAD_LEN as usize,
"Serialized frame sizing failed"
);
chunk_ref
})
}
}
buf.get_chunk_lease().map(|mut chunk_lease| {
let len = chunk_lease.capacity() - FRAME_HEAD_LEN as usize; chunk_lease.insert_head(FrameHead::new(FrameType::Data, len));
assert_eq!(
chunk_lease.capacity(),
len + FRAME_HEAD_LEN as usize,
"Serialized frame sizing failed"
);
chunk_lease.into_chunk_ref()
})
}
pub fn deserialise_chunk_lease(mut buffer: ChunkLease) -> Result<NetMessage, SerError> {
let src = ActorPath::deserialise(&mut buffer)?;
let dst = ActorPath::deserialise(&mut buffer)?;
let ser_id = buffer.get_ser_id();
let envelope = NetMessage::with_chunk_ref(ser_id, src, dst, buffer.into_chunk_ref());
Ok(envelope)
}
pub fn deserialise_chunk_ref(mut buffer: ChunkRef) -> Result<NetMessage, SerError> {
let src = ActorPath::deserialise(&mut buffer)?;
let dst = ActorPath::deserialise(&mut buffer)?;
let ser_id = buffer.get_ser_id();
let envelope = NetMessage::with_chunk_ref(ser_id, src, dst, buffer);
Ok(envelope)
}
pub fn deserialise_bytes(mut buffer: Bytes) -> Result<NetMessage, SerError> {
let src = ActorPath::deserialise(&mut buffer)?;
let dst = ActorPath::deserialise(&mut buffer)?;
let ser_id = buffer.get_ser_id();
let envelope = NetMessage::with_bytes(ser_id, src, dst, buffer);
Ok(envelope)
}