use crate::serialize::do_serialize;
use canadensis_core::time::{Clock, Instant};
use canadensis_core::transfer::{Header, MessageHeader, Transfer};
use canadensis_core::transport::{TransferId, Transmitter, Transport};
use canadensis_core::{nb, SubjectId};
use canadensis_encoding::{Message, Serialize};
pub struct Publisher<I: Instant, T: Transmitter<I>> {
next_transfer_id: <T::Transport as Transport>::TransferId,
timeout: I::Duration,
priority: <T::Transport as Transport>::Priority,
source: <T::Transport as Transport>::NodeId,
}
impl<I: Instant, T: Transmitter<I>> Publisher<I, T> {
pub fn new(
node_id: <T::Transport as Transport>::NodeId,
timeout: I::Duration,
priority: <T::Transport as Transport>::Priority,
) -> Self {
Publisher {
next_transfer_id: <T::Transport as Transport>::TransferId::default(),
timeout,
priority,
source: node_id,
}
}
pub fn publish<M, C>(
&mut self,
clock: &mut C,
subject: SubjectId,
payload: &M,
transmitter: &mut T,
driver: &mut T::Driver,
) -> nb::Result<(), T::Error>
where
M: Message + Serialize,
I: Instant,
C: Clock<Instant = I>,
{
let deadline = self.timeout + clock.now();
do_serialize(payload, |payload_bytes| {
self.send_payload(subject, payload_bytes, deadline, transmitter, clock, driver)
})
}
pub fn send_payload<C>(
&mut self,
subject: SubjectId,
payload: &[u8],
deadline: I,
transmitter: &mut T,
clock: &mut C,
driver: &mut T::Driver,
) -> nb::Result<(), T::Error>
where
I: Clone,
C: Clock<Instant = I>,
{
let transfer = Transfer {
header: Header::Message(MessageHeader {
timestamp: deadline,
transfer_id: self.next_transfer_id.clone(),
priority: self.priority.clone(),
subject,
source: Some(self.source.clone()),
}),
payload,
};
self.next_transfer_id = self.next_transfer_id.clone().increment();
transmitter.push(transfer, clock, driver)
}
}
mod fmt_impl {
use crate::publisher::Publisher;
use canadensis_core::time::Instant;
use canadensis_core::transport::{Transmitter, Transport};
use core::fmt::{Debug, Formatter, Result};
impl<I, T> Debug for Publisher<I, T>
where
I: Instant,
I::Duration: Debug,
T: Transmitter<I>,
<T::Transport as Transport>::TransferId: Debug,
<T::Transport as Transport>::Priority: Debug,
<T::Transport as Transport>::NodeId: Debug,
{
fn fmt(&self, f: &mut Formatter<'_>) -> Result {
f.debug_struct("Publisher")
.field("next_transfer_id", &self.next_transfer_id)
.field("timeout", &self.timeout)
.field("priority", &self.priority)
.field("source", &self.source)
.finish()
}
}
}