1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
use core::marker::PhantomData;
use crate::serialize::do_serialize;
use crate::Clock;
use canadensis_core::time::Instant;
use canadensis_core::transfer::{Header, MessageHeader, Transfer};
use canadensis_core::transport::{TransferId, Transmitter, Transport};
use canadensis_core::{nb, SubjectId};
use canadensis_encoding::{Message, Serialize};
/// Publishes messages of type `M` on one subject without a source node ID.
///
/// Every transfer this publisher assembles carries a message header whose
/// `source` is `None`. The publisher keeps the per-subject state needed to
/// build those headers: the priority, the subject ID, the next transfer ID,
/// and the timeout used to derive a deadline.
///
/// Type parameters:
/// * `C`: the clock that supplies the current time when sending
/// * `M`: the message type to publish
/// * `T`: the transmitter that outgoing transfers are pushed to
pub struct AnonymousPublisher<C, M, T>
where
    C: Clock,
    T: Transmitter<C::Instant>,
{
    /// Priority copied into the header of every outgoing transfer.
    priority: <T::Transport as Transport>::Priority,
    /// Subject ID copied into the header of every outgoing transfer.
    subject: SubjectId,
    /// Transfer ID that the next outgoing transfer will use.
    next_transfer_id: <T::Transport as Transport>::TransferId,
    /// Duration added to the clock's current time to form each transfer's deadline.
    timeout: <C::Instant as Instant>::Duration,
    /// Ties the publisher to the message type `M` without storing a value of it.
    _message_phantom: PhantomData<M>,
}
impl<C, M, T> AnonymousPublisher<C, M, T>
where
    C: Clock,
    M: Message + Serialize,
    T: Transmitter<C::Instant>,
{
    /// Creates an anonymous publisher for one subject.
    ///
    /// * `subject`: the subject ID placed in every outgoing message header
    /// * `priority`: the priority placed in every outgoing message header
    /// * `timeout`: the duration added to the current time to form each
    ///   transfer's deadline
    ///
    /// The transfer ID counter starts at the transport's default transfer ID.
    pub fn new(
        subject: SubjectId,
        priority: <T::Transport as Transport>::Priority,
        timeout: <C::Instant as Instant>::Duration,
    ) -> Self {
        let first_transfer_id = <T::Transport as Transport>::TransferId::default();
        Self {
            subject,
            priority,
            timeout,
            next_transfer_id: first_transfer_id,
            _message_phantom: PhantomData,
        }
    }

    /// Serializes `payload` and pushes it to `transmitter` as one transfer.
    ///
    /// The deadline of the transfer is `self.timeout` added to `clock.now()`.
    ///
    /// # Errors
    ///
    /// * `AnonymousPublishError::Length` if the payload size, rounded up to
    ///   whole bytes, is greater than `transmitter.mtu()`. Nothing is
    ///   serialized or pushed in that case.
    /// * `AnonymousPublishError::Transport` wrapping any error that the
    ///   transmitter reports. The error is converted with `nb::Error::map`,
    ///   so only the `Other` payload is wrapped.
    pub fn send(
        &mut self,
        payload: &M,
        clock: &mut C,
        transmitter: &mut T,
        driver: &mut T::Driver,
    ) -> nb::Result<(), AnonymousPublishError<T::Error>> {
        // Reject the message up front if its byte length exceeds the MTU.
        let bit_length = payload.size_bits();
        let byte_length = (bit_length + 7) / 8;
        if byte_length > transmitter.mtu() {
            return Err(nb::Error::Other(AnonymousPublishError::Length));
        }

        // The deadline is fixed before serialization starts.
        let now = clock.now();
        let deadline = self.timeout + now;

        let outcome = do_serialize(payload, |serialized| {
            self.send_payload(serialized, deadline, transmitter, clock, driver)
        });
        outcome.map_err(|transport_error| transport_error.map(AnonymousPublishError::Transport))
    }

    /// Wraps already-serialized bytes in a transfer and pushes it to `transmitter`.
    ///
    /// The header uses `deadline` as its timestamp, the stored priority and
    /// subject, the current transfer ID, and no source.
    ///
    /// The stored transfer ID is advanced before the push is attempted, so it
    /// is consumed even when `transmitter.push` does not succeed.
    fn send_payload(
        &mut self,
        payload: &[u8],
        deadline: C::Instant,
        transmitter: &mut T,
        clock: &mut C,
        driver: &mut T::Driver,
    ) -> nb::Result<(), T::Error> {
        let transfer_id = self.next_transfer_id.clone();
        let header = Header::Message(MessageHeader {
            timestamp: deadline,
            transfer_id,
            priority: self.priority.clone(),
            subject: self.subject,
            // Anonymous: the header carries no source.
            source: None,
        });

        self.next_transfer_id = self.next_transfer_id.clone().increment();

        transmitter.push(Transfer { header, payload }, clock, driver)
    }
}
/// Errors that can occur when publishing an anonymous message.
///
/// `E` is the error type reported by the underlying transmitter.
#[derive(Debug)]
pub enum AnonymousPublishError<E> {
/// The message's size in bytes was greater than the transmitter's MTU,
/// so it was not sent.
Length,
/// The transmitter returned an error; the original error is carried inside.
Transport(E),
}
impl<E> From<E> for AnonymousPublishError<E> {
fn from(inner: E) -> Self {
AnonymousPublishError::Transport(inner)
}
}