Skip to main content

dvb_ci_runtime/
transport.rs

1//! TPDU transport layer — a sans-IO state machine for the single transport
2//! connection per CI slot (ETSI EN 50221 §A.4).
3//!
4//! Connection lifecycle (Figures 6/7): `Idle → Creating → Active`. The host
5//! sends `Create_T_C`; the module answers `C_T_C_Reply` (→ `Active`) or the
6//! host times out back to `Idle`. In `Active` the host **polls regularly** —
7//! per §A.4, a poll is a `T_Data_Last` with an empty data field — and, whenever
8//! a Status Byte reports Data-Available (DA), sends `T_RCV` to receive the
9//! queued message. Chained module messages arrive as `T_Data_More*` then a
10//! final `T_Data_Last` and are reassembled into one SPDU payload.
11//!
12//! Timing: EN 50221 mandates regular polling and a reply-timeout arc but does
13//! not fix the interval, so [`DEFAULT_POLL_INTERVAL`] / [`DEFAULT_REPLY_TIMEOUT`]
14//! are implementation-chosen defaults. All timing is expressed via the sans-IO
15//! [`Tick`](crate::event::Event::Tick)/timer model so it is deterministic and
16//! testable without a clock.
17
18use std::collections::VecDeque;
19use std::time::Duration;
20
21use dvb_ci::tpdu::{create_t_c, tags, CommandTpdu, DataBlock, ResponseTpdu, SbValue, TcObject};
22use dvb_common::{Parse, Serialize};
23
24/// Length of a standalone/appended `T_SB` object: `tag · 0x02 · t_c_id · SB`.
25const SB_OBJECT_LEN: usize = 4;
26
27/// Parse a `T_SB` object (`0x80 0x02 t_c_id sb_value`) at the start of `bytes`.
28fn parse_sb(bytes: &[u8]) -> Option<(u8, SbValue)> {
29    if bytes.len() >= SB_OBJECT_LEN && bytes[0] == tags::SB && bytes[1] == 0x02 {
30        Some((bytes[2], SbValue(bytes[3])))
31    } else {
32        None
33    }
34}
35
/// Default spacing between host polls. EN 50221 §A.4 only requires the host
/// to "poll regularly", so this value is an implementation choice.
pub const DEFAULT_POLL_INTERVAL: Duration = Duration::from_millis(100);
/// Default time to wait for an expected `R_TPDU` before taking the §A.4
/// Figure 6/7 "Timeout" arc. The value is an implementation choice.
pub const DEFAULT_REPLY_TIMEOUT: Duration = Duration::from_secs(1);
42
/// Transport connection state (EN 50221 §A.4, Figures 6/7).
///
/// A [`Transport`] is constructed in `Idle`, moves to `Creating` when
/// [`Transport::init`] is called, and becomes `Active` when a `C_T_C_Reply`
/// carrying its own `t_c_id` is handled. A setup timeout in `Creating`
/// returns it to `Idle`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TcState {
    /// No connection; nothing sent. Ticks are ignored in this state.
    Idle,
    /// `Create_T_C` sent, awaiting `C_T_C_Reply`.
    Creating,
    /// Connection up; polling/exchanging data. Only in this state are
    /// upper-layer SPDUs accepted for sending.
    Active,
}
53
/// What the transport layer wants done after handling an input.
///
/// Every handler returns one of these. The default value (no writes, no
/// SPDUs, no timer, no error) means "nothing to do".
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct Out {
    /// Link-layer TPDU frames to write to the device, in order.
    pub writes: Vec<Vec<u8>>,
    /// Fully-reassembled SPDU payloads to pass up to the session layer.
    pub spdus: Vec<Vec<u8>>,
    /// Requested delay until the next [`Tick`](crate::event::Event::Tick).
    /// Left as `None` by handlers that do not request a timer.
    pub timer: Option<Duration>,
    /// A transport error (e.g. reply timeout, unexpected tag).
    pub error: Option<TransportError>,
}
66
/// Transport-layer errors.
///
/// These are reported through [`Out::error`] rather than returned as a
/// `Result`, so a handler can report an error alongside its other outputs.
#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
#[non_exhaustive]
pub enum TransportError {
    /// No `C_T_C_Reply` within the reply timeout (§A.4 timeout arc). The
    /// connection has already dropped back to `Idle` when this is reported.
    #[error("transport connection setup timed out")]
    SetupTimeout,
    /// A reply arrived for a different `t_c_id` than ours.
    #[error("unexpected t_c_id {got} (expected {expected})")]
    WrongTcId {
        /// The `t_c_id` received.
        got: u8,
        /// Our connection's `t_c_id`.
        expected: u8,
    },
    /// The module reported a `T_C_Error`.
    #[error("module reported T_C_Error")]
    ModuleError,
    /// A frame could not be parsed as an `R_TPDU`. Also reported for an empty
    /// frame and for a frame whose leading tag is not one the transport
    /// handles.
    #[error("malformed R_TPDU")]
    Malformed,
}
89
/// The single transport connection for a slot.
///
/// A sans-IO state machine: inputs are frames read from the device, elapsed
/// time, and SPDUs to send; every handler returns an [`Out`] describing the
/// frames to write, SPDUs to deliver, the next timer, and any error.
#[derive(Debug)]
pub struct Transport {
    /// Our transport connection id; frames carrying any other id are rejected.
    tcid: u8,
    /// Current position in the connection lifecycle.
    state: TcState,
    /// Data accumulated from chained `T_Data_More` fragments until the final
    /// `T_Data_Last` completes the SPDU.
    reassembly: Vec<u8>,
    /// Interval between host polls while `Active`.
    poll_interval: Duration,
    /// How long to wait for `C_T_C_Reply` before giving up on setup.
    reply_timeout: Duration,
    /// Time accumulated since the last poll (drives the poll cadence).
    since_poll: Duration,
    /// Time accumulated since a command that expects a reply (drives the
    /// reply timeout); `None` when not awaiting a reply. While `Some`, a host
    /// C_TPDU is *in flight* (sent, module not yet answered) — the link is
    /// half-duplex, so no further data block may be sent until it clears.
    ///
    /// Only the `Creating` state advances this counter and compares it to
    /// `reply_timeout`; in `Active` it serves purely as the in-flight marker.
    awaiting: Option<Duration>,
    /// SPDUs queued to send, one `T_Data_Last` per module turn. EN 50221's link
    /// is polled half-duplex: the host sends a single data block, then must wait
    /// for the module's `T_SB` before sending the next. Sending two back-to-back
    /// makes a real CAM drop the second (issue #337).
    outbound: VecDeque<Vec<u8>>,
}
111
112impl Default for Transport {
113    fn default() -> Self {
114        Self::new(1)
115    }
116}
117
118impl Transport {
119    /// New transport for `tcid` with default timings.
120    #[must_use]
121    pub fn new(tcid: u8) -> Self {
122        Self {
123            tcid,
124            state: TcState::Idle,
125            reassembly: Vec::new(),
126            poll_interval: DEFAULT_POLL_INTERVAL,
127            reply_timeout: DEFAULT_REPLY_TIMEOUT,
128            since_poll: Duration::ZERO,
129            awaiting: None,
130            outbound: VecDeque::new(),
131        }
132    }
133
134    /// Override the poll interval / reply timeout.
135    #[must_use]
136    pub fn with_timing(mut self, poll: Duration, reply: Duration) -> Self {
137        self.poll_interval = poll;
138        self.reply_timeout = reply;
139        self
140    }
141
142    /// Current connection state.
143    #[must_use]
144    pub fn state(&self) -> TcState {
145        self.state
146    }
147
148    fn cmd(&self, tag: u8, data: &[u8]) -> Vec<u8> {
149        let c = CommandTpdu {
150            tag,
151            t_c_id: self.tcid,
152            data,
153        };
154        let mut buf = vec![0u8; c.serialized_len()];
155        // serialize_into only fails on a too-small buffer; ours is exact.
156        let n = c.serialize_into(&mut buf).expect("exact buffer");
157        buf.truncate(n);
158        buf
159    }
160
161    fn poll_frame(&self) -> Vec<u8> {
162        // §A.4: poll == T_Data_Last with empty data.
163        self.cmd(tags::DATA_LAST, &[])
164    }
165
166    /// Open the connection: emit `Create_T_C` and arm the reply timeout.
167    pub fn init(&mut self) -> Out {
168        self.state = TcState::Creating;
169        self.awaiting = Some(Duration::ZERO);
170        let obj: TcObject = create_t_c(self.tcid);
171        Out {
172            writes: vec![obj.to_bytes()],
173            timer: Some(self.reply_timeout),
174            ..Out::default()
175        }
176    }
177
178    /// Queue an upper-layer SPDU to send (wrapped in a `T_Data_Last`). The block
179    /// is transmitted now if the link is free, else held until the in-flight
180    /// C_TPDU is answered — one data block per module turn (§A.4 half-duplex).
181    pub fn send_spdu(&mut self, spdu: &[u8]) -> Out {
182        if self.state != TcState::Active {
183            return Out::default();
184        }
185        self.outbound.push_back(spdu.to_vec());
186        self.flush()
187    }
188
189    /// Emit the next queued data block if the link is free (Active and no
190    /// C_TPDU in flight); otherwise nothing (it waits for the module's `T_SB`).
191    fn flush(&mut self) -> Out {
192        if self.state != TcState::Active || self.awaiting.is_some() {
193            return Out::default();
194        }
195        match self.outbound.pop_front() {
196            Some(spdu) => {
197                self.awaiting = Some(Duration::ZERO);
198                self.since_poll = Duration::ZERO;
199                Out {
200                    writes: vec![self.cmd(tags::DATA_LAST, &spdu)],
201                    timer: Some(self.poll_interval),
202                    ..Out::default()
203                }
204            }
205            None => Out::default(),
206        }
207    }
208
209    /// Advance logical time by `elapsed`: poll if due, or time out a pending
210    /// reply.
211    pub fn tick(&mut self, elapsed: Duration) -> Out {
212        match self.state {
213            TcState::Idle => Out::default(),
214            TcState::Creating => {
215                if let Some(w) = self.awaiting.as_mut() {
216                    *w += elapsed;
217                    if *w >= self.reply_timeout {
218                        self.state = TcState::Idle;
219                        self.awaiting = None;
220                        return Out {
221                            error: Some(TransportError::SetupTimeout),
222                            ..Out::default()
223                        };
224                    }
225                }
226                Out {
227                    timer: Some(self.reply_timeout),
228                    ..Out::default()
229                }
230            }
231            TcState::Active => {
232                self.since_poll += elapsed;
233                if self.since_poll >= self.poll_interval {
234                    self.since_poll = Duration::ZERO;
235                    // A queued data block goes out in preference to an empty
236                    // poll, but only when no C_TPDU is in flight.
237                    if self.awaiting.is_none() && !self.outbound.is_empty() {
238                        return self.flush();
239                    }
240                    self.awaiting = Some(Duration::ZERO);
241                    Out {
242                        writes: vec![self.poll_frame()],
243                        timer: Some(self.poll_interval),
244                        ..Out::default()
245                    }
246                } else {
247                    Out {
248                        timer: Some(self.poll_interval - self.since_poll),
249                        ..Out::default()
250                    }
251                }
252            }
253        }
254    }
255
256    /// Handle one link-layer frame read from the device.
257    ///
258    /// A module frame is a leading object (`C_T_C_Reply` / `T_Data_*` / …)
259    /// followed by an appended `T_SB`, or a standalone `T_SB` (the reply to a
260    /// poll with nothing queued). The `T_SB`'s DA bit drives whether the host
261    /// must `T_RCV` next.
262    pub fn on_frame(&mut self, frame: &[u8]) -> Out {
263        self.awaiting = None;
264        match frame.first().copied() {
265            // C_T_C_Reply (+ appended T_SB): connection becomes Active.
266            Some(tags::C_T_C_REPLY) => match TcObject::parse(frame) {
267                Ok(o) if o.t_c_id == self.tcid => {
268                    self.state = TcState::Active;
269                    self.since_poll = Duration::ZERO;
270                    let da = parse_sb(&frame[3..]).is_some_and(|(_, sb)| sb.data_available());
271                    self.after_status(da)
272                }
273                Ok(o) => self.wrong_tcid(o.t_c_id),
274                Err(_) => self.malformed(),
275            },
276            // Standalone T_SB — the reply to a poll.
277            Some(tags::SB) => match parse_sb(frame) {
278                Some((tcid, _)) if tcid != self.tcid => self.wrong_tcid(tcid),
279                Some((_, sb)) => self.after_status(sb.data_available()),
280                None => self.malformed(),
281            },
282            Some(tags::T_C_ERROR) => Out {
283                error: Some(TransportError::ModuleError),
284                ..Out::default()
285            },
286            Some(tags::DATA_LAST | tags::DATA_MORE) => self.on_data(frame),
287            _ => self.malformed(),
288        }
289    }
290
291    fn malformed(&self) -> Out {
292        Out {
293            error: Some(TransportError::Malformed),
294            ..Out::default()
295        }
296    }
297
298    fn wrong_tcid(&self, got: u8) -> Out {
299        Out {
300            error: Some(TransportError::WrongTcId {
301                got,
302                expected: self.tcid,
303            }),
304            ..Out::default()
305        }
306    }
307
308    /// React to a Status Byte: if DA, solicit the queued message with `T_RCV`;
309    /// otherwise resume the idle poll cadence.
310    fn after_status(&mut self, data_available: bool) -> Out {
311        if data_available {
312            self.awaiting = Some(Duration::ZERO);
313            Out {
314                writes: vec![self.cmd(tags::RCV, &[])],
315                ..Out::default()
316            }
317        } else {
318            // Module idle: its `T_SB` freed the link, so send the next queued
319            // data block if any (the #337 fix); otherwise resume polling.
320            if !self.outbound.is_empty() {
321                return self.flush();
322            }
323            self.since_poll = Duration::ZERO;
324            Out {
325                timer: Some(self.poll_interval),
326                ..Out::default()
327            }
328        }
329    }
330
331    fn on_data(&mut self, frame: &[u8]) -> Out {
332        let r = match ResponseTpdu::parse(frame) {
333            Ok(r) => r,
334            Err(_) => {
335                return Out {
336                    error: Some(TransportError::Malformed),
337                    ..Out::default()
338                }
339            }
340        };
341        if r.t_c_id != self.tcid {
342            return Out {
343                error: Some(TransportError::WrongTcId {
344                    got: r.t_c_id,
345                    expected: self.tcid,
346                }),
347                ..Out::default()
348            };
349        }
350        self.reassembly.extend_from_slice(r.data);
351        match r.block {
352            // More chained fragments: each waits for another T_RCV (§A.4 item 10).
353            Some(DataBlock::More) => {
354                self.awaiting = Some(Duration::ZERO);
355                Out {
356                    writes: vec![self.cmd(tags::RCV, &[])],
357                    ..Out::default()
358                }
359            }
360            // Last (or only) fragment: emit the reassembled SPDU, then let the
361            // appended Status Byte decide whether to receive another message.
362            _ => {
363                let mut out = self.after_status(r.sb_value.data_available());
364                if !self.reassembly.is_empty() {
365                    out.spdus.push(core::mem::take(&mut self.reassembly));
366                }
367                out
368            }
369        }
370    }
371}
372
#[cfg(test)]
mod tests {
    use super::*;
    use dvb_ci::tpdu::SbValue;

