dvb-ci-runtime 0.6.0

Pure-Rust EN 50221 DVB Common Interface driver runtime — device I/O, TPDU/SPDU poll loop, and resource state machines over the dvb-ci codecs.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
//! TPDU transport layer — a sans-IO state machine for the single transport
//! connection per CI slot (ETSI EN 50221 §A.4).
//!
//! Connection lifecycle (Figures 6/7): `Idle → Creating → Active`. The host
//! sends `Create_T_C`; the module answers `C_T_C_Reply` (→ `Active`) or the
//! host times out back to `Idle`. In `Active` the host **polls regularly** —
//! per §A.4, a poll is a `T_Data_Last` with an empty data field — and, whenever
//! a Status Byte reports Data-Available (DA), sends `T_RCV` to receive the
//! queued message. Chained module messages arrive as `T_Data_More*` then a
//! final `T_Data_Last` and are reassembled into one SPDU payload.
//!
//! Timing: EN 50221 mandates regular polling and a reply-timeout arc but does
//! not fix the interval, so [`DEFAULT_POLL_INTERVAL`] / [`DEFAULT_REPLY_TIMEOUT`]
//! are implementation-chosen defaults. All timing is expressed via the sans-IO
//! [`Tick`](crate::event::Event::Tick)/timer model so it is deterministic and
//! testable without a clock.

use std::collections::VecDeque;
use std::time::Duration;

use dvb_ci::tpdu::{create_t_c, tags, CommandTpdu, DataBlock, ResponseTpdu, SbValue, TcObject};
use dvb_common::{Parse, Serialize};

/// Length of a standalone/appended `T_SB` object: `tag · 0x02 · t_c_id · SB`.
const SB_OBJECT_LEN: usize = 4;

/// Parse a `T_SB` object (`0x80 0x02 t_c_id sb_value`) at the start of `bytes`.
fn parse_sb(bytes: &[u8]) -> Option<(u8, SbValue)> {
    if bytes.len() >= SB_OBJECT_LEN && bytes[0] == tags::SB && bytes[1] == 0x02 {
        Some((bytes[2], SbValue(bytes[3])))
    } else {
        None
    }
}

/// Conventional host poll interval (implementation-chosen; §A.4 mandates only
/// "poll regularly").
pub const DEFAULT_POLL_INTERVAL: Duration = Duration::from_millis(100);
/// Conventional reply timeout for an expected `R_TPDU` (the §A.4 Figure 6/7
/// "Timeout" arc; value implementation-chosen).
pub const DEFAULT_REPLY_TIMEOUT: Duration = Duration::from_millis(1000);

/// Transport connection state (EN 50221 §A.4, Figures 6/7).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TcState {
    /// No connection; nothing sent.
    Idle,
    /// `Create_T_C` sent, awaiting `C_T_C_Reply`.
    Creating,
    /// Connection up; polling/exchanging data.
    Active,
}

/// What the transport layer wants done after handling an input.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct Out {
    /// Link-layer TPDU frames to write to the device, in order.
    pub writes: Vec<Vec<u8>>,
    /// Fully-reassembled SPDU payloads to pass up to the session layer.
    pub spdus: Vec<Vec<u8>>,
    /// Requested delay until the next [`Tick`](crate::event::Event::Tick).
    pub timer: Option<Duration>,
    /// A transport error (e.g. reply timeout, unexpected tag).
    pub error: Option<TransportError>,
}

/// Transport-layer errors.
#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
#[non_exhaustive]
pub enum TransportError {
    /// No `C_T_C_Reply` within the reply timeout (§A.4 timeout arc).
    #[error("transport connection setup timed out")]
    SetupTimeout,
    /// A reply arrived for a different `t_c_id` than ours.
    #[error("unexpected t_c_id {got} (expected {expected})")]
    WrongTcId {
        /// The `t_c_id` received.
        got: u8,
        /// Our connection's `t_c_id`.
        expected: u8,
    },
    /// The module reported a `T_C_Error`.
    #[error("module reported T_C_Error")]
    ModuleError,
    /// A frame could not be parsed as an `R_TPDU`.
    #[error("malformed R_TPDU")]
    Malformed,
}

