1pub mod batch;
18pub mod payload;
19pub mod tensor;
20
21use core::mem;
22
23use crate::memory::{BufferDescriptor, MemoryClass};
24use crate::message::payload::Payload;
25use crate::types::{DeadlineNs, QoSClass, SequenceNumber, Ticks, TraceId};
26
27#[repr(transparent)]
29#[derive(Debug, Clone, Copy, PartialEq, Eq)]
30pub struct MessageFlags(u32);
31
32impl MessageFlags {
33 pub const FIRST_IN_BATCH: u32 = 1 << 0;
35 pub const LAST_IN_BATCH: u32 = 1 << 1;
37 pub const DEGRADE_ALLOWED: u32 = 1 << 2;
39
40 #[inline]
42 pub const fn empty() -> Self {
43 Self(0)
44 }
45
46 #[inline]
48 pub const fn from_bits(bits: u32) -> Self {
49 Self(bits)
50 }
51
52 #[inline]
54 pub const fn bits(&self) -> &u32 {
55 &self.0
56 }
57
58 #[inline]
60 pub const fn with(self, bit: u32) -> Self {
61 Self(self.0 | bit)
62 }
63
64 #[inline]
66 pub const fn without(self, bit: u32) -> Self {
67 Self(self.0 & !bit)
68 }
69
70 #[inline]
72 pub const fn contains(self, bit: u32) -> bool {
73 (self.0 & bit) != 0
74 }
75
76 #[inline]
80 pub const fn first_in_batch(self) -> Self {
81 self.with(Self::FIRST_IN_BATCH)
82 }
83
84 #[inline]
86 pub const fn last_in_batch(self) -> Self {
87 self.with(Self::LAST_IN_BATCH)
88 }
89
90 #[inline]
92 pub const fn allow_degrade(self) -> Self {
93 self.with(Self::DEGRADE_ALLOWED)
94 }
95
96 #[inline]
98 pub const fn is_first(self) -> bool {
99 self.contains(Self::FIRST_IN_BATCH)
100 }
101
102 #[inline]
104 pub const fn is_last(self) -> bool {
105 self.contains(Self::LAST_IN_BATCH)
106 }
107
108 #[inline]
110 pub const fn can_degrade(self) -> bool {
111 self.contains(Self::DEGRADE_ALLOWED)
112 }
113}
114
115#[non_exhaustive]
117#[derive(Debug, Clone, Copy, PartialEq, Eq)]
118pub struct MessageHeader {
119 trace_id: TraceId,
121 sequence: SequenceNumber,
123 creation_tick: Ticks,
125 deadline_ns: Option<DeadlineNs>,
127 qos: QoSClass,
129 payload_size_bytes: usize,
131 flags: MessageFlags,
133 memory_class: MemoryClass,
135}
136
137impl MessageHeader {
138 #[allow(clippy::too_many_arguments)]
140 pub const fn new(
141 trace_id: TraceId,
142 sequence: SequenceNumber,
143 creation_tick: Ticks,
144 deadline_ns: Option<DeadlineNs>,
145 qos: QoSClass,
146 payload_size_bytes: usize,
147 flags: MessageFlags,
148 memory_class: MemoryClass,
149 ) -> Self {
150 Self {
151 trace_id,
152 sequence,
153 creation_tick,
154 deadline_ns,
155 qos,
156 payload_size_bytes,
157 flags,
158 memory_class,
159 }
160 }
161
162 #[inline]
164 pub const fn empty() -> Self {
165 Self {
166 trace_id: TraceId::new(0),
167 sequence: SequenceNumber::new(0),
168 creation_tick: Ticks::new(0),
169 deadline_ns: None,
170 qos: QoSClass::BestEffort,
171 payload_size_bytes: 0,
172 flags: MessageFlags::empty(),
173 memory_class: MemoryClass::Host,
174 }
175 }
176
177 #[inline]
179 pub fn is_empty(self) -> bool {
180 self == Self::empty()
181 }
182
183 #[inline]
185 pub fn sync_from_payload<P: Payload>(&mut self, payload: &P) {
186 let desc = payload.buffer_descriptor();
187 self.payload_size_bytes = *desc.bytes();
188 }
189
190 #[inline]
192 pub const fn trace_id(&self) -> &TraceId {
193 &self.trace_id
194 }
195
196 #[inline]
198 pub fn set_trace_id(&mut self, trace_id: TraceId) {
199 self.trace_id = trace_id;
200 }
201
202 #[inline]
204 pub const fn sequence(&self) -> &SequenceNumber {
205 &self.sequence
206 }
207
208 #[inline]
210 pub fn set_sequence(&mut self, sequence: SequenceNumber) {
211 self.sequence = sequence;
212 }
213
214 #[inline]
216 pub const fn creation_tick(&self) -> &Ticks {
217 &self.creation_tick
218 }
219
220 #[inline]
222 pub fn set_creation_tick(&mut self, creation_tick: Ticks) {
223 self.creation_tick = creation_tick;
224 }
225
226 #[inline]
228 pub const fn deadline_ns(&self) -> &Option<DeadlineNs> {
229 &self.deadline_ns
230 }
231
232 #[inline]
234 pub fn set_deadline_ns(&mut self, deadline_ns: Option<DeadlineNs>) {
235 self.deadline_ns = deadline_ns;
236 }
237
238 #[inline]
240 pub const fn qos(&self) -> &QoSClass {
241 &self.qos
242 }
243
244 #[inline]
246 pub fn set_qos(&mut self, qos: QoSClass) {
247 self.qos = qos;
248 }
249
250 #[inline]
252 pub const fn payload_size_bytes(&self) -> &usize {
253 &self.payload_size_bytes
254 }
255
256 #[inline]
258 pub fn set_payload_size_bytes(&mut self, payload_size_bytes: usize) {
259 self.payload_size_bytes = payload_size_bytes;
260 }
261
262 #[inline]
264 pub const fn flags(&self) -> &MessageFlags {
265 &self.flags
266 }
267
268 #[inline]
270 pub fn set_flags(&mut self, flags: MessageFlags) {
271 self.flags = flags;
272 }
273
274 #[inline]
276 pub const fn memory_class(&self) -> &MemoryClass {
277 &self.memory_class
278 }
279
280 #[inline]
282 pub fn set_memory_class(&mut self, memory_class: MemoryClass) {
283 self.memory_class = memory_class;
284 }
285
286 #[inline]
288 pub fn set_first_in_batch(&mut self) {
289 self.flags = self.flags.first_in_batch();
290 }
291
292 #[inline]
294 pub fn set_last_in_batch(&mut self) {
295 self.flags = self.flags.last_in_batch();
296 }
297}
298
299impl Default for MessageHeader {
300 #[inline]
301 fn default() -> Self {
302 Self::empty()
303 }
304}
305
306#[derive(Debug, Clone)]
308pub struct Message<P: Payload> {
309 header: MessageHeader,
311 payload: P,
313}
314
315impl<P> Copy for Message<P> where P: Payload + Copy {}
317
318impl<P: Payload> Message<P> {
319 pub fn new(mut header: MessageHeader, payload: P) -> Self {
321 let desc = payload.buffer_descriptor();
322 header.payload_size_bytes = *desc.bytes();
323 Self { header, payload }
324 }
325
326 #[inline]
328 pub fn with_payload<Q: Payload>(self, payload: Q) -> Message<Q> {
329 let mut header = self.header;
330 let desc = payload.buffer_descriptor();
331 header.payload_size_bytes = *desc.bytes();
332 Message { header, payload }
333 }
334
335 #[inline]
337 pub fn map_payload<Q: Payload>(self, f: impl FnOnce(P) -> Q) -> Message<Q> {
338 let Message {
339 mut header,
340 payload,
341 } = self;
342
343 let new_payload = f(payload);
344 let desc = new_payload.buffer_descriptor();
345 header.payload_size_bytes = *desc.bytes();
346
347 Message {
348 header,
349 payload: new_payload,
350 }
351 }
352
353 #[inline]
355 pub fn payload(&self) -> &P {
356 &self.payload
357 }
358
359 #[inline]
361 pub fn payload_mut(&mut self) -> &mut P {
362 &mut self.payload
363 }
364
365 #[inline]
367 pub fn header(&self) -> &MessageHeader {
368 &self.header
369 }
370
371 #[inline]
373 pub fn header_mut(&mut self) -> &mut MessageHeader {
374 &mut self.header
375 }
376
377 #[inline]
379 pub fn into_parts(self) -> (MessageHeader, P) {
380 (self.header, self.payload)
381 }
382}
383
384impl<P: Payload> Payload for Message<P> {
385 #[inline]
386 fn buffer_descriptor(&self) -> BufferDescriptor {
387 let payload_desc = self.payload.buffer_descriptor();
388 BufferDescriptor::new(*payload_desc.bytes() + mem::size_of::<MessageHeader>())
390 }
391}
392
393impl<'a, P: Payload + 'a> Payload for &'a Message<P> {
395 #[inline]
396 fn buffer_descriptor(&self) -> BufferDescriptor {
397 let payload_desc = self.payload.buffer_descriptor();
398 BufferDescriptor::new(*payload_desc.bytes() + mem::size_of::<MessageHeader>())
399 }
400}
401
402impl<P: Payload + Clone + Default> Default for Message<P> {
403 fn default() -> Self {
405 Message::new(MessageHeader::empty(), P::default())
406 }
407}