    /// `C_T_C_Reply` for `t_c_id` 1 with no appended status byte.
    const REPLY_TC1: [u8; 3] = [tags::C_T_C_REPLY, 0x01, 0x01];

    /// Encode a module→host R_TPDU for tests:
    /// `tag, length(=1+data), tcid, data…` followed by the appended
    /// `T_SB` object `SB, 0x02, tcid, sb_value`.
    fn r_tpdu(tag: u8, tcid: u8, data: &[u8], da: bool) -> Vec<u8> {
        let header = [tag, (1 + data.len()) as u8, tcid];
        let status = [tags::SB, 0x02, tcid, SbValue::new(da).0];
        [&header[..], data, &status[..]].concat()
    }

    /// A transport on `t_c_id` 1 that has been initialised and has handled
    /// the module's `C_T_C_Reply`.
    fn active_transport() -> Transport {
        let mut transport = Transport::new(1);
        transport.init();
        transport.on_frame(&REPLY_TC1);
        transport
    }

    #[test]
    fn init_sends_create_tc_and_arms_timeout() {
        let mut transport = Transport::new(1);
        let out = transport.init();
        assert_eq!(out.writes, vec![vec![tags::CREATE_T_C, 0x01, 0x01]]);
        assert_eq!(transport.state(), TcState::Creating);
        assert_eq!(out.timer, Some(DEFAULT_REPLY_TIMEOUT));
    }

