1use std::collections::VecDeque;
19use std::time::Duration;
20
21use dvb_ci::tpdu::{create_t_c, tags, CommandTpdu, DataBlock, ResponseTpdu, SbValue, TcObject};
22use dvb_common::{Parse, Serialize};
23
24const SB_OBJECT_LEN: usize = 4;
26
27fn parse_sb(bytes: &[u8]) -> Option<(u8, SbValue)> {
29 if bytes.len() >= SB_OBJECT_LEN && bytes[0] == tags::SB && bytes[1] == 0x02 {
30 Some((bytes[2], SbValue(bytes[3])))
31 } else {
32 None
33 }
34}
35
36pub const DEFAULT_POLL_INTERVAL: Duration = Duration::from_millis(100);
39pub const DEFAULT_REPLY_TIMEOUT: Duration = Duration::from_millis(1000);
42
43#[derive(Debug, Clone, Copy, PartialEq, Eq)]
45pub enum TcState {
46 Idle,
48 Creating,
50 Active,
52}
53
54#[derive(Debug, Default, Clone, PartialEq, Eq)]
56pub struct Out {
57 pub writes: Vec<Vec<u8>>,
59 pub spdus: Vec<Vec<u8>>,
61 pub timer: Option<Duration>,
63 pub error: Option<TransportError>,
65}
66
67#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
69#[non_exhaustive]
70pub enum TransportError {
71 #[error("transport connection setup timed out")]
73 SetupTimeout,
74 #[error("unexpected t_c_id {got} (expected {expected})")]
76 WrongTcId {
77 got: u8,
79 expected: u8,
81 },
82 #[error("module reported T_C_Error")]
84 ModuleError,
85 #[error("malformed R_TPDU")]
87 Malformed,
88}
89
90#[derive(Debug)]
92pub struct Transport {
93 tcid: u8,
94 state: TcState,
95 reassembly: Vec<u8>,
96 poll_interval: Duration,
97 reply_timeout: Duration,
98 since_poll: Duration,
100 awaiting: Option<Duration>,
105 outbound: VecDeque<Vec<u8>>,
110}
111
112impl Default for Transport {
113 fn default() -> Self {
114 Self::new(1)
115 }
116}
117
118impl Transport {
119 #[must_use]
121 pub fn new(tcid: u8) -> Self {
122 Self {
123 tcid,
124 state: TcState::Idle,
125 reassembly: Vec::new(),
126 poll_interval: DEFAULT_POLL_INTERVAL,
127 reply_timeout: DEFAULT_REPLY_TIMEOUT,
128 since_poll: Duration::ZERO,
129 awaiting: None,
130 outbound: VecDeque::new(),
131 }
132 }
133
134 #[must_use]
136 pub fn with_timing(mut self, poll: Duration, reply: Duration) -> Self {
137 self.poll_interval = poll;
138 self.reply_timeout = reply;
139 self
140 }
141
142 #[must_use]
144 pub fn state(&self) -> TcState {
145 self.state
146 }
147
148 fn cmd(&self, tag: u8, data: &[u8]) -> Vec<u8> {
149 let c = CommandTpdu {
150 tag,
151 t_c_id: self.tcid,
152 data,
153 };
154 let mut buf = vec![0u8; c.serialized_len()];
155 let n = c.serialize_into(&mut buf).expect("exact buffer");
157 buf.truncate(n);
158 buf
159 }
160
161 fn poll_frame(&self) -> Vec<u8> {
162 self.cmd(tags::DATA_LAST, &[])
164 }
165
166 pub fn init(&mut self) -> Out {
168 self.state = TcState::Creating;
169 self.awaiting = Some(Duration::ZERO);
170 let obj: TcObject = create_t_c(self.tcid);
171 Out {
172 writes: vec![obj.to_bytes()],
173 timer: Some(self.reply_timeout),
174 ..Out::default()
175 }
176 }
177
178 pub fn send_spdu(&mut self, spdu: &[u8]) -> Out {
182 if self.state != TcState::Active {
183 return Out::default();
184 }
185 self.outbound.push_back(spdu.to_vec());
186 self.flush()
187 }
188
189 fn flush(&mut self) -> Out {
192 if self.state != TcState::Active || self.awaiting.is_some() {
193 return Out::default();
194 }
195 match self.outbound.pop_front() {
196 Some(spdu) => {
197 self.awaiting = Some(Duration::ZERO);
198 self.since_poll = Duration::ZERO;
199 Out {
200 writes: vec![self.cmd(tags::DATA_LAST, &spdu)],
201 timer: Some(self.poll_interval),
202 ..Out::default()
203 }
204 }
205 None => Out::default(),
206 }
207 }
208
209 pub fn tick(&mut self, elapsed: Duration) -> Out {
212 match self.state {
213 TcState::Idle => Out::default(),
214 TcState::Creating => {
215 if let Some(w) = self.awaiting.as_mut() {
216 *w += elapsed;
217 if *w >= self.reply_timeout {
218 self.state = TcState::Idle;
219 self.awaiting = None;
220 return Out {
221 error: Some(TransportError::SetupTimeout),
222 ..Out::default()
223 };
224 }
225 }
226 Out {
227 timer: Some(self.reply_timeout),
228 ..Out::default()
229 }
230 }
231 TcState::Active => {
232 self.since_poll += elapsed;
233 if self.since_poll >= self.poll_interval {
234 self.since_poll = Duration::ZERO;
235 if self.awaiting.is_none() && !self.outbound.is_empty() {
238 return self.flush();
239 }
240 self.awaiting = Some(Duration::ZERO);
241 Out {
242 writes: vec![self.poll_frame()],
243 timer: Some(self.poll_interval),
244 ..Out::default()
245 }
246 } else {
247 Out {
248 timer: Some(self.poll_interval - self.since_poll),
249 ..Out::default()
250 }
251 }
252 }
253 }
254 }
255
256 pub fn on_frame(&mut self, frame: &[u8]) -> Out {
263 self.awaiting = None;
264 match frame.first().copied() {
265 Some(tags::C_T_C_REPLY) => match TcObject::parse(frame) {
267 Ok(o) if o.t_c_id == self.tcid => {
268 self.state = TcState::Active;
269 self.since_poll = Duration::ZERO;
270 let da = parse_sb(&frame[3..]).is_some_and(|(_, sb)| sb.data_available());
271 self.after_status(da)
272 }
273 Ok(o) => self.wrong_tcid(o.t_c_id),
274 Err(_) => self.malformed(),
275 },
276 Some(tags::SB) => match parse_sb(frame) {
278 Some((tcid, _)) if tcid != self.tcid => self.wrong_tcid(tcid),
279 Some((_, sb)) => self.after_status(sb.data_available()),
280 None => self.malformed(),
281 },
282 Some(tags::T_C_ERROR) => Out {
283 error: Some(TransportError::ModuleError),
284 ..Out::default()
285 },
286 Some(tags::DATA_LAST | tags::DATA_MORE) => self.on_data(frame),
287 _ => self.malformed(),
288 }
289 }
290
291 fn malformed(&self) -> Out {
292 Out {
293 error: Some(TransportError::Malformed),
294 ..Out::default()
295 }
296 }
297
298 fn wrong_tcid(&self, got: u8) -> Out {
299 Out {
300 error: Some(TransportError::WrongTcId {
301 got,
302 expected: self.tcid,
303 }),
304 ..Out::default()
305 }
306 }
307
308 fn after_status(&mut self, data_available: bool) -> Out {
311 if data_available {
312 self.awaiting = Some(Duration::ZERO);
313 Out {
314 writes: vec![self.cmd(tags::RCV, &[])],
315 ..Out::default()
316 }
317 } else {
318 if !self.outbound.is_empty() {
321 return self.flush();
322 }
323 self.since_poll = Duration::ZERO;
324 Out {
325 timer: Some(self.poll_interval),
326 ..Out::default()
327 }
328 }
329 }
330
331 fn on_data(&mut self, frame: &[u8]) -> Out {
332 let r = match ResponseTpdu::parse(frame) {
333 Ok(r) => r,
334 Err(_) => {
335 return Out {
336 error: Some(TransportError::Malformed),
337 ..Out::default()
338 }
339 }
340 };
341 if r.t_c_id != self.tcid {
342 return Out {
343 error: Some(TransportError::WrongTcId {
344 got: r.t_c_id,
345 expected: self.tcid,
346 }),
347 ..Out::default()
348 };
349 }
350 self.reassembly.extend_from_slice(r.data);
351 match r.block {
352 Some(DataBlock::More) => {
354 self.awaiting = Some(Duration::ZERO);
355 Out {
356 writes: vec![self.cmd(tags::RCV, &[])],
357 ..Out::default()
358 }
359 }
360 _ => {
363 let mut out = self.after_status(r.sb_value.data_available());
364 if !self.reassembly.is_empty() {
365 out.spdus.push(core::mem::take(&mut self.reassembly));
366 }
367 out
368 }
369 }
370 }
371}
372
373#[cfg(test)]
374mod tests {
375 use super::*;
376 use dvb_ci::tpdu::SbValue;
377
378 fn r_tpdu(tag: u8, tcid: u8, data: &[u8], da: bool) -> Vec<u8> {
380 let mut v = vec![tag];
382 v.push((1 + data.len()) as u8);
383 v.push(tcid);
384 v.extend_from_slice(data);
385 v.extend_from_slice(&[tags::SB, 0x02, tcid, SbValue::new(da).0]);
386 v
387 }
388
389 #[test]
390 fn init_sends_create_tc_and_arms_timeout() {
391 let mut t = Transport::new(1);
392 let out = t.init();
393 assert_eq!(out.writes, vec![vec![tags::CREATE_T_C, 0x01, 0x01]]);
394 assert_eq!(t.state(), TcState::Creating);
395 assert_eq!(out.timer, Some(DEFAULT_REPLY_TIMEOUT));
396 }
397
398 #[test]
399 fn setup_times_out_to_idle() {
400 let mut t = Transport::new(1);
401 t.init();
402 let out = t.tick(DEFAULT_REPLY_TIMEOUT);
403 assert_eq!(out.error, Some(TransportError::SetupTimeout));
404 assert_eq!(t.state(), TcState::Idle);
405 }
406
407 #[test]
408 fn reply_activates_then_polls_on_interval() {
409 let mut t = Transport::new(1);
410 t.init();
411 let out = t.on_frame(&[tags::C_T_C_REPLY, 0x01, 0x01]);
412 assert_eq!(t.state(), TcState::Active);
413 assert!(out.error.is_none());
414 let early = t.tick(DEFAULT_POLL_INTERVAL / 2);
416 assert!(early.writes.is_empty());
417 let due = t.tick(DEFAULT_POLL_INTERVAL);
419 assert_eq!(due.writes, vec![vec![tags::DATA_LAST, 0x01, 0x01]]);
420 }
421
422 #[test]
423 fn reassembles_more_then_last_into_one_spdu() {
424 let mut t = Transport::new(1);
425 t.init();
426 t.on_frame(&[tags::C_T_C_REPLY, 0x01, 0x01]);
427 let o1 = t.on_frame(&r_tpdu(tags::DATA_MORE, 1, &[0xAA, 0xBB], false));
429 assert!(o1.spdus.is_empty());
430 assert_eq!(o1.writes, vec![vec![tags::RCV, 0x01, 0x01]]);
431 let o2 = t.on_frame(&r_tpdu(tags::DATA_LAST, 1, &[0xCC], false));
433 assert_eq!(o2.spdus, vec![vec![0xAA, 0xBB, 0xCC]]);
434 }
435
436 #[test]
437 fn data_available_triggers_rcv() {
438 let mut t = Transport::new(1);
439 t.init();
440 t.on_frame(&[tags::C_T_C_REPLY, 0x01, 0x01]);
441 let o = t.on_frame(&r_tpdu(tags::DATA_LAST, 1, &[0x01], true));
443 assert_eq!(o.spdus, vec![vec![0x01]]);
444 assert_eq!(o.writes, vec![vec![tags::RCV, 0x01, 0x01]]);
445 }
446
447 #[test]
448 fn two_sends_serialize_one_block_per_module_turn() {
449 let mut t = Transport::new(1);
453 t.init();
454 t.on_frame(&[tags::C_T_C_REPLY, 0x01, 0x01]);
455
456 let first = t.send_spdu(&[0x92, 0x07]); assert_eq!(first.writes.len(), 1);
458 assert_eq!(first.writes[0][0], tags::DATA_LAST);
459
460 let second = t.send_spdu(&[0x9F, 0x80, 0x10, 0x00]); assert!(
463 second.writes.is_empty(),
464 "second block must wait for the SB"
465 );
466
467 let after_sb = t.on_frame(&[tags::SB, 0x02, 0x01, SbValue::new(false).0]);
469 assert_eq!(
470 after_sb.writes.len(),
471 1,
472 "second block flushes after the SB"
473 );
474 assert_eq!(after_sb.writes[0][0], tags::DATA_LAST);
475 assert!(after_sb.writes[0]
477 .windows(4)
478 .any(|w| w == [0x9F, 0x80, 0x10, 0x00]));
479 }
480
481 #[test]
482 fn wrong_tcid_is_flagged() {
483 let mut t = Transport::new(1);
484 t.init();
485 let o = t.on_frame(&[tags::C_T_C_REPLY, 0x01, 0x09]);
486 assert_eq!(
487 o.error,
488 Some(TransportError::WrongTcId {
489 got: 9,
490 expected: 1
491 })
492 );
493 }
494}