1use std::time::Duration;
19
20use dvb_ci::tpdu::{create_t_c, tags, CommandTpdu, DataBlock, ResponseTpdu, SbValue, TcObject};
21use dvb_common::{Parse, Serialize};
22
23const SB_OBJECT_LEN: usize = 4;
25
26fn parse_sb(bytes: &[u8]) -> Option<(u8, SbValue)> {
28 if bytes.len() >= SB_OBJECT_LEN && bytes[0] == tags::SB && bytes[1] == 0x02 {
29 Some((bytes[2], SbValue(bytes[3])))
30 } else {
31 None
32 }
33}
34
/// Poll interval a [`Transport`] uses unless overridden via
/// [`Transport::with_timing`]: 100 ms.
pub const DEFAULT_POLL_INTERVAL: Duration = Duration::from_millis(100);
/// Reply timeout a [`Transport`] uses unless overridden via
/// [`Transport::with_timing`]: one second.
pub const DEFAULT_REPLY_TIMEOUT: Duration = Duration::from_secs(1);
41
/// Lifecycle of the transport connection driven by [`Transport`].
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum TcState {
    /// No connection: the state of a freshly built transport, and the state
    /// it falls back to when connection setup times out.
    Idle,
    /// The create request has been sent by `init` and the reply is pending.
    Creating,
    /// A reply carrying our connection id was accepted; polling and data
    /// transfer are enabled.
    Active,
}
52
53#[derive(Debug, Default, Clone, PartialEq, Eq)]
55pub struct Out {
56 pub writes: Vec<Vec<u8>>,
58 pub spdus: Vec<Vec<u8>>,
60 pub timer: Option<Duration>,
62 pub error: Option<TransportError>,
64}
65
66#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
68#[non_exhaustive]
69pub enum TransportError {
70 #[error("transport connection setup timed out")]
72 SetupTimeout,
73 #[error("unexpected t_c_id {got} (expected {expected})")]
75 WrongTcId {
76 got: u8,
78 expected: u8,
80 },
81 #[error("module reported T_C_Error")]
83 ModuleError,
84 #[error("malformed R_TPDU")]
86 Malformed,
87}
88
89#[derive(Debug)]
91pub struct Transport {
92 tcid: u8,
93 state: TcState,
94 reassembly: Vec<u8>,
95 poll_interval: Duration,
96 reply_timeout: Duration,
97 since_poll: Duration,
99 awaiting: Option<Duration>,
102}
103
104impl Default for Transport {
105 fn default() -> Self {
106 Self::new(1)
107 }
108}
109
110impl Transport {
111 #[must_use]
113 pub fn new(tcid: u8) -> Self {
114 Self {
115 tcid,
116 state: TcState::Idle,
117 reassembly: Vec::new(),
118 poll_interval: DEFAULT_POLL_INTERVAL,
119 reply_timeout: DEFAULT_REPLY_TIMEOUT,
120 since_poll: Duration::ZERO,
121 awaiting: None,
122 }
123 }
124
125 #[must_use]
127 pub fn with_timing(mut self, poll: Duration, reply: Duration) -> Self {
128 self.poll_interval = poll;
129 self.reply_timeout = reply;
130 self
131 }
132
133 #[must_use]
135 pub fn state(&self) -> TcState {
136 self.state
137 }
138
139 fn cmd(&self, tag: u8, data: &[u8]) -> Vec<u8> {
140 let c = CommandTpdu {
141 tag,
142 t_c_id: self.tcid,
143 data,
144 };
145 let mut buf = vec![0u8; c.serialized_len()];
146 let n = c.serialize_into(&mut buf).expect("exact buffer");
148 buf.truncate(n);
149 buf
150 }
151
152 fn poll_frame(&self) -> Vec<u8> {
153 self.cmd(tags::DATA_LAST, &[])
155 }
156
157 pub fn init(&mut self) -> Out {
159 self.state = TcState::Creating;
160 self.awaiting = Some(Duration::ZERO);
161 let obj: TcObject = create_t_c(self.tcid);
162 Out {
163 writes: vec![obj.to_bytes()],
164 timer: Some(self.reply_timeout),
165 ..Out::default()
166 }
167 }
168
169 pub fn send_spdu(&mut self, spdu: &[u8]) -> Out {
171 if self.state != TcState::Active {
172 return Out::default();
173 }
174 self.awaiting = Some(Duration::ZERO);
175 self.since_poll = Duration::ZERO;
176 Out {
177 writes: vec![self.cmd(tags::DATA_LAST, spdu)],
178 timer: Some(self.poll_interval),
179 ..Out::default()
180 }
181 }
182
183 pub fn tick(&mut self, elapsed: Duration) -> Out {
186 match self.state {
187 TcState::Idle => Out::default(),
188 TcState::Creating => {
189 if let Some(w) = self.awaiting.as_mut() {
190 *w += elapsed;
191 if *w >= self.reply_timeout {
192 self.state = TcState::Idle;
193 self.awaiting = None;
194 return Out {
195 error: Some(TransportError::SetupTimeout),
196 ..Out::default()
197 };
198 }
199 }
200 Out {
201 timer: Some(self.reply_timeout),
202 ..Out::default()
203 }
204 }
205 TcState::Active => {
206 self.since_poll += elapsed;
207 if self.since_poll >= self.poll_interval {
208 self.since_poll = Duration::ZERO;
209 self.awaiting = Some(Duration::ZERO);
210 Out {
211 writes: vec![self.poll_frame()],
212 timer: Some(self.poll_interval),
213 ..Out::default()
214 }
215 } else {
216 Out {
217 timer: Some(self.poll_interval - self.since_poll),
218 ..Out::default()
219 }
220 }
221 }
222 }
223 }
224
225 pub fn on_frame(&mut self, frame: &[u8]) -> Out {
232 self.awaiting = None;
233 match frame.first().copied() {
234 Some(tags::C_T_C_REPLY) => match TcObject::parse(frame) {
236 Ok(o) if o.t_c_id == self.tcid => {
237 self.state = TcState::Active;
238 self.since_poll = Duration::ZERO;
239 let da = parse_sb(&frame[3..]).is_some_and(|(_, sb)| sb.data_available());
240 self.after_status(da)
241 }
242 Ok(o) => self.wrong_tcid(o.t_c_id),
243 Err(_) => self.malformed(),
244 },
245 Some(tags::SB) => match parse_sb(frame) {
247 Some((tcid, _)) if tcid != self.tcid => self.wrong_tcid(tcid),
248 Some((_, sb)) => self.after_status(sb.data_available()),
249 None => self.malformed(),
250 },
251 Some(tags::T_C_ERROR) => Out {
252 error: Some(TransportError::ModuleError),
253 ..Out::default()
254 },
255 Some(tags::DATA_LAST | tags::DATA_MORE) => self.on_data(frame),
256 _ => self.malformed(),
257 }
258 }
259
260 fn malformed(&self) -> Out {
261 Out {
262 error: Some(TransportError::Malformed),
263 ..Out::default()
264 }
265 }
266
267 fn wrong_tcid(&self, got: u8) -> Out {
268 Out {
269 error: Some(TransportError::WrongTcId {
270 got,
271 expected: self.tcid,
272 }),
273 ..Out::default()
274 }
275 }
276
277 fn after_status(&mut self, data_available: bool) -> Out {
280 if data_available {
281 self.awaiting = Some(Duration::ZERO);
282 Out {
283 writes: vec![self.cmd(tags::RCV, &[])],
284 ..Out::default()
285 }
286 } else {
287 self.since_poll = Duration::ZERO;
288 Out {
289 timer: Some(self.poll_interval),
290 ..Out::default()
291 }
292 }
293 }
294
295 fn on_data(&mut self, frame: &[u8]) -> Out {
296 let r = match ResponseTpdu::parse(frame) {
297 Ok(r) => r,
298 Err(_) => {
299 return Out {
300 error: Some(TransportError::Malformed),
301 ..Out::default()
302 }
303 }
304 };
305 if r.t_c_id != self.tcid {
306 return Out {
307 error: Some(TransportError::WrongTcId {
308 got: r.t_c_id,
309 expected: self.tcid,
310 }),
311 ..Out::default()
312 };
313 }
314 self.reassembly.extend_from_slice(r.data);
315 match r.block {
316 Some(DataBlock::More) => {
318 self.awaiting = Some(Duration::ZERO);
319 Out {
320 writes: vec![self.cmd(tags::RCV, &[])],
321 ..Out::default()
322 }
323 }
324 _ => {
327 let mut out = self.after_status(r.sb_value.data_available());
328 if !self.reassembly.is_empty() {
329 out.spdus.push(core::mem::take(&mut self.reassembly));
330 }
331 out
332 }
333 }
334 }
335}
336
#[cfg(test)]
mod tests {
    use super::*;
    use dvb_ci::tpdu::SbValue;