    #[test]
    fn setup_times_out_to_idle() {
        let mut transport = Transport::new(1);
        transport.init();
        let out = transport.tick(DEFAULT_REPLY_TIMEOUT);
        assert_eq!(out.error, Some(TransportError::SetupTimeout));
        assert_eq!(transport.state(), TcState::Idle);
    }

    #[test]
    fn reply_activates_then_polls_on_interval() {
        let mut transport = Transport::new(1);
        transport.init();
        let reply = transport.on_frame(&REPLY_TC1);
        assert_eq!(transport.state(), TcState::Active);
        assert!(reply.error.is_none());

        // Half an interval in: nothing is written yet.
        let half_way = transport.tick(DEFAULT_POLL_INTERVAL / 2);
        assert!(half_way.writes.is_empty());

        // Past the interval: an empty T_Data_Last poll goes out.
        let poll_due = transport.tick(DEFAULT_POLL_INTERVAL);
        assert_eq!(poll_due.writes, vec![vec![tags::DATA_LAST, 0x01, 0x01]]);
    }

    #[test]
    fn reassembles_more_then_last_into_one_spdu() {
        let mut transport = active_transport();

        // T_Data_More carries a partial payload and solicits a T_RCV.
        let more = transport.on_frame(&r_tpdu(tags::DATA_MORE, 1, &[0xAA, 0xBB], false));
        assert!(more.spdus.is_empty());
        assert_eq!(more.writes, vec![vec![tags::RCV, 0x01, 0x01]]);

        // T_Data_Last completes the SPDU.
        let last = transport.on_frame(&r_tpdu(tags::DATA_LAST, 1, &[0xCC], false));
        assert_eq!(last.spdus, vec![vec![0xAA, 0xBB, 0xCC]]);
    }

