use super::*;
use crate::prelude::SessionId;
/// A message envelope: the sender and receiver paths, the payload, and an
/// optional session identifier.
#[derive(Debug)]
pub struct NetMessage {
/// Actor path recorded as the sender of this message.
pub sender: ActorPath,
/// Actor path recorded as the receiver of this message.
pub receiver: ActorPath,
/// The payload together with its serialisation id.
pub data: NetData,
/// Session identifier attached to this message, if any.
///
/// Every constructor in this file initialises it to `None`.
pub session: Option<SessionId>,
}
/// A payload tagged with the serialisation id that identifies its format.
#[derive(Debug)]
pub struct NetData {
/// Serialisation id of the payload; `try_deserialise` compares it against
/// the deserialiser's `SER_ID` before decoding.
pub ser_id: SerId,
/// The payload itself, in whichever representation it currently has.
pub(crate) data: HeapOrSer,
}
/// The possible representations of a payload.
#[derive(Debug)]
pub enum HeapOrSer {
/// A boxed `Serialisable` value that has not been written to a buffer.
Boxed(Box<dyn Serialisable>),
/// Payload held in a `bytes::Bytes` buffer.
Serialised(bytes::Bytes),
/// Payload held in a `ChunkLease`.
ChunkLease(ChunkLease),
/// Payload held in a `ChunkRef`.
ChunkRef(ChunkRef),
}
impl NetMessage {
    /// Creates a message whose payload is a boxed, not yet serialised value.
    ///
    /// The session is left unset (`None`).
    pub fn with_box(
        ser_id: SerId,
        sender: ActorPath,
        receiver: ActorPath,
        data: Box<dyn Serialisable>,
    ) -> NetMessage {
        NetMessage {
            sender,
            receiver,
            data: NetData::with(ser_id, HeapOrSer::Boxed(data)),
            session: None,
        }
    }

    /// Creates a message whose payload is held in a `Bytes` buffer.
    ///
    /// The session is left unset (`None`).
    pub fn with_bytes(
        ser_id: SerId,
        sender: ActorPath,
        receiver: ActorPath,
        data: Bytes,
    ) -> NetMessage {
        NetMessage {
            sender,
            receiver,
            data: NetData::with(ser_id, HeapOrSer::Serialised(data)),
            session: None,
        }
    }

    /// Creates a message whose payload is held in a `ChunkLease`.
    ///
    /// The session is left unset (`None`).
    pub fn with_chunk_lease(
        ser_id: SerId,
        sender: ActorPath,
        receiver: ActorPath,
        data: ChunkLease,
    ) -> NetMessage {
        NetMessage {
            sender,
            receiver,
            data: NetData::with(ser_id, HeapOrSer::ChunkLease(data)),
            session: None,
        }
    }

    /// Creates a message whose payload is held in a `ChunkRef`.
    ///
    /// The session is left unset (`None`).
    pub fn with_chunk_ref(
        ser_id: SerId,
        sender: ActorPath,
        receiver: ActorPath,
        data: ChunkRef,
    ) -> NetMessage {
        NetMessage {
            sender,
            receiver,
            data: NetData::with(ser_id, HeapOrSer::ChunkRef(data)),
            session: None,
        }
    }

    /// Returns a reference to the sender's path.
    pub fn sender(&self) -> &ActorPath {
        &self.sender
    }

    /// Returns the session attached to this message, if one has been set.
    pub fn session(&self) -> Option<SessionId> {
        self.session
    }

    /// Attaches `session` to this message, replacing any previous value.
    pub fn set_session(&mut self, session: SessionId) {
        self.session = Some(session);
    }

