1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
use super::*;
use crate::serialisation::ser_helpers::deserialise_bytes;
/// An abstraction over lazy or eagerly serialised data sent to the dispatcher
#[derive(Debug)]
pub enum DispatchData {
/// Lazily serialised variant – must still be serialised by the dispatcher or networking system
/// (Serialisable Msg, Source, Destination)
Lazy(Box<dyn Serialisable>, ActorPath, ActorPath),
/// Should be serialised and [framed](crate::net::frames::Frame).
Serialised(SerialisedFrame),
/// Used in message forwarding
NetMessage(NetMessage),
}
impl DispatchData {
/// Try to extract a network message from this data for local delivery
///
/// This can fail, if the data can't be moved onto the heap, and serialisation
/// also fails.
pub fn into_local(self) -> Result<NetMessage, SerError> {
match self {
DispatchData::Lazy(ser, src, dst) => {
let ser_id = ser.ser_id();
Ok(NetMessage::with_box(ser_id, src, dst, ser))
}
DispatchData::Serialised(SerialisedFrame::ChunkLease(mut chunk_lease)) => {
// The chunk contains the full frame, deserialize_msg does not deserialize FrameHead so we advance the read_pointer first
chunk_lease.advance(FRAME_HEAD_LEN as usize);
//println!("to_local (from: {:?}; to: {:?})", src, dst);
Ok(deserialise_chunk_lease(chunk_lease).expect("s11n errors"))
}
DispatchData::Serialised(SerialisedFrame::ChunkRef(mut chunk_ref)) => {
// The chunk contains the full frame, deserialize_msg does not deserialize FrameHead so we advance the read_pointer first
chunk_ref.advance(FRAME_HEAD_LEN as usize);
//println!("to_local (from: {:?}; to: {:?})", src, dst);
Ok(deserialise_chunk_ref(chunk_ref).expect("s11n errors"))
}
DispatchData::Serialised(SerialisedFrame::Bytes(mut bytes)) => {
bytes.advance(FRAME_HEAD_LEN as usize);
//println!("to_local (from: {:?}; to: {:?})", src, dst);
Ok(deserialise_bytes(bytes).expect("s11n errors"))
}
DispatchData::NetMessage(net_message) => Ok(net_message),
}
}
/// Try to serialise this to data to bytes for remote delivery
pub fn into_serialised(self, buf: &mut BufferEncoder) -> Result<SerialisedFrame, SerError> {
match self {
DispatchData::Lazy(ser, src, dst) => Ok(SerialisedFrame::ChunkLease(
crate::serialisation::ser_helpers::serialise_msg(&src, &dst, ser.deref(), buf)?,
)),
DispatchData::Serialised(frame) => Ok(frame),
DispatchData::NetMessage(net_message) => Ok(SerialisedFrame::ChunkRef(
crate::serialisation::ser_helpers::embed_msg(net_message, buf)?,
)),
}
}
}
/// Envelope with messages for the system'sdispatcher
#[derive(Debug)]
pub enum DispatchEnvelope {
/// A potential network message that must be resolved
Msg {
/// The source of the message
src: ActorPath,
/// The destination of the message
dst: ActorPath,
/// The actual data to be dispatched
msg: DispatchData,
},
/// A message that may already be partially serialised
ForwardedMsg {
/// The message being forwarded
msg: NetMessage,
},
/// A request for actor path registration
Registration(RegistrationEnvelope),
/// An event from the network
Event(EventEnvelope),
/// Killed components send their BufferChunks to the Dispatcher for safe de-allocation
LockedChunk(BufferChunk),
}