1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
use crate::{
frame::{FlowKind, Kind},
Frame,
};
use async_hal::{delay::DelayMs, io::AsyncWrite};
use core::{
marker::PhantomData,
pin::Pin,
task::{Context, Poll},
};
use futures::{ready, Sink, SinkExt, Stream};
use pin_project_lite::pin_project;
/// Progress of the transfer currently driven by `Writer::poll_write`.
enum State {
/// No transfer is in progress; the next write decides between a single
/// frame and a first/consecutive frame sequence.
Empty,
/// `Frame::single` produced a frame for the caller's buffer.
Single {
/// The frame to send. It is taken (leaving `None`) right before it is
/// handed to the transport so it can only be written once.
frame: Option<Frame>,
},
/// The buffer is sent as a first frame followed by consecutive frames.
Consecutive {
/// `None` until the first frame has been sent; afterwards the index that
/// is passed to `Frame::consecutive` and incremented after every frame.
pos: Option<u8>,
/// Consecutive frames that may still be sent before the next
/// flow-control frame has to be received (set from `flow_len()`).
remaining: u8,
/// `true` while a delay started on the writer's timer has not completed.
is_delaying: bool,
/// Value of `flow_st()` from the most recent flow-control frame; used to
/// start the delay after each consecutive frame.
// NOTE(review): presumably milliseconds, given the `DelayMs` bound — confirm.
st: u8,
},
}
/// Error returned by the `AsyncWrite` implementation of `Writer`.
///
/// `T` is the transport's sink error, `R` the error carried by received
/// frames, and `D` the delay provider's error.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum Error<T, R, D> {
/// The transport failed while getting ready for, sending, or flushing a frame.
Transmit(T),
/// The transport yielded an error instead of a flow-control frame.
Receive(R),
/// Starting or polling the delay failed.
Delay(D),
/// A frame other than a flow-control frame arrived while one was expected.
InvalidFrame,
/// The peer answered with a flow-control frame of kind `FlowKind::Abort`.
Aborted,
/// The transport's stream ended while a flow-control frame was expected.
UnexpectedEOF,
}
pin_project! {
/// Writer for an ISO-TP message.
pub struct Writer<T, E, D> {
// Frame transport: used as a sink for outgoing frames and as a stream of
// incoming flow-control frames.
#[pin]
transport: T,
// Timer used for the waits requested by flow-control frames.
#[pin]
delay: D,
// Progress of the transfer currently being written.
state: State,
// Ties the writer to `E`, the error type carried by received frames.
_marker: PhantomData<E>
}
}
impl<T, E, D> Writer<T, E, D> {
pub fn new(transport: T, delay: D) -> Self {
Self {
transport,
delay,
state: State::Empty,
_marker: PhantomData,
}
}
}
impl<T, E, D> AsyncWrite for Writer<T, E, D>
where
T: Sink<Frame> + Stream<Item = Result<Frame, E>>,
D: DelayMs + Unpin,
D::Delay: From<u8>,
{
type Error = Error<T::Error, E, D::Error>;
fn poll_write(
self: Pin<&mut Self>,
cx: &mut Context,
buf: &[u8],
) -> Poll<Result<usize, Self::Error>> {
let mut me = self.project();
loop {
match me.state {
State::Empty => {
// Start a new transfer
*me.state = if let Some(frame) = Frame::single(buf) {
State::Single { frame: Some(frame) }
} else {
State::Consecutive {
pos: None,
remaining: 0,
is_delaying: false,
st: 0,
}
};
}
State::Single { frame } => {
if frame.is_none() {
// This transfer is already finished
break Poll::Ready(Ok(0));
}
// Send a single frame
ready!(me.transport.as_mut().poll_ready(cx)).map_err(Error::Transmit)?;
// Take the current frame so it can only be written once
let frame = frame.take().unwrap();
me.transport.start_send(frame).map_err(Error::Transmit)?;
// Reset the current state
*me.state = State::Empty;
// Return the total len of `buf`
break Poll::Ready(Ok(buf.len()));
}
State::Consecutive {
pos,
remaining,
is_delaying,
st,
} => {
// Poll the current delay if it's in progress
if *is_delaying {
ready!(me.delay.as_mut().poll_delay_ms(cx)).map_err(Error::Delay)?;
*is_delaying = false;
}
if let Some(pos) = pos {
// Check if we have any remaining frames left
if *remaining == 0 {
// Wait for the next frame from `rx`
let frame = ready!(me.transport.as_mut().poll_next(cx))
.ok_or(Error::UnexpectedEOF)?
.map_err(Error::Receive)?;
// Make sure the frame is control flow
if frame.kind() != Some(Kind::Flow) {
return Poll::Ready(Err(Error::InvalidFrame));
}
// Handle control flow kinds
match frame.flow_kind() {
FlowKind::Continue => {}
FlowKind::Wait => {
// Delay for the received wait time
me.delay
.as_mut()
.start(frame.flow_st().into())
.map_err(Error::Delay)?;
*is_delaying = true;
continue;
}
FlowKind::Abort => {
// Abort this transfer
*me.state = State::Empty;
return Poll::Ready(Err(Error::Aborted));
}
}
*remaining = frame.flow_len();
*st = frame.flow_st();
}
// Send a consecutive frame for the current transfer in progress
let (frame, used) = Frame::consecutive(*pos, buf);
ready!(poll_send(cx, me.transport.as_mut(), frame))?;
// Prepare the state for the next frame
*pos += 1;
*remaining -= 1;
// Delay for the received seperation time
me.delay
.as_mut()
.start(st.clone().into())
.map_err(Error::Delay)?;
*is_delaying = true;
break Poll::Ready(Ok(used));
} else {
// Send the first frame of this sequence
let (frame, used) = Frame::first(buf);
ready!(poll_send(cx, me.transport.as_mut(), frame))?;
*pos = Some(0);
break Poll::Ready(Ok(used));
}
}
}
}
}
fn poll_flush(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
self.project()
.transport
.poll_flush(cx)
.map_err(Error::Transmit)
}
}
fn poll_send<S, R, D>(
cx: &mut Context,
mut tx: S,
frame: Frame,
) -> Poll<Result<(), Error<S::Error, R, D>>>
where
S: Sink<Frame> + Unpin,
{
ready!(tx.poll_ready_unpin(cx)).map_err(Error::Transmit)?;
tx.start_send_unpin(frame).map_err(Error::Transmit)?;
Poll::Ready(Ok(()))
}