/// The single transport connection for a slot.
#[derive(Debug)]
pub struct Transport {
    tcid: u8,
    state: TcState,
    reassembly: Vec<u8>,
    poll_interval: Duration,
    reply_timeout: Duration,
    /// Time accumulated since the last poll (drives the poll cadence).
    since_poll: Duration,
    /// Time accumulated since a command that expects a reply (drives the
    /// reply timeout); `None` when not awaiting a reply. While `Some`, a host
    /// C_TPDU is *in flight* (sent, module not yet answered) — the link is
    /// half-duplex, so no further data block may be sent until it clears.
    awaiting: Option<Duration>,
    /// SPDUs queued to send, one `T_Data_Last` per module turn. EN 50221's link
    /// is polled half-duplex: the host sends a single data block, then must wait
    /// for the module's `T_SB` before sending the next. Sending two back-to-back
    /// makes a real CAM drop the second (issue #337).
    outbound: VecDeque<Vec<u8>>,
}

impl Default for Transport {
    fn default() -> Self {
        Self::new(1)
    }
}

impl Transport {
    /// New transport for `tcid` with default timings.
    #[must_use]
    pub fn new(tcid: u8) -> Self {
        Self {
            tcid,
            state: TcState::Idle,
            reassembly: Vec::new(),
            poll_interval: DEFAULT_POLL_INTERVAL,
            reply_timeout: DEFAULT_REPLY_TIMEOUT,
            since_poll: Duration::ZERO,
            awaiting: None,
            outbound: VecDeque::new(),
        }
    }

    /// Override the poll interval / reply timeout.
    #[must_use]
    pub fn with_timing(mut self, poll: Duration, reply: Duration) -> Self {
        self.poll_interval = poll;
        self.reply_timeout = reply;
        self
    }

    /// Current connection state.
    #[must_use]
    pub fn state(&self) -> TcState {
        self.state
    }

    fn cmd(&self, tag: u8, data: &[u8]) -> Vec<u8> {
        let c = CommandTpdu {
            tag,
            t_c_id: self.tcid,
            data,
        };
        let mut buf = vec![0u8; c.serialized_len()];
        // serialize_into only fails on a too-small buffer; ours is exact.
        let n = c.serialize_into(&mut buf).expect("exact buffer");
        buf.truncate(n);
        buf
    }

    fn poll_frame(&self) -> Vec<u8> {
        // §A.4: poll == T_Data_Last with empty data.
        self.cmd(tags::DATA_LAST, &[])
    }

    /// Open the connection: emit `Create_T_C` and arm the reply timeout.
    pub fn init(&mut self) -> Out {
        self.state = TcState::Creating;
        self.awaiting = Some(Duration::ZERO);
        let obj: TcObject = create_t_c(self.tcid);
        Out {
            writes: vec![obj.to_bytes()],
            timer: Some(self.reply_timeout),
            ..Out::default()
        }
    }

    /// Queue an upper-layer SPDU to send (wrapped in a `T_Data_Last`). The block
    /// is transmitted now if the link is free, else held until the in-flight
    /// C_TPDU is answered — one data block per module turn (§A.4 half-duplex).
    pub fn send_spdu(&mut self, spdu: &[u8]) -> Out {
        if self.state != TcState::Active {
            return Out::default();
        }
        self.outbound.push_back(spdu.to_vec());
        self.flush()
    }

    /// Emit the next queued data block if the link is free (Active and no
    /// C_TPDU in flight); otherwise nothing (it waits for the module's `T_SB`).
    fn flush(&mut self) -> Out {
        if self.state != TcState::Active || self.awaiting.is_some() {
            return Out::default();
        }
        match self.outbound.pop_front() {
            Some(spdu) => {
                self.awaiting = Some(Duration::ZERO);
                self.since_poll = Duration::ZERO;
                Out {
                    writes: vec![self.cmd(tags::DATA_LAST, &spdu)],
                    timer: Some(self.poll_interval),
                    ..Out::default()
                }
            }
            None => Out::default(),
        }
    }

    /// Advance logical time by `elapsed`: poll if due, or time out a pending
    /// reply.
    pub fn tick(&mut self, elapsed: Duration) -> Out {
        match self.state {
            TcState::Idle => Out::default(),
            TcState::Creating => {
                if let Some(w) = self.awaiting.as_mut() {
                    *w += elapsed;
                    if *w >= self.reply_timeout {
                        self.state = TcState::Idle;
                        self.awaiting = None;
                        return Out {
                            error: Some(TransportError::SetupTimeout),
                            ..Out::default()
                        };
                    }
                }
                Out {
                    timer: Some(self.reply_timeout),
                    ..Out::default()
                }
            }
            TcState::Active => {
                self.since_poll += elapsed;
                if self.since_poll >= self.poll_interval {
                    self.since_poll = Duration::ZERO;
                    // A queued data block goes out in preference to an empty
                    // poll, but only when no C_TPDU is in flight.
                    if self.awaiting.is_none() && !self.outbound.is_empty() {
                        return self.flush();
                    }
                    self.awaiting = Some(Duration::ZERO);
                    Out {
                        writes: vec![self.poll_frame()],
                        timer: Some(self.poll_interval),
                        ..Out::default()
                    }
                } else {
                    Out {
                        timer: Some(self.poll_interval - self.since_poll),
                        ..Out::default()
                    }
                }
            }
        }
    }

