// nodo 0.18.5
//
// A realtime framework for robotics
// Documentation
// Copyright 2025 David Weikersdorfer

use crate::{
    channels::{FlushResult, SharedBackStage, TxConnectable, TxSendError},
    core::{AppMonotonicClock, Clock, PubtimeMarker},
    prelude::{Acqtime, DoubleBufferTx, Message, OverflowPolicy, Stamp, Tx},
};

/// Message transmitter using a double-buffer queue
///
/// This is similar to [`DoubleBufferTx<Message<T>>`] with the main difference that
/// [push](MessageTx::push) automatically creates a [Message].
pub struct MessageTx<T> {
    /// Wrapped double-buffer transmitter; every push and flush is forwarded to it.
    imp: DoubleBufferTx<Message<T>>,
    /// Clock read on each push to obtain the "pubtime" of the created message.
    pub_clock: AppMonotonicClock<PubtimeMarker>,
    /// Sequence number handed to the next pushed message. Starts at 0 and wraps around on
    /// overflow.
    seq: u64,
}

impl<T> MessageTx<T> {
    /// Builds a transmitter that can publish at most `capacity` messages per step.
    pub fn new(capacity: usize) -> Self {
        let imp = DoubleBufferTx::new(capacity);
        Self::new_imp(imp)
    }

    /// Builds a transmitter without an upper limit on the number of messages published per
    /// step. Use with care: nothing stops the queue from growing without bound.
    pub fn new_auto_size() -> Self {
        let imp = DoubleBufferTx::new_auto_size();
        Self::new_imp(imp)
    }

    /// Wraps an already constructed [`DoubleBufferTx`], pairing it with a fresh publish clock
    /// and a sequence counter starting at 0.
    fn new_imp(imp: DoubleBufferTx<Message<T>>) -> Self {
        let pub_clock = AppMonotonicClock::new();
        Self {
            imp,
            pub_clock,
            seq: 0,
        }
    }

    /// Wraps `value` into a [Message] and queues it on the transmitter.
    ///
    /// The message carries the given `acqtime`, a "pubtime" read from the publish clock at the
    /// time of the call, and the next sequence number. Messages are published after the step
    /// function concludes.
    ///
    /// # Errors
    ///
    /// Returns the [`TxSendError`] reported by the underlying [`DoubleBufferTx`] push.
    pub fn push(&mut self, acqtime: Acqtime, value: T) -> Result<(), TxSendError> {
        // The sequence number is taken before the push is attempted, so the counter advances
        // even if the underlying transmitter rejects the message.
        let seq = self.pub_seq();
        let stamp = Stamp {
            acqtime,
            pubtime: self.pub_clock.now(),
        };
        self.imp.push(Message { seq, stamp, value })
    }

    /// Queues every `(acqtime, value)` pair in order (see [push](MessageTx::push)).
    ///
    /// # Errors
    ///
    /// Stops at the first pair for which [push](MessageTx::push) fails and returns that
    /// error; the remaining pairs are not pushed.
    pub fn push_many<I: IntoIterator<Item = (Acqtime, T)>>(
        &mut self,
        values: I,
    ) -> Result<(), TxSendError> {
        values
            .into_iter()
            .try_for_each(|(acqtime, value)| self.push(acqtime, value))
    }

    /// Returns the current sequence number and advances the counter by one, wrapping around
    /// on overflow.
    fn pub_seq(&mut self) -> u64 {
        let next = self.seq.wrapping_add(1);
        std::mem::replace(&mut self.seq, next)
    }
}

impl<T> Tx for MessageTx<T>
where
    T: Send + Sync + Clone,
{
    /// Forwards the flush to the wrapped [`DoubleBufferTx`] and returns its result.
    fn flush(&mut self) -> FlushResult {
        self.imp.flush()
    }

    /// Reports the connection state of the wrapped [`DoubleBufferTx`].
    fn is_connected(&self) -> bool {
        self.imp.is_connected()
    }

    /// Reports the length of the wrapped [`DoubleBufferTx`].
    fn len(&self) -> usize {
        self.imp.len()
    }
}

impl<T> TxConnectable for MessageTx<T>
where
    T: Send + Sync + Clone,
{
    /// Connections carry the full [Message] wrapper, not the bare payload.
    type Message = Message<T>;

    /// Delegates to the wrapped [`DoubleBufferTx`].
    fn has_max_connection_count(&self) -> bool {
        self.imp.has_max_connection_count()
    }

    /// Returns the overflow policy of the wrapped [`DoubleBufferTx`].
    fn overflow_policy(&self) -> OverflowPolicy {
        self.imp.overflow_policy()
    }

    /// Hands the given back stage over to the wrapped [`DoubleBufferTx`].
    fn on_connect(&mut self, stage: SharedBackStage<Self::Message>) {
        self.imp.on_connect(stage);
    }
}