    /// Builds an R_TPDU: `tag`, a one-byte length, `tcid`, `data`, then a
    /// trailing status object whose data-available flag is `da`.
    fn r_tpdu(tag: u8, tcid: u8, data: &[u8], da: bool) -> Vec<u8> {
        // The length byte counts the t_c_id plus the payload. Fail loudly
        // rather than silently wrapping if a test passes an oversized payload.
        let len = u8::try_from(1 + data.len()).expect("test payload fits a one-byte length");
        let mut v = vec![tag, len, tcid];
        v.extend_from_slice(data);
        v.extend_from_slice(&[tags::SB, 0x02, tcid, SbValue::new(da).0]);
        v
    }

    #[test]
    fn init_sends_create_tc_and_arms_timeout() {
        let mut t = Transport::new(1);
        let out = t.init();
        assert_eq!(out.writes, vec![vec![tags::CREATE_T_C, 0x01, 0x01]]);
        assert_eq!(t.state(), TcState::Creating);
        assert_eq!(out.timer, Some(DEFAULT_REPLY_TIMEOUT));
    }

    #[test]
    fn setup_times_out_to_idle() {
        let mut t = Transport::new(1);
        t.init();
        let out = t.tick(DEFAULT_REPLY_TIMEOUT);
        assert_eq!(out.error, Some(TransportError::SetupTimeout));
        assert_eq!(t.state(), TcState::Idle);
    }