    #[test]
    fn data_available_triggers_rcv() {
        let mut transport = active_transport();

        // T_Data_Last with DA set: the host must T_RCV the next queued message.
        let out = transport.on_frame(&r_tpdu(tags::DATA_LAST, 1, &[0x01], true));
        assert_eq!(out.spdus, vec![vec![0x01]]);
        assert_eq!(out.writes, vec![vec![tags::RCV, 0x01, 0x01]]);
    }

    #[test]
    fn two_sends_serialize_one_block_per_module_turn() {
        // #337: a real CAM drops a second T_Data_Last sent before it answers the
        // first. Two send_spdu calls in one turn must produce only ONE write;
        // the second block goes out after the module's T_SB.
        let open_session_response = [0x92, 0x07];
        let profile_enq = [0x9F, 0x80, 0x10, 0x00];
        let mut transport = active_transport();

        let first = transport.send_spdu(&open_session_response);
        assert_eq!(first.writes.len(), 1);
        assert_eq!(first.writes[0][0], tags::DATA_LAST);

        // The first block is still in flight, so the second is only queued.
        let second = transport.send_spdu(&profile_enq);
        assert!(
            second.writes.is_empty(),
            "second block must wait for the SB"
        );

        // The module acknowledges with a standalone T_SB (data_available = 0).
        let after_sb = transport.on_frame(&[tags::SB, 0x02, 0x01, SbValue::new(false).0]);
        assert_eq!(
            after_sb.writes.len(),
            1,
            "second block flushes after the SB"
        );
        assert_eq!(after_sb.writes[0][0], tags::DATA_LAST);
        // The flushed frame carries the profile_enq payload.
        assert!(after_sb.writes[0].windows(4).any(|w| w == profile_enq));
    }

    #[test]
    fn wrong_tcid_is_flagged() {
        let mut transport = Transport::new(1);
        transport.init();
        let out = transport.on_frame(&[tags::C_T_C_REPLY, 0x01, 0x09]);
        let expected_error = TransportError::WrongTcId {
            got: 9,
            expected: 1,
        };
        assert_eq!(out.error, Some(expected_error));
    }
}