use crate::serialize::do_serialize;
use canadensis_core::time::{Clock, MicrosecondDuration32, Microseconds32};
use canadensis_core::transfer::{Header, MessageHeader, Transfer};
use canadensis_core::transport::{TransferId, Transmitter, Transport};
use canadensis_core::{nb, SubjectId};
use canadensis_encoding::{Message, Serialize};
/// Assembles outgoing message transfers and hands them to a transmitter
///
/// A publisher stores the transfer ID, timeout, and priority that `publish` and
/// `publish_loopback` apply to each transfer they send.
pub struct Publisher<C, T>
where
    C: Clock,
    T: Transmitter<C>,
{
    /// Transfer ID that will be placed in the header of the next transfer
    next_transfer_id: <T::Transport as Transport>::TransferId,
    /// Duration added to the current clock time to get the deadline of each transfer
    timeout: MicrosecondDuration32,
    /// Priority placed in the header of every transfer
    priority: <T::Transport as Transport>::Priority,
}
impl<C: Clock, T: Transmitter<C>> Publisher<C, T> {
    /// Creates a publisher
    ///
    /// * `timeout`: Added to the clock time at the start of each publish call to get the
    ///   deadline of the outgoing transfer
    /// * `priority`: The priority placed in the header of every outgoing transfer
    ///
    /// The first transfer uses the default value of the transport's transfer ID type.
    pub fn new(
        timeout: MicrosecondDuration32,
        priority: <T::Transport as Transport>::Priority,
    ) -> Self {
        Self {
            next_transfer_id: Default::default(),
            timeout,
            priority,
        }
    }

    /// Serializes a message and pushes it to the transmitter with the loopback flag cleared
    ///
    /// * `clock`: Read once to calculate the deadline, then passed on to the transmitter
    /// * `source`: The source node ID to place in the header, if any
    /// * `subject`: The subject ID to place in the header
    /// * `payload`: The message to serialize
    /// * `transmitter`: The transmitter that receives the assembled transfer
    /// * `driver`: The driver passed on to the transmitter
    ///
    /// The returned value is the result of handing the transfer to the transmitter.
    /// The stored transfer ID advances on every call, whatever that result is.
    pub fn publish<M>(
        &mut self,
        clock: &mut C,
        source: Option<<T::Transport as Transport>::NodeId>,
        subject: SubjectId,
        payload: &M,
        transmitter: &mut T,
        driver: &mut T::Driver,
    ) -> nb::Result<(), T::Error>
    where
        M: Message + Serialize,
    {
        self.serialize_and_send(clock, source, subject, payload, false, transmitter, driver)
    }

    /// Serializes a message and pushes it to the transmitter with the loopback flag set
    ///
    /// This is the same as `publish` in every other respect: the parameters, the deadline
    /// calculation, the transfer ID handling, and the returned value are identical.
    pub fn publish_loopback<M>(
        &mut self,
        clock: &mut C,
        source: Option<<T::Transport as Transport>::NodeId>,
        subject: SubjectId,
        payload: &M,
        transmitter: &mut T,
        driver: &mut T::Driver,
    ) -> nb::Result<(), T::Error>
    where
        M: Message + Serialize,
    {
        self.serialize_and_send(clock, source, subject, payload, true, transmitter, driver)
    }

    /// Shared implementation of `publish` and `publish_loopback`
    ///
    /// The deadline is calculated before serialization starts, so the time spent
    /// serializing counts against the timeout.
    // All of these values are needed to assemble and push one transfer.
    #[allow(clippy::too_many_arguments)]
    fn serialize_and_send<M>(
        &mut self,
        clock: &mut C,
        source: Option<<T::Transport as Transport>::NodeId>,
        subject: SubjectId,
        payload: &M,
        loopback: bool,
        transmitter: &mut T,
        driver: &mut T::Driver,
    ) -> nb::Result<(), T::Error>
    where
        M: Message + Serialize,
    {
        let deadline = clock.now() + self.timeout;
        do_serialize(payload, |bytes| {
            self.send_payload(
                source,
                subject,
                bytes,
                deadline,
                loopback,
                transmitter,
                clock,
                driver,
            )
        })
    }

    /// Wraps already-serialized bytes in a message transfer and pushes it to the transmitter
    ///
    /// The deadline is stored in the `timestamp` field of the message header.
    // All of these values are needed to assemble and push one transfer.
    #[allow(clippy::too_many_arguments)]
    fn send_payload(
        &mut self,
        source: Option<<T::Transport as Transport>::NodeId>,
        subject: SubjectId,
        payload: &[u8],
        deadline: Microseconds32,
        loopback: bool,
        transmitter: &mut T,
        clock: &mut C,
        driver: &mut T::Driver,
    ) -> nb::Result<(), T::Error> {
        // This transfer gets the current ID. The stored ID advances before the push,
        // so it moves on even if the transmitter does not accept the transfer.
        let transfer_id = self.next_transfer_id.clone();
        self.next_transfer_id = transfer_id.clone().increment();

        let transfer = Transfer {
            header: Header::Message(MessageHeader {
                timestamp: deadline,
                transfer_id,
                priority: self.priority.clone(),
                subject,
                source,
            }),
            loopback,
            payload,
        };
        transmitter.push(transfer, clock, driver)
    }
}
/// Formatting implementations for `Publisher`
///
/// These are written by hand so that the bounds apply to the associated types
/// stored in the struct instead of to the `C` and `T` type parameters.
mod fmt_impl {
    use crate::publisher::Publisher;
    use canadensis_core::time::Clock;
    use canadensis_core::transport::{Transmitter, Transport};
    use core::fmt::{Debug, Formatter, Result};

    /// Formats the three fields of the publisher.
    ///
    /// Only the transfer ID and priority types need to implement `Debug`, because those
    /// are the only transport-specific types that the struct stores.
    impl<C, T> Debug for Publisher<C, T>
    where
        C: Clock,
        T: Transmitter<C>,
        <T::Transport as Transport>::TransferId: Debug,
        <T::Transport as Transport>::Priority: Debug,
    {
        fn fmt(&self, f: &mut Formatter<'_>) -> Result {
            f.debug_struct("Publisher")
                .field("next_transfer_id", &self.next_transfer_id)
                .field("timeout", &self.timeout)
                .field("priority", &self.priority)
                .finish()
        }
    }

    /// Formats the three fields of the publisher using defmt.
    ///
    /// Only the transfer ID and priority types need to implement `defmt::Format`, because
    /// those are the only transport-specific types that the struct stores.
    #[cfg(feature = "defmt")]
    impl<C, T> defmt::Format for Publisher<C, T>
    where
        C: Clock,
        T: Transmitter<C>,
        <T::Transport as Transport>::TransferId: defmt::Format,
        <T::Transport as Transport>::Priority: defmt::Format,
    {
        fn format(&self, f: defmt::Formatter) {
            defmt::write!(
                f,
                "Publisher {{ next_transfer_id: {}, timeout: {}, priority: {} }}",
                self.next_transfer_id,
                self.timeout,
                self.priority
            )
        }
    }
}