    /// Tries to deserialise the payload into a `T` using `D`, keeping the
    /// sender and receiver alongside the result.
    ///
    /// # Errors
    ///
    /// - `UnpackError::NoIdMatch` returns the complete, unchanged message when
    ///   the payload's serialisation id differs from `D::SER_ID`.
    /// - `UnpackError::NoCast` and `UnpackError::DeserError` are forwarded from
    ///   the payload; in those cases the message is consumed.
    pub fn try_into_deserialised<T: 'static, D>(
        self,
    ) -> Result<DeserialisedMessage<T>, UnpackError<Self>>
    where
        D: Deserialiser<T>,
    {
        let NetMessage {
            sender,
            receiver,
            data,
            session,
        } = self;
        match data.try_deserialise::<T, D>() {
            Ok(content) => Ok(DeserialisedMessage::with(sender, receiver, content)),
            Err(error) => Err(Self::reattach_envelope(sender, receiver, session, error)),
        }
    }

    /// Tries to deserialise the payload into a `T` using `D`, after checking
    /// that the payload's serialisation id equals `D::SER_ID`.
    ///
    /// # Errors
    ///
    /// Returns `UnpackError::NoIdMatch` carrying the unchanged message when the
    /// ids differ; otherwise any error from
    /// [`try_deserialise_unchecked`](Self::try_deserialise_unchecked).
    pub fn try_deserialise<T: 'static, D>(self) -> Result<T, UnpackError<Self>>
    where
        D: Deserialiser<T>,
    {
        if self.data.ser_id == D::SER_ID {
            self.try_deserialise_unchecked::<T, D>()
        } else {
            Err(UnpackError::NoIdMatch(self))
        }
    }

    /// Tries to deserialise the payload into a `T` using `D` without comparing
    /// serialisation ids first.
    ///
    /// # Errors
    ///
    /// Forwards the error produced by the payload, with any returned payload
    /// wrapped back into a full message.
    pub fn try_deserialise_unchecked<T: 'static, D>(self) -> Result<T, UnpackError<Self>>
    where
        D: Deserialiser<T>,
    {
        let NetMessage {
            sender,
            receiver,
            data,
            session,
        } = self;
        data.try_deserialise_unchecked::<T, D>()
            .map_err(|error| Self::reattach_envelope(sender, receiver, session, error))
    }

    /// Returns a reference to the payload's serialisation id.
    pub fn ser_id(&self) -> &SerId {
        &self.data.ser_id
    }

    /// Converts a payload-level error into a message-level one.
    ///
    /// A payload handed back through `NoIdMatch` is wrapped in a message with
    /// the given envelope fields; the other variants carry no payload and are
    /// passed through unchanged.
    fn reattach_envelope(
        sender: ActorPath,
        receiver: ActorPath,
        session: Option<SessionId>,
        error: UnpackError<NetData>,
    ) -> UnpackError<Self> {
        match error {
            UnpackError::NoIdMatch(data) => UnpackError::NoIdMatch(NetMessage {
                sender,
                receiver,
                data,
                session,
            }),
            UnpackError::NoCast(boxed) => UnpackError::NoCast(boxed),
            UnpackError::DeserError(cause) => UnpackError::DeserError(cause),
        }
    }
}
impl TryClone for NetMessage {
    /// Clones the message by cloning its payload and copying the envelope.
    ///
    /// # Errors
    ///
    /// Returns the `SerError` reported when the payload cannot be cloned; the
    /// sender and receiver are only cloned once the payload has succeeded.
    fn try_clone(&self) -> Result<Self, SerError> {
        let payload = self.data.try_clone()?;
        Ok(NetMessage {
            sender: self.sender.clone(),
            receiver: self.receiver.clone(),
            data: payload,
            session: self.session,
        })
    }
}
impl NetData {
    /// Pairs a serialisation id with a payload.
    pub fn with(ser_id: SerId, data: HeapOrSer) -> Self {
        Self { ser_id, data }
    }

    /// Tries to turn the payload into a `T` using `D`, after checking that the
    /// stored serialisation id equals `D::SER_ID`.
    ///
    /// # Errors
    ///
    /// Returns `UnpackError::NoIdMatch` carrying `self` unchanged when the ids
    /// differ; otherwise any error from
    /// [`try_deserialise_unchecked`](Self::try_deserialise_unchecked).
    pub fn try_deserialise<T: 'static, D>(self) -> Result<T, UnpackError<Self>>
    where
        D: Deserialiser<T>,
    {
        if self.ser_id != D::SER_ID {
            return Err(UnpackError::NoIdMatch(self));
        }
        self.try_deserialise_unchecked::<T, D>()
    }