    /// Handle one link-layer frame read from the device.
    ///
    /// A module frame is a leading object (`C_T_C_Reply` / `T_Data_*` / …)
    /// followed by an appended `T_SB`, or a standalone `T_SB` (the reply to a
    /// poll with nothing queued). The `T_SB`'s DA bit drives whether the host
    /// must `T_RCV` next.
    pub fn on_frame(&mut self, frame: &[u8]) -> Out {
        self.awaiting = None;
        match frame.first().copied() {
            // C_T_C_Reply (+ appended T_SB): connection becomes Active.
            Some(tags::C_T_C_REPLY) => match TcObject::parse(frame) {
                Ok(o) if o.t_c_id == self.tcid => {
                    self.state = TcState::Active;
                    self.since_poll = Duration::ZERO;
                    let da = parse_sb(&frame[3..]).is_some_and(|(_, sb)| sb.data_available());
                    self.after_status(da)
                }
                Ok(o) => self.wrong_tcid(o.t_c_id),
                Err(_) => self.malformed(),
            },
            // Standalone T_SB — the reply to a poll.
            Some(tags::SB) => match parse_sb(frame) {
                Some((tcid, _)) if tcid != self.tcid => self.wrong_tcid(tcid),
                Some((_, sb)) => self.after_status(sb.data_available()),
                None => self.malformed(),
            },
            Some(tags::T_C_ERROR) => Out {
                error: Some(TransportError::ModuleError),
                ..Out::default()
            },
            Some(tags::DATA_LAST | tags::DATA_MORE) => self.on_data(frame),
            _ => self.malformed(),
        }
    }

    fn malformed(&self) -> Out {
        Out {
            error: Some(TransportError::Malformed),
            ..Out::default()
        }
    }

    fn wrong_tcid(&self, got: u8) -> Out {
        Out {
            error: Some(TransportError::WrongTcId {
                got,
                expected: self.tcid,
            }),
            ..Out::default()
        }
    }

    /// React to a Status Byte: if DA, solicit the queued message with `T_RCV`;
    /// otherwise resume the idle poll cadence.
    fn after_status(&mut self, data_available: bool) -> Out {
        if data_available {
            self.awaiting = Some(Duration::ZERO);
            Out {
                writes: vec![self.cmd(tags::RCV, &[])],
                ..Out::default()
            }
        } else {
            // Module idle: its `T_SB` freed the link, so send the next queued
            // data block if any (the #337 fix); otherwise resume polling.
            if !self.outbound.is_empty() {
                return self.flush();
            }
            self.since_poll = Duration::ZERO;
            Out {
                timer: Some(self.poll_interval),
                ..Out::default()
            }
        }
    }

