Skip to main content

limen_core/
message.rs

1//! Message header, flags, and typed message wrapper.
2//!
3//! Every value in the graph is a [`Message<P>`] carrying a fixed
4//! [`MessageHeader`] and a generic payload `P: Payload`.
5//!
6//! - [`MessageHeader`] — trace ID, sequence number, creation tick, optional
7//!   deadline, QoS class, payload size, flags, and memory class.
8//! - [`MessageFlags`] — compact bitfield for batch boundary and degrade hints.
9//! - [`Message<P>`] — the header/payload pair; implements [`Payload`] itself
10//!   so batches of messages can be nested.
11//!
12//! Submodules:
13//! - [`payload`] — the [`Payload`] trait and blanket impls for slices/arrays/scalars.
14//! - [`tensor`] — owned, fixed-capacity, `no_std`/`no_alloc` [`Tensor`](tensor::Tensor) type.
15//! - [`batch`] — [`Batch`](batch::Batch) view, [`BatchView`](batch::BatchView) container, and [`BatchMessageIter`](batch::BatchMessageIter).
16
17pub mod batch;
18pub mod payload;
19pub mod tensor;
20
21use core::mem;
22
23use crate::memory::{BufferDescriptor, MemoryClass};
24use crate::message::payload::Payload;
25use crate::types::{DeadlineNs, QoSClass, SequenceNumber, Ticks, TraceId};
26
/// Bit-packed set of per-message flags stored in a single `u32`.
///
/// The type is `Copy`. Every modifier takes `self` by value and returns a new
/// value, so an update is written as `flags = flags.with(..)`.
#[repr(transparent)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct MessageFlags(u32);

impl MessageFlags {
    /// Bit 0: this message is the first element in a batch.
    pub const FIRST_IN_BATCH: u32 = 0b001;
    /// Bit 1: this message is the last element in a batch.
    pub const LAST_IN_BATCH: u32 = 0b010;
    /// Bit 2: downstream may degrade this message (e.g., fast/low-precision path).
    pub const DEGRADE_ALLOWED: u32 = 0b100;

    /// A flag set with no bits raised.
    #[inline]
    pub const fn empty() -> Self {
        Self::from_bits(0)
    }

    /// Wrap raw bits as-is (advanced); no validation or masking is applied.
    #[inline]
    pub const fn from_bits(bits: u32) -> Self {
        MessageFlags(bits)
    }

    /// Borrow the raw flag bits.
    #[inline]
    pub const fn bits(&self) -> &u32 {
        &self.0
    }

    /// Return a copy with every bit of `bit` raised.
    #[inline]
    pub const fn with(self, bit: u32) -> Self {
        let raised = self.0 | bit;
        Self::from_bits(raised)
    }

    /// Return a copy with every bit of `bit` cleared.
    #[inline]
    pub const fn without(self, bit: u32) -> Self {
        let keep_mask = !bit;
        Self::from_bits(self.0 & keep_mask)
    }

    /// `true` when at least one bit of `bit` is raised in this set.
    ///
    /// A `bit` of `0` therefore always yields `false`.
    #[inline]
    pub const fn contains(self, bit: u32) -> bool {
        let overlap = self.0 & bit;
        overlap != 0
    }

    // Named shortcuts so call sites do not have to spell out the bit constants.

    /// Copy of `self` with `FIRST_IN_BATCH` raised.
    #[inline]
    pub const fn first_in_batch(self) -> Self {
        Self(self.0 | Self::FIRST_IN_BATCH)
    }

    /// Copy of `self` with `LAST_IN_BATCH` raised.
    #[inline]
    pub const fn last_in_batch(self) -> Self {
        Self(self.0 | Self::LAST_IN_BATCH)
    }

    /// Copy of `self` with `DEGRADE_ALLOWED` raised.
    #[inline]
    pub const fn allow_degrade(self) -> Self {
        Self(self.0 | Self::DEGRADE_ALLOWED)
    }

    /// Whether `FIRST_IN_BATCH` is raised.
    #[inline]
    pub const fn is_first(self) -> bool {
        (self.0 & Self::FIRST_IN_BATCH) != 0
    }

    /// Whether `LAST_IN_BATCH` is raised.
    #[inline]
    pub const fn is_last(self) -> bool {
        (self.0 & Self::LAST_IN_BATCH) != 0
    }

