pub mod batch;
pub mod payload;
pub mod tensor;
use core::mem;
use crate::memory::{BufferDescriptor, MemoryClass};
use crate::message::payload::Payload;
use crate::types::{DeadlineNs, QoSClass, SequenceNumber, Ticks, TraceId};
#[repr(transparent)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct MessageFlags(u32);
impl MessageFlags {
pub const FIRST_IN_BATCH: u32 = 1 << 0;
pub const LAST_IN_BATCH: u32 = 1 << 1;
pub const DEGRADE_ALLOWED: u32 = 1 << 2;
#[inline]
pub const fn empty() -> Self {
Self(0)
}
#[inline]
pub const fn from_bits(bits: u32) -> Self {
Self(bits)
}
#[inline]
pub const fn bits(&self) -> &u32 {
&self.0
}
#[inline]
pub const fn with(self, bit: u32) -> Self {
Self(self.0 | bit)
}
#[inline]
pub const fn without(self, bit: u32) -> Self {
Self(self.0 & !bit)
}
#[inline]
pub const fn contains(self, bit: u32) -> bool {
(self.0 & bit) != 0
}
#[inline]
pub const fn first_in_batch(self) -> Self {
self.with(Self::FIRST_IN_BATCH)
}
#[inline]
pub const fn last_in_batch(self) -> Self {
self.with(Self::LAST_IN_BATCH)
}
#[inline]
pub const fn allow_degrade(self) -> Self {
self.with(Self::DEGRADE_ALLOWED)
}
#[inline]
pub const fn is_first(self) -> bool {
self.contains(Self::FIRST_IN_BATCH)
}
#[inline]
pub const fn is_last(self) -> bool {
self.contains(Self::LAST_IN_BATCH)
}
#[inline]
pub const fn can_degrade(self) -> bool {
self.contains(Self::DEGRADE_ALLOWED)
}
}
#[non_exhaustive]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct MessageHeader {
trace_id: TraceId,
sequence: SequenceNumber,
creation_tick: Ticks,
deadline_ns: Option<DeadlineNs>,
qos: QoSClass,
payload_size_bytes: usize,
flags: MessageFlags,
memory_class: MemoryClass,
}
impl MessageHeader {
#[allow(clippy::too_many_arguments)]
pub const fn new(
trace_id: TraceId,
sequence: SequenceNumber,
creation_tick: Ticks,
deadline_ns: Option<DeadlineNs>,
qos: QoSClass,
payload_size_bytes: usize,
flags: MessageFlags,
memory_class: MemoryClass,
) -> Self {
Self {
trace_id,
sequence,
creation_tick,
deadline_ns,
qos,
payload_size_bytes,
flags,
memory_class,
}
}
#[inline]
pub const fn empty() -> Self {
Self {
trace_id: TraceId::new(0),
sequence: SequenceNumber::new(0),
creation_tick: Ticks::new(0),
deadline_ns: None,
qos: QoSClass::BestEffort,
payload_size_bytes: 0,
flags: MessageFlags::empty(),
memory_class: MemoryClass::Host,
}
}
#[inline]
pub fn is_empty(self) -> bool {
self == Self::empty()
}
#[inline]
pub fn sync_from_payload<P: Payload>(&mut self, payload: &P) {
let desc = payload.buffer_descriptor();
self.payload_size_bytes = *desc.bytes();
}
#[inline]
pub const fn trace_id(&self) -> &TraceId {
&self.trace_id
}
#[inline]
pub fn set_trace_id(&mut self, trace_id: TraceId) {
self.trace_id = trace_id;
}
#[inline]
pub const fn sequence(&self) -> &SequenceNumber {
&self.sequence
}
#[inline]
pub fn set_sequence(&mut self, sequence: SequenceNumber) {
self.sequence = sequence;
}
#[inline]
pub const fn creation_tick(&self) -> &Ticks {
&self.creation_tick
}
#[inline]
pub fn set_creation_tick(&mut self, creation_tick: Ticks) {
self.creation_tick = creation_tick;
}
#[inline]
pub const fn deadline_ns(&self) -> &Option<DeadlineNs> {
&self.deadline_ns
}
#[inline]
pub fn set_deadline_ns(&mut self, deadline_ns: Option<DeadlineNs>) {
self.deadline_ns = deadline_ns;
}
#[inline]
pub const fn qos(&self) -> &QoSClass {
&self.qos
}
#[inline]
pub fn set_qos(&mut self, qos: QoSClass) {
self.qos = qos;
}
#[inline]
pub const fn payload_size_bytes(&self) -> &usize {
&self.payload_size_bytes
}
#[inline]
pub fn set_payload_size_bytes(&mut self, payload_size_bytes: usize) {
self.payload_size_bytes = payload_size_bytes;
}
#[inline]
pub const fn flags(&self) -> &MessageFlags {
&self.flags
}
#[inline]
pub fn set_flags(&mut self, flags: MessageFlags) {
self.flags = flags;
}
#[inline]
pub const fn memory_class(&self) -> &MemoryClass {
&self.memory_class
}
#[inline]
pub fn set_memory_class(&mut self, memory_class: MemoryClass) {
self.memory_class = memory_class;
}
#[inline]
pub fn set_first_in_batch(&mut self) {
self.flags = self.flags.first_in_batch();
}
#[inline]
pub fn set_last_in_batch(&mut self) {
self.flags = self.flags.last_in_batch();
}
}
impl Default for MessageHeader {
#[inline]
fn default() -> Self {
Self::empty()
}
}
#[derive(Debug, Clone)]
pub struct Message<P: Payload> {
header: MessageHeader,
payload: P,
}
impl<P> Copy for Message<P> where P: Payload + Copy {}
impl<P: Payload> Message<P> {
pub fn new(mut header: MessageHeader, payload: P) -> Self {
let desc = payload.buffer_descriptor();
header.payload_size_bytes = *desc.bytes();
Self { header, payload }
}
#[inline]
pub fn with_payload<Q: Payload>(self, payload: Q) -> Message<Q> {
let mut header = self.header;
let desc = payload.buffer_descriptor();
header.payload_size_bytes = *desc.bytes();
Message { header, payload }
}
#[inline]
pub fn map_payload<Q: Payload>(self, f: impl FnOnce(P) -> Q) -> Message<Q> {
let Message {
mut header,
payload,
} = self;
let new_payload = f(payload);
let desc = new_payload.buffer_descriptor();
header.payload_size_bytes = *desc.bytes();
Message {
header,
payload: new_payload,
}
}
#[inline]
pub fn payload(&self) -> &P {
&self.payload
}
#[inline]
pub fn payload_mut(&mut self) -> &mut P {
&mut self.payload
}
#[inline]
pub fn header(&self) -> &MessageHeader {
&self.header
}
#[inline]
pub fn header_mut(&mut self) -> &mut MessageHeader {
&mut self.header
}
#[inline]
pub fn into_parts(self) -> (MessageHeader, P) {
(self.header, self.payload)
}
}
impl<P: Payload> Payload for Message<P> {
#[inline]
fn buffer_descriptor(&self) -> BufferDescriptor {
let payload_desc = self.payload.buffer_descriptor();
BufferDescriptor::new(*payload_desc.bytes() + mem::size_of::<MessageHeader>())
}
}
impl<'a, P: Payload + 'a> Payload for &'a Message<P> {
#[inline]
fn buffer_descriptor(&self) -> BufferDescriptor {
let payload_desc = self.payload.buffer_descriptor();
BufferDescriptor::new(*payload_desc.bytes() + mem::size_of::<MessageHeader>())
}
}
impl<P: Payload + Clone + Default> Default for Message<P> {
fn default() -> Self {
Message::new(MessageHeader::empty(), P::default())
}
}