    fn on_data(&mut self, frame: &[u8]) -> Out {
        let r = match ResponseTpdu::parse(frame) {
            Ok(r) => r,
            Err(_) => {
                return Out {
                    error: Some(TransportError::Malformed),
                    ..Out::default()
                }
            }
        };
        if r.t_c_id != self.tcid {
            return Out {
                error: Some(TransportError::WrongTcId {
                    got: r.t_c_id,
                    expected: self.tcid,
                }),
                ..Out::default()
            };
        }
        self.reassembly.extend_from_slice(r.data);
        match r.block {
            // More chained fragments: each waits for another T_RCV (§A.4 item 10).
            Some(DataBlock::More) => {
                self.awaiting = Some(Duration::ZERO);
                Out {
                    writes: vec![self.cmd(tags::RCV, &[])],
                    ..Out::default()
                }
            }
            // Last (or only) fragment: emit the reassembled SPDU, then let the
            // appended Status Byte decide whether to receive another message.
            _ => {
                let mut out = self.after_status(r.sb_value.data_available());
                if !self.reassembly.is_empty() {
                    out.spdus.push(core::mem::take(&mut self.reassembly));
                }
                out
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use dvb_ci::tpdu::SbValue;

    /// Build an R_TPDU frame (module→host) for tests.
    fn r_tpdu(tag: u8, tcid: u8, data: &[u8], da: bool) -> Vec<u8> {
        // tag, length_field(=1+data), tcid, data..., SB(0x80), len=2, tcid, sb_value
        let mut v = vec![tag];
        v.push((1 + data.len()) as u8);
        v.push(tcid);
        v.extend_from_slice(data);
        v.extend_from_slice(&[tags::SB, 0x02, tcid, SbValue::new(da).0]);
        v
    }

    #[test]
    fn init_sends_create_tc_and_arms_timeout() {
        let mut t = Transport::new(1);
        let out = t.init();
        assert_eq!(out.writes, vec![vec![tags::CREATE_T_C, 0x01, 0x01]]);
        assert_eq!(t.state(), TcState::Creating);
        assert_eq!(out.timer, Some(DEFAULT_REPLY_TIMEOUT));
    }

    #[test]
    fn setup_times_out_to_idle() {
        let mut t = Transport::new(1);
        t.init();
        let out = t.tick(DEFAULT_REPLY_TIMEOUT);
        assert_eq!(out.error, Some(TransportError::SetupTimeout));
        assert_eq!(t.state(), TcState::Idle);
    }

    #[test]
    fn reply_activates_then_polls_on_interval() {
        let mut t = Transport::new(1);
        t.init();
        let out = t.on_frame(&[tags::C_T_C_REPLY, 0x01, 0x01]);
        assert_eq!(t.state(), TcState::Active);
        assert!(out.error.is_none());
        // Before the interval: no poll.
        let early = t.tick(DEFAULT_POLL_INTERVAL / 2);
        assert!(early.writes.is_empty());
        // Crossing the interval: an empty T_Data_Last poll.
        let due = t.tick(DEFAULT_POLL_INTERVAL);
        assert_eq!(due.writes, vec![vec![tags::DATA_LAST, 0x01, 0x01]]);
    }

    #[test]
    fn reassembles_more_then_last_into_one_spdu() {
        let mut t = Transport::new(1);
        t.init();
        t.on_frame(&[tags::C_T_C_REPLY, 0x01, 0x01]);
        // MORE: partial data, solicits RCV
        let o1 = t.on_frame(&r_tpdu(tags::DATA_MORE, 1, &[0xAA, 0xBB], false));
        assert!(o1.spdus.is_empty());
        assert_eq!(o1.writes, vec![vec![tags::RCV, 0x01, 0x01]]);
        // LAST: completes the SPDU
        let o2 = t.on_frame(&r_tpdu(tags::DATA_LAST, 1, &[0xCC], false));
        assert_eq!(o2.spdus, vec![vec![0xAA, 0xBB, 0xCC]]);
    }

    #[test]
    fn data_available_triggers_rcv() {
        let mut t = Transport::new(1);
        t.init();
        t.on_frame(&[tags::C_T_C_REPLY, 0x01, 0x01]);
        // LAST with DA set → host must RCV the next queued message.
        let o = t.on_frame(&r_tpdu(tags::DATA_LAST, 1, &[0x01], true));
        assert_eq!(o.spdus, vec![vec![0x01]]);
        assert_eq!(o.writes, vec![vec![tags::RCV, 0x01, 0x01]]);
    }

    #[test]
    fn two_sends_serialize_one_block_per_module_turn() {
        // #337: a real CAM drops a second T_Data_Last sent before it answers the
        // first. Two send_spdu in one turn must emit only ONE write; the second
        // goes out after the module's T_SB.
        let mut t = Transport::new(1);
        t.init();
        t.on_frame(&[tags::C_T_C_REPLY, 0x01, 0x01]);

        let first = t.send_spdu(&[0x92, 0x07]); // e.g. open_session_response
        assert_eq!(first.writes.len(), 1);
        assert_eq!(first.writes[0][0], tags::DATA_LAST);

        // Queued while the first is in flight → no write yet.
        let second = t.send_spdu(&[0x9F, 0x80, 0x10, 0x00]); // profile_enq
        assert!(
            second.writes.is_empty(),
            "second block must wait for the SB"
        );

        // Module acknowledges with a standalone T_SB (data_available = 0).
        let after_sb = t.on_frame(&[tags::SB, 0x02, 0x01, SbValue::new(false).0]);
        assert_eq!(
            after_sb.writes.len(),
            1,
            "second block flushes after the SB"
        );
        assert_eq!(after_sb.writes[0][0], tags::DATA_LAST);
        // It carries the profile_enq payload.
        assert!(after_sb.writes[0]
            .windows(4)
            .any(|w| w == [0x9F, 0x80, 0x10, 0x00]));
    }

    #[test]
    fn wrong_tcid_is_flagged() {
        let mut t = Transport::new(1);
        t.init();
        let o = t.on_frame(&[tags::C_T_C_REPLY, 0x01, 0x09]);
        assert_eq!(
            o.error,
            Some(TransportError::WrongTcId {
                got: 9,
                expected: 1
            })
        );
    }
}