    /// Tries to turn the payload into a `T` using `D` without comparing
    /// serialisation ids.
    ///
    /// A boxed payload is converted with `local()` and then downcast to `T`;
    /// every other representation is passed to `D::deserialise`.
    ///
    /// # Errors
    ///
    /// - `UnpackError::DeserError` when `local()` fails on a boxed payload or
    ///   when `D::deserialise` reports an error.
    /// - `UnpackError::NoCast` carrying the boxed value when the downcast to
    ///   `T` fails.
    pub fn try_deserialise_unchecked<T: 'static, D>(self) -> Result<T, UnpackError<Self>>
    where
        D: Deserialiser<T>,
    {
        let NetData {
            ser_id: id,
            data: payload,
        } = self;
        match payload {
            HeapOrSer::Boxed(boxed) => match boxed.local() {
                Ok(local) => match local.downcast::<T>() {
                    Ok(typed) => Ok(*typed),
                    Err(uncast) => Err(UnpackError::NoCast(uncast)),
                },
                Err(_) => Err(UnpackError::DeserError(SerError::Unknown(format!(
                    "Serialisable with id={} can't be converted to local!",
                    id
                )))),
            },
            HeapOrSer::Serialised(mut buffer) => {
                D::deserialise(&mut buffer).map_err(UnpackError::DeserError)
            }
            HeapOrSer::ChunkLease(mut lease) => {
                D::deserialise(&mut lease).map_err(UnpackError::DeserError)
            }
            HeapOrSer::ChunkRef(mut chunk_ref) => {
                D::deserialise(&mut chunk_ref).map_err(UnpackError::DeserError)
            }
        }
    }

    /// Returns a reference to the stored serialisation id.
    pub fn ser_id(&self) -> &SerId {
        &self.ser_id
    }
}
impl TryClone for NetData {
    /// Clones the payload and copies the serialisation id.
    ///
    /// # Errors
    ///
    /// Returns the `SerError` reported when the payload cannot be cloned.
    fn try_clone(&self) -> Result<Self, SerError> {
        let payload = self.data.try_clone()?;
        Ok(NetData {
            ser_id: self.ser_id,
            data: payload,
        })
    }
}
impl TryClone for HeapOrSer {
    /// Produces an independent copy of the payload.
    ///
    /// The resulting representation depends on the variant:
    /// - `Boxed` is serialised into a fresh buffer and returned as `Serialised`.
    /// - `Serialised` clones its `Bytes`.
    /// - `ChunkLease` is returned as `Serialised`, built from
    ///   `create_byte_clone()`.
    /// - `ChunkRef` is cloned and stays a `ChunkRef`.
    ///
    /// # Errors
    ///
    /// Returns the `SerError` reported when serialising a `Boxed` value fails;
    /// the other variants always succeed.
    fn try_clone(&self) -> Result<Self, SerError> {
        match self {
            Self::Boxed(value) => {
                let mut scratch = Vec::<u8>::new();
                value.serialise(&mut scratch)?;
                Ok(Self::Serialised(Bytes::from(scratch)))
            }
            Self::Serialised(buffer) => Ok(Self::Serialised(buffer.clone())),
            Self::ChunkLease(lease) => Ok(Self::Serialised(lease.create_byte_clone())),
            Self::ChunkRef(chunk_ref) => Ok(Self::ChunkRef(chunk_ref.clone())),
        }
    }
}
/// A message whose payload has already been deserialised into a `T`.
#[derive(Debug)]
pub struct DeserialisedMessage<T> {
/// Actor path recorded as the sender of this message.
pub sender: ActorPath,
/// Actor path recorded as the receiver of this message.
pub receiver: ActorPath,
/// The deserialised payload.
pub content: T,
}
impl<T> DeserialisedMessage<T> {
pub fn with(sender: ActorPath, receiver: ActorPath, content: T) -> Self {
DeserialisedMessage {
sender,
receiver,
content,
}
}
}
/// The ways in which unpacking a value of type `T` can fail.
#[derive(Debug)]
pub enum UnpackError<T> {
/// The serialisation id did not match the deserialiser's; the original
/// value is handed back.
NoIdMatch(T),
/// A boxed value could not be downcast to the requested type; the boxed
/// value is handed back.
NoCast(Box<dyn Any + Send + 'static>),
/// Deserialisation failed with the contained `SerError`.
DeserError(SerError),
}
impl<T> UnpackError<T> {
    /// Consumes the error and returns the value carried by `NoIdMatch`.
    ///
    /// Yields `None` for `NoCast` and `DeserError`.
    pub fn get(self) -> Option<T> {
        if let Self::NoIdMatch(original) = self {
            Some(original)
        } else {
            None
        }
    }

    /// Consumes the error and returns the boxed value carried by `NoCast`.
    ///
    /// Yields `None` for `NoIdMatch` and `DeserError`.
    pub fn get_box(self) -> Option<Box<dyn Any + Send + 'static>> {
        if let Self::NoCast(boxed) = self {
            Some(boxed)
        } else {
            None
        }
    }
}