1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
//! Unified frame representation.
use crate::MsgDescHot;
use bytes::Bytes;
/// Payload storage for a frame.
#[derive(Debug)]
pub enum Payload {
/// Payload bytes live inside `MsgDescHot::inline_payload`.
Inline,
/// Payload bytes are owned as a heap allocation.
Owned(Vec<u8>),
/// Payload bytes are stored in a ref-counted buffer (cheap clone).
Bytes(Bytes),
/// Payload bytes backed by a buffer pool (returns to pool on drop).
///
/// Placeholder for pooled buffers (see issue #46).
Pooled(PooledBuf),
/// Payload bytes backed by a shared-memory slot guard (frees slot on drop).
#[cfg(feature = "shm")]
Shm(crate::transport::shm::SlotGuard),
}
impl Payload {
/// Borrow the payload as a byte slice.
pub fn as_slice<'a>(&'a self, desc: &'a MsgDescHot) -> &'a [u8] {
match self {
Payload::Inline => desc.inline_payload(),
Payload::Owned(buf) => buf.as_slice(),
Payload::Bytes(buf) => buf.as_ref(),
Payload::Pooled(buf) => buf.as_ref(),
#[cfg(feature = "shm")]
Payload::Shm(guard) => guard.as_ref(),
}
}
/// Borrow the payload as a byte slice without needing a descriptor.
///
/// Returns `None` for [`Payload::Inline`], since inline bytes live inside
/// `MsgDescHot`.
pub fn external_slice(&self) -> Option<&[u8]> {
match self {
Payload::Inline => None,
Payload::Owned(buf) => Some(buf.as_slice()),
Payload::Bytes(buf) => Some(buf.as_ref()),
Payload::Pooled(buf) => Some(buf.as_ref()),
#[cfg(feature = "shm")]
Payload::Shm(guard) => Some(guard.as_ref()),
}
}
/// Returns the payload length in bytes.
pub fn len(&self, desc: &MsgDescHot) -> usize {
if let Some(ext) = self.external_slice() {
ext.len()
} else {
desc.payload_len as usize
}
}
/// Returns true if this payload is stored inline.
pub fn is_inline(&self) -> bool {
matches!(self, Payload::Inline)
}
}
/// Placeholder pooled buffer type.
#[derive(Debug)]
pub struct PooledBuf(Bytes);
impl AsRef<[u8]> for PooledBuf {
fn as_ref(&self) -> &[u8] {
self.0.as_ref()
}
}
/// Owned frame for sending, receiving, or routing.
#[derive(Debug)]
pub struct Frame {
/// The frame descriptor.
pub desc: MsgDescHot,
/// Payload storage for this frame.
pub payload: Payload,
}
impl Frame {
/// Create a new frame with no payload (inline empty).
pub fn new(desc: MsgDescHot) -> Self {
Self {
desc,
payload: Payload::Inline,
}
}
/// Create a frame with inline payload.
pub fn with_inline_payload(mut desc: MsgDescHot, payload: &[u8]) -> Option<Self> {
if payload.len() > crate::INLINE_PAYLOAD_SIZE {
return None;
}
desc.payload_slot = crate::INLINE_PAYLOAD_SLOT;
desc.payload_generation = 0;
desc.payload_offset = 0;
desc.payload_len = payload.len() as u32;
desc.inline_payload[..payload.len()].copy_from_slice(payload);
Some(Self {
desc,
payload: Payload::Inline,
})
}
/// Create a frame with an owned payload allocation.
pub fn with_payload(mut desc: MsgDescHot, payload: Vec<u8>) -> Self {
desc.payload_slot = 0;
desc.payload_generation = 0;
desc.payload_offset = 0;
desc.payload_len = payload.len() as u32;
Self {
desc,
payload: Payload::Owned(payload),
}
}
/// Create a frame with a ref-counted bytes buffer.
pub fn with_bytes(mut desc: MsgDescHot, payload: Bytes) -> Self {
desc.payload_slot = 0;
desc.payload_generation = 0;
desc.payload_offset = 0;
desc.payload_len = payload.len() as u32;
Self {
desc,
payload: Payload::Bytes(payload),
}
}
/// Borrow the payload as bytes.
pub fn payload_bytes(&self) -> &[u8] {
self.payload.as_slice(&self.desc)
}
}