Skip to main content

dvb_ci_runtime/
transport.rs

1//! TPDU transport layer — a sans-IO state machine for the single transport
2//! connection per CI slot (ETSI EN 50221 §A.4).
3//!
4//! Connection lifecycle (Figures 6/7): `Idle → Creating → Active`. The host
5//! sends `Create_T_C`; the module answers `C_T_C_Reply` (→ `Active`) or the
6//! host times out back to `Idle`. In `Active` the host **polls regularly** —
7//! per §A.4, a poll is a `T_Data_Last` with an empty data field — and, whenever
8//! a Status Byte reports Data-Available (DA), sends `T_RCV` to receive the
9//! queued message. Chained module messages arrive as `T_Data_More*` then a
10//! final `T_Data_Last` and are reassembled into one SPDU payload.
11//!
12//! Timing: EN 50221 mandates regular polling and a reply-timeout arc but does
13//! not fix the interval, so [`DEFAULT_POLL_INTERVAL`] / [`DEFAULT_REPLY_TIMEOUT`]
14//! are implementation-chosen defaults. All timing is expressed via the sans-IO
15//! [`Tick`](crate::event::Event::Tick)/timer model so it is deterministic and
16//! testable without a clock.
17
18use std::time::Duration;
19
20use dvb_ci::tpdu::{create_t_c, tags, CommandTpdu, DataBlock, ResponseTpdu, SbValue, TcObject};
21use dvb_common::{Parse, Serialize};
22
23/// Length of a standalone/appended `T_SB` object: `tag · 0x02 · t_c_id · SB`.
24const SB_OBJECT_LEN: usize = 4;
25
26/// Parse a `T_SB` object (`0x80 0x02 t_c_id sb_value`) at the start of `bytes`.
27fn parse_sb(bytes: &[u8]) -> Option<(u8, SbValue)> {
28    if bytes.len() >= SB_OBJECT_LEN && bytes[0] == tags::SB && bytes[1] == 0x02 {
29        Some((bytes[2], SbValue(bytes[3])))
30    } else {
31        None
32    }
33}
34
35/// Conventional host poll interval (implementation-chosen; §A.4 mandates only
36/// "poll regularly").
37pub const DEFAULT_POLL_INTERVAL: Duration = Duration::from_millis(100);
38/// Conventional reply timeout for an expected `R_TPDU` (the §A.4 Figure 6/7
39/// "Timeout" arc; value implementation-chosen).
40pub const DEFAULT_REPLY_TIMEOUT: Duration = Duration::from_millis(1000);
41
42/// Transport connection state (EN 50221 §A.4, Figures 6/7).
43#[derive(Debug, Clone, Copy, PartialEq, Eq)]
44pub enum TcState {
45    /// No connection; nothing sent.
46    Idle,
47    /// `Create_T_C` sent, awaiting `C_T_C_Reply`.
48    Creating,
49    /// Connection up; polling/exchanging data.
50    Active,
51}
52
53/// What the transport layer wants done after handling an input.
54#[derive(Debug, Default, Clone, PartialEq, Eq)]
55pub struct Out {
56    /// Link-layer TPDU frames to write to the device, in order.
57    pub writes: Vec<Vec<u8>>,
58    /// Fully-reassembled SPDU payloads to pass up to the session layer.
59    pub spdus: Vec<Vec<u8>>,
60    /// Requested delay until the next [`Tick`](crate::event::Event::Tick).
61    pub timer: Option<Duration>,
62    /// A transport error (e.g. reply timeout, unexpected tag).
63    pub error: Option<TransportError>,
64}
65
66/// Transport-layer errors.
67#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
68#[non_exhaustive]
69pub enum TransportError {
70    /// No `C_T_C_Reply` within the reply timeout (§A.4 timeout arc).
71    #[error("transport connection setup timed out")]
72    SetupTimeout,
73    /// A reply arrived for a different `t_c_id` than ours.
74    #[error("unexpected t_c_id {got} (expected {expected})")]
75    WrongTcId {
76        /// The `t_c_id` received.
77        got: u8,
78        /// Our connection's `t_c_id`.
79        expected: u8,
80    },
81    /// The module reported a `T_C_Error`.
82    #[error("module reported T_C_Error")]
83    ModuleError,
84    /// A frame could not be parsed as an `R_TPDU`.
85    #[error("malformed R_TPDU")]
86    Malformed,
87}
88
89/// The single transport connection for a slot.
90#[derive(Debug)]
91pub struct Transport {
92    tcid: u8,
93    state: TcState,
94    reassembly: Vec<u8>,
95    poll_interval: Duration,
96    reply_timeout: Duration,
97    /// Time accumulated since the last poll (drives the poll cadence).
98    since_poll: Duration,
99    /// Time accumulated since a command that expects a reply (drives the
100    /// reply timeout); `None` when not awaiting a reply.
101    awaiting: Option<Duration>,
102}
103
104impl Default for Transport {
105    fn default() -> Self {
106        Self::new(1)
107    }
108}
109
110impl Transport {
111    /// New transport for `tcid` with default timings.
112    #[must_use]
113    pub fn new(tcid: u8) -> Self {
114        Self {
115            tcid,
116            state: TcState::Idle,
117            reassembly: Vec::new(),
118            poll_interval: DEFAULT_POLL_INTERVAL,
119            reply_timeout: DEFAULT_REPLY_TIMEOUT,
120            since_poll: Duration::ZERO,
121            awaiting: None,
122        }
123    }
124
125    /// Override the poll interval / reply timeout.
126    #[must_use]
127    pub fn with_timing(mut self, poll: Duration, reply: Duration) -> Self {
128        self.poll_interval = poll;
129        self.reply_timeout = reply;
130        self
131    }
132
133    /// Current connection state.
134    #[must_use]
135    pub fn state(&self) -> TcState {
136        self.state
137    }
138
139    fn cmd(&self, tag: u8, data: &[u8]) -> Vec<u8> {
140        let c = CommandTpdu {
141            tag,
142            t_c_id: self.tcid,
143            data,
144        };
145        let mut buf = vec![0u8; c.serialized_len()];
146        // serialize_into only fails on a too-small buffer; ours is exact.
147        let n = c.serialize_into(&mut buf).expect("exact buffer");
148        buf.truncate(n);
149        buf
150    }
151
152    fn poll_frame(&self) -> Vec<u8> {
153        // §A.4: poll == T_Data_Last with empty data.
154        self.cmd(tags::DATA_LAST, &[])
155    }
156
157    /// Open the connection: emit `Create_T_C` and arm the reply timeout.
158    pub fn init(&mut self) -> Out {
159        self.state = TcState::Creating;
160        self.awaiting = Some(Duration::ZERO);
161        let obj: TcObject = create_t_c(self.tcid);
162        Out {
163            writes: vec![obj.to_bytes()],
164            timer: Some(self.reply_timeout),
165            ..Out::default()
166        }
167    }
168
169    /// Queue an upper-layer SPDU to send (wrapped in a `T_Data_Last`).
170    pub fn send_spdu(&mut self, spdu: &[u8]) -> Out {
171        if self.state != TcState::Active {
172            return Out::default();
173        }
174        self.awaiting = Some(Duration::ZERO);
175        self.since_poll = Duration::ZERO;
176        Out {
177            writes: vec![self.cmd(tags::DATA_LAST, spdu)],
178            timer: Some(self.poll_interval),
179            ..Out::default()
180        }
181    }
182
183    /// Advance logical time by `elapsed`: poll if due, or time out a pending
184    /// reply.
185    pub fn tick(&mut self, elapsed: Duration) -> Out {
186        match self.state {
187            TcState::Idle => Out::default(),
188            TcState::Creating => {
189                if let Some(w) = self.awaiting.as_mut() {
190                    *w += elapsed;
191                    if *w >= self.reply_timeout {
192                        self.state = TcState::Idle;
193                        self.awaiting = None;
194                        return Out {
195                            error: Some(TransportError::SetupTimeout),
196                            ..Out::default()
197                        };
198                    }
199                }
200                Out {
201                    timer: Some(self.reply_timeout),
202                    ..Out::default()
203                }
204            }
205            TcState::Active => {
206                self.since_poll += elapsed;
207                if self.since_poll >= self.poll_interval {
208                    self.since_poll = Duration::ZERO;
209                    self.awaiting = Some(Duration::ZERO);
210                    Out {
211                        writes: vec![self.poll_frame()],
212                        timer: Some(self.poll_interval),
213                        ..Out::default()
214                    }
215                } else {
216                    Out {
217                        timer: Some(self.poll_interval - self.since_poll),
218                        ..Out::default()
219                    }
220                }
221            }
222        }
223    }
224
225    /// Handle one link-layer frame read from the device.
226    ///
227    /// A module frame is a leading object (`C_T_C_Reply` / `T_Data_*` / …)
228    /// followed by an appended `T_SB`, or a standalone `T_SB` (the reply to a
229    /// poll with nothing queued). The `T_SB`'s DA bit drives whether the host
230    /// must `T_RCV` next.
231    pub fn on_frame(&mut self, frame: &[u8]) -> Out {
232        self.awaiting = None;
233        match frame.first().copied() {
234            // C_T_C_Reply (+ appended T_SB): connection becomes Active.
235            Some(tags::C_T_C_REPLY) => match TcObject::parse(frame) {
236                Ok(o) if o.t_c_id == self.tcid => {
237                    self.state = TcState::Active;
238                    self.since_poll = Duration::ZERO;
239                    let da = parse_sb(&frame[3..]).is_some_and(|(_, sb)| sb.data_available());
240                    self.after_status(da)
241                }
242                Ok(o) => self.wrong_tcid(o.t_c_id),
243                Err(_) => self.malformed(),
244            },
245            // Standalone T_SB — the reply to a poll.
246            Some(tags::SB) => match parse_sb(frame) {
247                Some((tcid, _)) if tcid != self.tcid => self.wrong_tcid(tcid),
248                Some((_, sb)) => self.after_status(sb.data_available()),
249                None => self.malformed(),
250            },
251            Some(tags::T_C_ERROR) => Out {
252                error: Some(TransportError::ModuleError),
253                ..Out::default()
254            },
255            Some(tags::DATA_LAST | tags::DATA_MORE) => self.on_data(frame),
256            _ => self.malformed(),
257        }
258    }
259
260    fn malformed(&self) -> Out {
261        Out {
262            error: Some(TransportError::Malformed),
263            ..Out::default()
264        }
265    }
266
267    fn wrong_tcid(&self, got: u8) -> Out {
268        Out {
269            error: Some(TransportError::WrongTcId {
270                got,
271                expected: self.tcid,
272            }),
273            ..Out::default()
274        }
275    }
276
277    /// React to a Status Byte: if DA, solicit the queued message with `T_RCV`;
278    /// otherwise resume the idle poll cadence.
279    fn after_status(&mut self, data_available: bool) -> Out {
280        if data_available {
281            self.awaiting = Some(Duration::ZERO);
282            Out {
283                writes: vec![self.cmd(tags::RCV, &[])],
284                ..Out::default()
285            }
286        } else {
287            self.since_poll = Duration::ZERO;
288            Out {
289                timer: Some(self.poll_interval),
290                ..Out::default()
291            }
292        }
293    }
294
295    fn on_data(&mut self, frame: &[u8]) -> Out {
296        let r = match ResponseTpdu::parse(frame) {
297            Ok(r) => r,
298            Err(_) => {
299                return Out {
300                    error: Some(TransportError::Malformed),
301                    ..Out::default()
302                }
303            }
304        };
305        if r.t_c_id != self.tcid {
306            return Out {
307                error: Some(TransportError::WrongTcId {
308                    got: r.t_c_id,
309                    expected: self.tcid,
310                }),
311                ..Out::default()
312            };
313        }
314        self.reassembly.extend_from_slice(r.data);
315        match r.block {
316            // More chained fragments: each waits for another T_RCV (§A.4 item 10).
317            Some(DataBlock::More) => {
318                self.awaiting = Some(Duration::ZERO);
319                Out {
320                    writes: vec![self.cmd(tags::RCV, &[])],
321                    ..Out::default()
322                }
323            }
324            // Last (or only) fragment: emit the reassembled SPDU, then let the
325            // appended Status Byte decide whether to receive another message.
326            _ => {
327                let mut out = self.after_status(r.sb_value.data_available());
328                if !self.reassembly.is_empty() {
329                    out.spdus.push(core::mem::take(&mut self.reassembly));
330                }
331                out
332            }
333        }
334    }
335}
336
337#[cfg(test)]
338mod tests {
339    use super::*;
340    use dvb_ci::tpdu::SbValue;
341
342    /// Build an R_TPDU frame (module→host) for tests.
343    fn r_tpdu(tag: u8, tcid: u8, data: &[u8], da: bool) -> Vec<u8> {
344        // tag, length_field(=1+data), tcid, data..., SB(0x80), len=2, tcid, sb_value
345        let mut v = vec![tag];
346        v.push((1 + data.len()) as u8);
347        v.push(tcid);
348        v.extend_from_slice(data);
349        v.extend_from_slice(&[tags::SB, 0x02, tcid, SbValue::new(da).0]);
350        v
351    }
352
353    #[test]
354    fn init_sends_create_tc_and_arms_timeout() {
355        let mut t = Transport::new(1);
356        let out = t.init();
357        assert_eq!(out.writes, vec![vec![tags::CREATE_T_C, 0x01, 0x01]]);
358        assert_eq!(t.state(), TcState::Creating);
359        assert_eq!(out.timer, Some(DEFAULT_REPLY_TIMEOUT));
360    }
361
362    #[test]
363    fn setup_times_out_to_idle() {
364        let mut t = Transport::new(1);
365        t.init();
366        let out = t.tick(DEFAULT_REPLY_TIMEOUT);
367        assert_eq!(out.error, Some(TransportError::SetupTimeout));
368        assert_eq!(t.state(), TcState::Idle);
369    }
370
371    #[test]
372    fn reply_activates_then_polls_on_interval() {
373        let mut t = Transport::new(1);
374        t.init();
375        let out = t.on_frame(&[tags::C_T_C_REPLY, 0x01, 0x01]);
376        assert_eq!(t.state(), TcState::Active);
377        assert!(out.error.is_none());
378        // Before the interval: no poll.
379        let early = t.tick(DEFAULT_POLL_INTERVAL / 2);
380        assert!(early.writes.is_empty());
381        // Crossing the interval: an empty T_Data_Last poll.
382        let due = t.tick(DEFAULT_POLL_INTERVAL);
383        assert_eq!(due.writes, vec![vec![tags::DATA_LAST, 0x01, 0x01]]);
384    }
385
386    #[test]
387    fn reassembles_more_then_last_into_one_spdu() {
388        let mut t = Transport::new(1);
389        t.init();
390        t.on_frame(&[tags::C_T_C_REPLY, 0x01, 0x01]);
391        // MORE: partial data, solicits RCV
392        let o1 = t.on_frame(&r_tpdu(tags::DATA_MORE, 1, &[0xAA, 0xBB], false));
393        assert!(o1.spdus.is_empty());
394        assert_eq!(o1.writes, vec![vec![tags::RCV, 0x01, 0x01]]);
395        // LAST: completes the SPDU
396        let o2 = t.on_frame(&r_tpdu(tags::DATA_LAST, 1, &[0xCC], false));
397        assert_eq!(o2.spdus, vec![vec![0xAA, 0xBB, 0xCC]]);
398    }
399
400    #[test]
401    fn data_available_triggers_rcv() {
402        let mut t = Transport::new(1);
403        t.init();
404        t.on_frame(&[tags::C_T_C_REPLY, 0x01, 0x01]);
405        // LAST with DA set → host must RCV the next queued message.
406        let o = t.on_frame(&r_tpdu(tags::DATA_LAST, 1, &[0x01], true));
407        assert_eq!(o.spdus, vec![vec![0x01]]);
408        assert_eq!(o.writes, vec![vec![tags::RCV, 0x01, 0x01]]);
409    }
410
411    #[test]
412    fn wrong_tcid_is_flagged() {
413        let mut t = Transport::new(1);
414        t.init();
415        let o = t.on_frame(&[tags::C_T_C_REPLY, 0x01, 0x09]);
416        assert_eq!(
417            o.error,
418            Some(TransportError::WrongTcId {
419                got: 9,
420                expected: 1
421            })
422        );
423    }
424}