    /// Whether `DEGRADE_ALLOWED` is raised.
    #[inline]
    pub const fn can_degrade(self) -> bool {
        (self.0 & Self::DEGRADE_ALLOWED) != 0
    }
}
114
115/// Fixed header present on all messages that traverse the runtime.
116#[non_exhaustive]
117#[derive(Debug, Clone, Copy, PartialEq, Eq)]
118pub struct MessageHeader {
119    /// Correlation identifier for tracing across nodes.
120    trace_id: TraceId,
121    /// Monotonic sequence number assigned by producers/routers.
122    sequence: SequenceNumber,
123    /// Creation tick (monotonic; platform-defined units).
124    creation_tick: Ticks,
125    /// Optional absolute deadline in nanoseconds since boot (P2).
126    deadline_ns: Option<DeadlineNs>,
127    /// QoS class used by admission/scheduling.
128    qos: QoSClass,
129    /// Reported payload size (bytes), used for byte-cap admission.
130    payload_size_bytes: usize,
131    /// Message flags (batch boundaries, degrade hints).
132    flags: MessageFlags,
133    /// Memory class where the payload currently resides.
134    memory_class: MemoryClass,
135}
136
137impl MessageHeader {
138    /// Construct a new header.
139    #[allow(clippy::too_many_arguments)]
140    pub const fn new(
141        trace_id: TraceId,
142        sequence: SequenceNumber,
143        creation_tick: Ticks,
144        deadline_ns: Option<DeadlineNs>,
145        qos: QoSClass,
146        payload_size_bytes: usize,
147        flags: MessageFlags,
148        memory_class: MemoryClass,
149    ) -> Self {
150        Self {
151            trace_id,
152            sequence,
153            creation_tick,
154            deadline_ns,
155            qos,
156            payload_size_bytes,
157            flags,
158            memory_class,
159        }
160    }
161
162    /// A zero/identity header (safe for scratch use).
163    #[inline]
164    pub const fn empty() -> Self {
165        Self {
166            trace_id: TraceId::new(0),
167            sequence: SequenceNumber::new(0),
168            creation_tick: Ticks::new(0),
169            deadline_ns: None,
170            qos: QoSClass::BestEffort,
171            payload_size_bytes: 0,
172            flags: MessageFlags::empty(),
173            memory_class: MemoryClass::Host,
174        }
175    }
176
177    /// Returns true is the message header is empty.
178    #[inline]
179    pub fn is_empty(self) -> bool {
180        self == Self::empty()
181    }
182
183    /// Update `payload_size_bytes` and `memory_class` from a payload descriptor.
184    #[inline]
185    pub fn sync_from_payload<P: Payload>(&mut self, payload: &P) {
186        let desc = payload.buffer_descriptor();
187        self.payload_size_bytes = *desc.bytes();
188    }
189
190    /// Return the trace id.
191    #[inline]
192    pub const fn trace_id(&self) -> &TraceId {
193        &self.trace_id
194    }
195
196    /// Set the trace id.
197    #[inline]
198    pub fn set_trace_id(&mut self, trace_id: TraceId) {
199        self.trace_id = trace_id;
200    }
201
202    /// Return the sequence number.
203    #[inline]
204    pub const fn sequence(&self) -> &SequenceNumber {
205        &self.sequence
206    }
207
208    /// Set the sequence number.
209    #[inline]
210    pub fn set_sequence(&mut self, sequence: SequenceNumber) {
211        self.sequence = sequence;
212    }
213
214    /// Return the creation tick.
215    #[inline]
216    pub const fn creation_tick(&self) -> &Ticks {
217        &self.creation_tick
218    }
219
220    /// Set the creation tick.
221    #[inline]
222    pub fn set_creation_tick(&mut self, creation_tick: Ticks) {
223        self.creation_tick = creation_tick;
224    }
225
226    /// Return the optional deadline.
227    #[inline]
228    pub const fn deadline_ns(&self) -> &Option<DeadlineNs> {
229        &self.deadline_ns
230    }
231
232    /// Set the optional deadline in nanoseconds since boot.
233    #[inline]
234    pub fn set_deadline_ns(&mut self, deadline_ns: Option<DeadlineNs>) {
235        self.deadline_ns = deadline_ns;
236    }
237
238    /// Return the QoS class.
239    #[inline]
240    pub const fn qos(&self) -> &QoSClass {
241        &self.qos
242    }
243
244    /// Set the QoS class.
245    #[inline]
246    pub fn set_qos(&mut self, qos: QoSClass) {
247        self.qos = qos;
248    }
249
250    /// Return the payload size in bytes.
251    #[inline]
252    pub const fn payload_size_bytes(&self) -> &usize {
253        &self.payload_size_bytes
254    }
255
256    /// Set the payload size in bytes.
257    #[inline]
258    pub fn set_payload_size_bytes(&mut self, payload_size_bytes: usize) {
259        self.payload_size_bytes = payload_size_bytes;
260    }
261
262    /// Return the message flags.
263    #[inline]
264    pub const fn flags(&self) -> &MessageFlags {
265        &self.flags
266    }
267
268    /// Set the message flags.
269    #[inline]
270    pub fn set_flags(&mut self, flags: MessageFlags) {
271        self.flags = flags;
272    }
273
274    /// Return the memory class.
275    #[inline]
276    pub const fn memory_class(&self) -> &MemoryClass {
277        &self.memory_class
278    }
279
280    /// Set the memory class.
281    #[inline]
282    pub fn set_memory_class(&mut self, memory_class: MemoryClass) {
283        self.memory_class = memory_class;
284    }
285
286    /// Mark this header as the first element in a batch by setting `FIRST_IN_BATCH`.
287    #[inline]
288    pub fn set_first_in_batch(&mut self) {
289        self.flags = self.flags.first_in_batch();
290    }
291
292    /// Mark this header as the last element in a batch by setting `LAST_IN_BATCH`.
293    #[inline]
294    pub fn set_last_in_batch(&mut self) {
295        self.flags = self.flags.last_in_batch();
296    }
297}
298
299impl Default for MessageHeader {
300    #[inline]
301    fn default() -> Self {
302        Self::empty()
303    }
304}
305
306/// A message with a generic payload `P`.
307#[derive(Debug, Clone)]
308pub struct Message<P: Payload> {
309    /// The header fields.
310    header: MessageHeader,
311    /// The payload object or view.
312    payload: P,
313}
314
315// Copy only when the payload is Copy (e.g., TensorRef<'a>).
316impl<P> Copy for Message<P> where P: Payload + Copy {}
317
318impl<P: Payload> Message<P> {
319    /// Construct a new message from a header and payload, fixing size and class.
320    pub fn new(mut header: MessageHeader, payload: P) -> Self {
321        let desc = payload.buffer_descriptor();
322        header.payload_size_bytes = *desc.bytes();
323        Self { header, payload }
324    }
325
326    /// Swap payloads while recalculating header fields.
327    #[inline]
328    pub fn with_payload<Q: Payload>(self, payload: Q) -> Message<Q> {
329        let mut header = self.header;
330        let desc = payload.buffer_descriptor();
331        header.payload_size_bytes = *desc.bytes();
332        Message { header, payload }
333    }
334
335    /// Transform payloads while preserving header metadata correctly.
336    #[inline]
337    pub fn map_payload<Q: Payload>(self, f: impl FnOnce(P) -> Q) -> Message<Q> {
338        let Message {
339            mut header,
340            payload,
341        } = self;
342
343        let new_payload = f(payload);
344        let desc = new_payload.buffer_descriptor();
345        header.payload_size_bytes = *desc.bytes();
346
347        Message {
348            header,
349            payload: new_payload,
350        }
351    }
352
353    /// Borrow the payload.
354    #[inline]
355    pub fn payload(&self) -> &P {
356        &self.payload
357    }
358
359    /// Mutable borrow of the payload.
360    #[inline]
361    pub fn payload_mut(&mut self) -> &mut P {
362        &mut self.payload
363    }
364
365    /// Borrow the header.
366    #[inline]
367    pub fn header(&self) -> &MessageHeader {
368        &self.header
369    }
370
371    /// Mutable borrow of the header.
372    #[inline]
373    pub fn header_mut(&mut self) -> &mut MessageHeader {
374        &mut self.header
375    }
376
377    /// Decompose into `(header, payload)`.
378    #[inline]
379    pub fn into_parts(self) -> (MessageHeader, P) {
380        (self.header, self.payload)
381    }
382}
383
384impl<P: Payload> Payload for Message<P> {
385    #[inline]
386    fn buffer_descriptor(&self) -> BufferDescriptor {
387        let payload_desc = self.payload.buffer_descriptor();
388        // Add header size to the payload byte size, keep the payload memory class.
389        BufferDescriptor::new(*payload_desc.bytes() + mem::size_of::<MessageHeader>())
390    }
391}
392
393// Also useful: implement for borrowed Message references to match other impls above.
394impl<'a, P: Payload + 'a> Payload for &'a Message<P> {
395    #[inline]
396    fn buffer_descriptor(&self) -> BufferDescriptor {
397        let payload_desc = self.payload.buffer_descriptor();
398        BufferDescriptor::new(*payload_desc.bytes() + mem::size_of::<MessageHeader>())
399    }
400}
401
402impl<P: Payload + Clone + Default> Default for Message<P> {
403    /// Default `Message<P>` constructed from an empty header and `P::default()`.
404    fn default() -> Self {
405        Message::new(MessageHeader::empty(), P::default())
406    }
407}