    #[test]
    fn reply_activates_then_polls_on_interval() {
        let mut t = Transport::new(1);
        t.init();
        let out = t.on_frame(&[tags::C_T_C_REPLY, 0x01, 0x01]);
        assert_eq!(t.state(), TcState::Active);
        assert!(out.error.is_none());
        // Half an interval in: no poll yet.
        let early = t.tick(DEFAULT_POLL_INTERVAL / 2);
        assert!(early.writes.is_empty());
        // The interval has now elapsed: an empty DATA_LAST poll goes out.
        let due = t.tick(DEFAULT_POLL_INTERVAL);
        assert_eq!(due.writes, vec![vec![tags::DATA_LAST, 0x01, 0x01]]);
    }

    #[test]
    fn reassembles_more_then_last_into_one_spdu() {
        let mut t = Transport::new(1);
        t.init();
        t.on_frame(&[tags::C_T_C_REPLY, 0x01, 0x01]);
        let o1 = t.on_frame(&r_tpdu(tags::DATA_MORE, 1, &[0xAA, 0xBB], false));
        // A `More` block yields nothing yet and asks for the next block.
        assert!(o1.spdus.is_empty());
        assert_eq!(o1.writes, vec![vec![tags::RCV, 0x01, 0x01]]);
        let o2 = t.on_frame(&r_tpdu(tags::DATA_LAST, 1, &[0xCC], false));
        // The final block releases the concatenated payload.
        assert_eq!(o2.spdus, vec![vec![0xAA, 0xBB, 0xCC]]);
    }

    #[test]
    fn data_available_triggers_rcv() {
        let mut t = Transport::new(1);
        t.init();
        t.on_frame(&[tags::C_T_C_REPLY, 0x01, 0x01]);
        let o = t.on_frame(&r_tpdu(tags::DATA_LAST, 1, &[0x01], true));
        // The payload is delivered and the pending data is requested at once.
        assert_eq!(o.spdus, vec![vec![0x01]]);
        assert_eq!(o.writes, vec![vec![tags::RCV, 0x01, 0x01]]);
    }

    #[test]
    fn wrong_tcid_is_flagged() {
        let mut t = Transport::new(1);
        t.init();
        let o = t.on_frame(&[tags::C_T_C_REPLY, 0x01, 0x09]);
        assert_eq!(
            o.error,
            Some(TransportError::WrongTcId {
                got: 9,
                expected: 1
            })
        );
    }

    #[test]
    fn empty_frame_is_malformed() {
        let mut t = Transport::new(1);
        t.init();
        let o = t.on_frame(&[]);
        assert_eq!(o.error, Some(TransportError::Malformed));
        assert!(o.writes.is_empty());
    }

    #[test]
    fn send_spdu_before_active_is_ignored() {
        let mut t = Transport::new(1);
        assert_eq!(t.send_spdu(&[0x01]), Out::default());
        assert_eq!(t.state(), TcState::Idle);
    }
}