canadensis/
anonymous.rs

1//!
2//! A publisher that can be used by anonymous nodes
3//!
4
5use core::marker::PhantomData;
6
7use crate::serialize::do_serialize;
8use crate::Clock;
9use canadensis_core::time::{MicrosecondDuration32, Microseconds32};
10use canadensis_core::transfer::{Header, MessageHeader, Transfer};
11use canadensis_core::transport::{TransferId, Transmitter, Transport};
12use canadensis_core::{nb, SubjectId};
13use canadensis_encoding::{Message, Serialize};
14
15/// A transmitter that sends anonymous messages and does not require a node ID
16///
17/// Anonymous nodes have some limitations:
18/// * They can only send messages, not service requests or responses
19/// * They cannot send multi-frame messages
20pub struct AnonymousPublisher<C: Clock, M, T: Transmitter<C>> {
21    /// The priority of transfers from this transmitter
22    priority: <T::Transport as Transport>::Priority,
23    /// The subject to transmit on
24    subject: SubjectId,
25    /// The ID of the next transfer sent
26    next_transfer_id: <T::Transport as Transport>::TransferId,
27    /// Frame transmit timeout
28    timeout: MicrosecondDuration32,
29    /// Message type phantom data
30    _message_phantom: PhantomData<M>,
31}
32
33impl<C, M, T> AnonymousPublisher<C, M, T>
34where
35    C: Clock,
36    M: Message + Serialize,
37    T: Transmitter<C>,
38{
39    /// Creates an anonymous message publisher
40    pub fn new(
41        subject: SubjectId,
42        priority: <T::Transport as Transport>::Priority,
43        timeout: MicrosecondDuration32,
44    ) -> Self {
45        AnonymousPublisher {
46            priority,
47            subject,
48            next_transfer_id: <T::Transport as Transport>::TransferId::default(),
49            timeout,
50            _message_phantom: PhantomData,
51        }
52    }
53
54    /// Prepares an anonymous message for sending and pushes it into the provided transmitter
55    ///
56    /// This function returns an error if the message is too long to fit into one frame, or if
57    /// memory allocation fails.
58    pub fn send(
59        &mut self,
60        payload: &M,
61        clock: &mut C,
62        transmitter: &mut T,
63        driver: &mut T::Driver,
64    ) -> nb::Result<(), AnonymousPublishError<T::Error>> {
65        self.send_inner(payload, false, clock, transmitter, driver)
66    }
67
68    /// Prepares an anonymous message, with the loopback flag set, for sending and pushes it into
69    /// the provided transmitter
70    ///
71    /// This function returns an error if the message is too long to fit into one frame, or if
72    /// memory allocation fails.
73    pub fn send_loopback(
74        &mut self,
75        payload: &M,
76        clock: &mut C,
77        transmitter: &mut T,
78        driver: &mut T::Driver,
79    ) -> nb::Result<(), AnonymousPublishError<T::Error>> {
80        self.send_inner(payload, true, clock, transmitter, driver)
81    }
82
83    fn send_inner(
84        &mut self,
85        payload: &M,
86        loopback: bool,
87        clock: &mut C,
88        transmitter: &mut T,
89        driver: &mut T::Driver,
90    ) -> nb::Result<(), AnonymousPublishError<T::Error>> {
91        // Check that the message fits into one frame
92        let payload_size_bytes = payload.size_bits().div_ceil(8);
93        if payload_size_bytes > transmitter.mtu() {
94            return Err(nb::Error::Other(AnonymousPublishError::Length));
95        }
96        // Part 1: Serialize
97        let deadline = clock.now() + self.timeout;
98        do_serialize(payload, |payload_bytes| {
99            self.send_payload(
100                payload_bytes,
101                deadline,
102                loopback,
103                transmitter,
104                clock,
105                driver,
106            )
107        })
108        .map_err(|e| e.map(AnonymousPublishError::Transport))?;
109        Ok(())
110    }
111
112    fn send_payload(
113        &mut self,
114        payload: &[u8],
115        deadline: Microseconds32,
116        loopback: bool,
117        transmitter: &mut T,
118        clock: &mut C,
119        driver: &mut T::Driver,
120    ) -> nb::Result<(), T::Error> {
121        // Assemble the transfer
122        let transfer = Transfer {
123            header: Header::Message(MessageHeader {
124                timestamp: deadline,
125                transfer_id: self.next_transfer_id.clone(),
126                priority: self.priority.clone(),
127                subject: self.subject,
128                source: None,
129            }),
130            loopback,
131            payload,
132        };
133        self.next_transfer_id = self.next_transfer_id.clone().increment();
134
135        transmitter.push(transfer, clock, driver)
136    }
137}
138
139/// Errors that can occur when publishing an anonymous message
140#[derive(Debug)]
141pub enum AnonymousPublishError<E> {
142    /// The message was too long to fit into one frame
143    Length,
144    /// The transport returned an error
145    Transport(E),
146}
147
148impl<E> From<E> for AnonymousPublishError<E> {
149    fn from(inner: E) -> Self {
150        AnonymousPublishError::Transport(inner)
151    }
152}