use std::collections::VecDeque;
use std::time::Duration;
use dvb_ci::tpdu::{create_t_c, tags, CommandTpdu, DataBlock, ResponseTpdu, SbValue, TcObject};
use dvb_common::{Parse, Serialize};
/// Minimum number of bytes `parse_sb` needs: the tag byte, the length byte, and the two bytes it
/// reads as connection id and status value.
const SB_OBJECT_LEN: usize = 4;
/// Decodes a status-byte object from the front of `bytes`.
///
/// Yields `(t_c_id, status)` taken from the third and fourth bytes when the slice holds at least
/// `SB_OBJECT_LEN` bytes, starts with `tags::SB`, and has a length byte of `0x02`. Anything else
/// yields `None`. Bytes after the fourth are ignored.
fn parse_sb(bytes: &[u8]) -> Option<(u8, SbValue)> {
    // `get` rejects short input up front, so no indexing can go out of bounds.
    match bytes.get(..SB_OBJECT_LEN)? {
        &[tag, 0x02, t_c_id, status] if tag == tags::SB => Some((t_c_id, SbValue(status))),
        _ => None,
    }
}
/// Poll interval a [`Transport`] starts with (100 ms) unless overridden via
/// [`Transport::with_timing`].
pub const DEFAULT_POLL_INTERVAL: Duration = Duration::from_millis(100);
/// Reply timeout a [`Transport`] starts with (1000 ms) unless overridden via
/// [`Transport::with_timing`]. In this file it bounds how long connection setup may take.
pub const DEFAULT_REPLY_TIMEOUT: Duration = Duration::from_millis(1000);
/// Lifecycle of the transport connection tracked by [`Transport`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TcState {
/// No connection: the state after [`Transport::new`], and again after setup times out.
Idle,
/// [`Transport::init`] has issued the create request and the reply is still pending.
Creating,
/// A create reply carrying the expected connection id has been accepted.
Active,
}
/// Result of driving the [`Transport`] state machine one step.
///
/// The default value (all collections empty, both options `None`) means "nothing to do".
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct Out {
/// Serialized frames produced by this step (create request, data, poll, or receive commands).
/// NOTE(review): presumably the caller is expected to write these to the module — confirm.
pub writes: Vec<Vec<u8>>,
/// Complete payloads reassembled from incoming data frames during this step.
pub spdus: Vec<Vec<u8>>,
/// Delay hint computed by this step, if any.
/// NOTE(review): presumably how long to wait before the next `tick` — confirm against callers.
pub timer: Option<Duration>,
/// Error detected during this step, if any.
pub error: Option<TransportError>,
}
/// Failures reported through [`Out::error`].
#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
#[non_exhaustive]
pub enum TransportError {
/// The wait accumulated in the `Creating` state reached the configured reply timeout.
#[error("transport connection setup timed out")]
SetupTimeout,
/// An incoming frame carried a connection id other than the one this transport was built with.
#[error("unexpected t_c_id {got} (expected {expected})")]
WrongTcId {
/// Connection id found in the frame.
got: u8,
/// Connection id this transport was constructed with.
expected: u8,
},
/// An incoming frame started with the `T_C_ERROR` tag.
#[error("module reported T_C_Error")]
ModuleError,
/// An incoming frame was empty, had an unrecognised leading tag, or failed to parse.
#[error("malformed R_TPDU")]
Malformed,
}
/// State machine for a single transport connection.
///
/// It performs no I/O itself: every method returns an [`Out`] describing the frames, payloads,
/// timer hint, and error produced by that step.
#[derive(Debug)]
pub struct Transport {
// Connection id placed in every outgoing command and compared against incoming frames.
tcid: u8,
// Current lifecycle state.
state: TcState,
// Payload bytes collected from data frames until a block that is not marked "more" arrives.
reassembly: Vec<u8>,
// Time between polls while `Active`.
poll_interval: Duration,
// Upper bound on the wait for the create reply while `Creating`.
reply_timeout: Duration,
// Time accumulated by `tick` since it was last reset (activation, send, poll, idle status).
since_poll: Duration,
// `Some` while a reply is outstanding. The contained duration only grows in `Creating`.
awaiting: Option<Duration>,
// Payloads accepted by `send_spdu` that have not been put on the wire yet.
outbound: VecDeque<Vec<u8>>,
}
impl Default for Transport {
fn default() -> Self {
Self::new(1)
}
}
impl Transport {
/// Creates an idle transport for connection id `tcid`.
///
/// Timing starts at [`DEFAULT_POLL_INTERVAL`] and [`DEFAULT_REPLY_TIMEOUT`]; nothing is queued
/// and no reply is awaited.
#[must_use]
pub fn new(tcid: u8) -> Self {
    let (poll_interval, reply_timeout) = (DEFAULT_POLL_INTERVAL, DEFAULT_REPLY_TIMEOUT);
    Self {
        tcid,
        state: TcState::Idle,
        awaiting: None,
        since_poll: Duration::ZERO,
        poll_interval,
        reply_timeout,
        reassembly: Vec::new(),
        outbound: VecDeque::new(),
    }
}
#[must_use]
pub fn with_timing(mut self, poll: Duration, reply: Duration) -> Self {
self.poll_interval = poll;
self.reply_timeout = reply;
self
}
#[must_use]
pub fn state(&self) -> TcState {
self.state
}
/// Serializes a command TPDU with the given `tag` and payload `data`, addressed to this
/// transport's connection id.
///
/// # Panics
///
/// Panics if `serialize_into` fails even though the buffer was sized with `serialized_len`.
fn cmd(&self, tag: u8, data: &[u8]) -> Vec<u8> {
    let tpdu = CommandTpdu {
        tag,
        t_c_id: self.tcid,
        data,
    };
    let mut bytes = vec![0u8; tpdu.serialized_len()];
    let written = tpdu.serialize_into(&mut bytes).expect("exact buffer");
    // Keep only the bytes the serializer reported as written.
    bytes.truncate(written);
    bytes
}
/// Builds the poll frame: a `DATA_LAST` command with an empty payload.
fn poll_frame(&self) -> Vec<u8> {
    let no_payload: [u8; 0] = [];
    self.cmd(tags::DATA_LAST, &no_payload)
}
/// Starts (or restarts) connection setup.
///
/// Moves to [`TcState::Creating`], arms the setup deadline, and returns the serialized create
/// request in `writes` together with a timer hint equal to the reply timeout.
///
/// Any partially reassembled inbound payload is discarded first: fragments from an unfinished
/// "more" chain on a previous connection must not be prepended to data received on the new one.
pub fn init(&mut self) -> Out {
    self.state = TcState::Creating;
    self.awaiting = Some(Duration::ZERO);
    self.reassembly.clear();
    // NOTE(review): payloads still queued in `outbound` are kept across a re-init; confirm
    // whether they should be dropped as well.
    let obj: TcObject = create_t_c(self.tcid);
    Out {
        writes: vec![obj.to_bytes()],
        timer: Some(self.reply_timeout),
        ..Out::default()
    }
}
/// Queues `spdu` for transmission and sends it straight away if nothing is outstanding.
///
/// Outside [`TcState::Active`] the payload is not queued and an empty [`Out`] is returned.
pub fn send_spdu(&mut self, spdu: &[u8]) -> Out {
    match self.state {
        TcState::Active => {
            self.outbound.push_back(spdu.to_vec());
            self.flush()
        }
        TcState::Idle | TcState::Creating => Out::default(),
    }
}
/// Sends the oldest queued payload, provided the connection is active and no reply is pending.
///
/// On success the returned [`Out`] holds one `DATA_LAST` frame and a timer hint of one poll
/// interval; the transport then counts as awaiting a reply and the poll clock restarts.
/// Otherwise the result is an empty [`Out`].
fn flush(&mut self) -> Out {
    let can_send = self.state == TcState::Active && self.awaiting.is_none();
    if !can_send {
        return Out::default();
    }
    let Some(payload) = self.outbound.pop_front() else {
        return Out::default();
    };
    self.awaiting = Some(Duration::ZERO);
    self.since_poll = Duration::ZERO;
    let frame = self.cmd(tags::DATA_LAST, &payload);
    Out {
        writes: vec![frame],
        timer: Some(self.poll_interval),
        ..Out::default()
    }
}
/// Advances the transport's clocks by `elapsed`.
///
/// * `Idle`: nothing happens.
/// * `Creating`: the setup wait grows by `elapsed`. Once it reaches the reply timeout the
///   transport falls back to `Idle` and reports [`TransportError::SetupTimeout`]. Until then the
///   timer hint is the time *remaining* before the deadline, so an early tick does not push the
///   timeout out.
/// * `Active`: when a poll interval has elapsed, either the next queued payload is sent (if no
///   reply is pending) or a poll frame is emitted. Otherwise the timer hint is the time left
///   until the next poll.
///
/// Durations are accumulated with saturating arithmetic, so an oversized `elapsed` cannot panic.
pub fn tick(&mut self, elapsed: Duration) -> Out {
    match self.state {
        TcState::Idle => Out::default(),
        TcState::Creating => {
            let Some(waited) = self.awaiting.as_mut() else {
                // No setup deadline is armed; only hand back a full-length timer hint.
                return Out {
                    timer: Some(self.reply_timeout),
                    ..Out::default()
                };
            };
            *waited = waited.saturating_add(elapsed);
            let waited = *waited;
            if waited >= self.reply_timeout {
                self.state = TcState::Idle;
                self.awaiting = None;
                return Out {
                    error: Some(TransportError::SetupTimeout),
                    ..Out::default()
                };
            }
            Out {
                // `waited < reply_timeout` here, so this subtraction cannot underflow.
                timer: Some(self.reply_timeout - waited),
                ..Out::default()
            }
        }
        TcState::Active => {
            self.since_poll = self.since_poll.saturating_add(elapsed);
            if self.since_poll >= self.poll_interval {
                self.since_poll = Duration::ZERO;
                if self.awaiting.is_none() && !self.outbound.is_empty() {
                    // A queued payload takes the place of an empty poll.
                    return self.flush();
                }
                self.awaiting = Some(Duration::ZERO);
                Out {
                    writes: vec![self.poll_frame()],
                    timer: Some(self.poll_interval),
                    ..Out::default()
                }
            } else {
                Out {
                    timer: Some(self.poll_interval - self.since_poll),
                    ..Out::default()
                }
            }
        }
    }
}
pub fn on_frame(&mut self, frame: &[u8]) -> Out {
self.awaiting = None;
match frame.first().copied() {
Some(tags::C_T_C_REPLY) => match TcObject::parse(frame) {
Ok(o) if o.t_c_id == self.tcid => {
self.state = TcState::Active;
self.since_poll = Duration::ZERO;
let da = parse_sb(&frame[3..]).is_some_and(|(_, sb)| sb.data_available());
self.after_status(da)
}
Ok(o) => self.wrong_tcid(o.t_c_id),
Err(_) => self.malformed(),
},
Some(tags::SB) => match parse_sb(frame) {
Some((tcid, _)) if tcid != self.tcid => self.wrong_tcid(tcid),
Some((_, sb)) => self.after_status(sb.data_available()),
None => self.malformed(),
},
Some(tags::T_C_ERROR) => Out {
error: Some(TransportError::ModuleError),
..Out::default()
},
Some(tags::DATA_LAST | tags::DATA_MORE) => self.on_data(frame),
_ => self.malformed(),
}
}
fn malformed(&self) -> Out {
Out {
error: Some(TransportError::Malformed),
..Out::default()
}
}
fn wrong_tcid(&self, got: u8) -> Out {
Out {
error: Some(TransportError::WrongTcId {
got,
expected: self.tcid,
}),
..Out::default()
}
}
/// Decides what to do after a status indication from the module.
///
/// * Data available: emit a receive command and mark a reply as pending.
/// * No data, but payloads are queued: try to send the next one.
/// * No data and nothing queued: restart the poll clock and hint one poll interval.
fn after_status(&mut self, data_available: bool) -> Out {
    if data_available {
        self.awaiting = Some(Duration::ZERO);
        let receive = self.cmd(tags::RCV, &[]);
        return Out {
            writes: vec![receive],
            ..Out::default()
        };
    }
    if self.outbound.is_empty() {
        self.since_poll = Duration::ZERO;
        Out {
            timer: Some(self.poll_interval),
            ..Out::default()
        }
    } else {
        self.flush()
    }
}
/// Handles a data frame: validates it, appends its payload to the reassembly buffer, and either
/// requests the next fragment or finishes the payload.
///
/// A parse failure yields [`TransportError::Malformed`]; a foreign connection id yields
/// [`TransportError::WrongTcId`]. Neither touches the reassembly buffer. When the block is
/// marked "more", a receive command is emitted and a reply is awaited. Otherwise the trailing
/// status is processed and the accumulated bytes, if any, are returned as one entry in `spdus`.
fn on_data(&mut self, frame: &[u8]) -> Out {
    let Ok(r) = ResponseTpdu::parse(frame) else {
        return self.malformed();
    };
    if r.t_c_id != self.tcid {
        return self.wrong_tcid(r.t_c_id);
    }
    self.reassembly.extend_from_slice(r.data);

    let more_follows = matches!(r.block, Some(DataBlock::More));
    if more_follows {
        self.awaiting = Some(Duration::ZERO);
        let receive = self.cmd(tags::RCV, &[]);
        return Out {
            writes: vec![receive],
            ..Out::default()
        };
    }

    // Final block: act on the status first, then attach the completed payload.
    let mut out = self.after_status(r.sb_value.data_available());
    if !self.reassembly.is_empty() {
        out.spdus.push(core::mem::take(&mut self.reassembly));
    }
    out
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use dvb_ci::tpdu::SbValue;

    /// Minimal create reply for connection 1 (tag, length, t_c_id) with no trailing status object.
    const REPLY_TC1: [u8; 3] = [tags::C_T_C_REPLY, 0x01, 0x01];

    /// Builds a response frame: tag, one-byte length, connection id, payload, then a status
    /// object whose data-available flag is `da`.
    fn r_tpdu(tag: u8, tcid: u8, data: &[u8], da: bool) -> Vec<u8> {
        // Payloads in these tests are a few bytes, so the single length byte is enough.
        let length = (1 + data.len()) as u8;
        let mut frame = vec![tag, length, tcid];
        frame.extend_from_slice(data);
        frame.extend_from_slice(&[tags::SB, 0x02, tcid, SbValue::new(da).0]);
        frame
    }

    /// Returns a transport for connection 1 that has completed setup.
    fn active_transport() -> Transport {
        let mut transport = Transport::new(1);
        transport.init();
        transport.on_frame(&REPLY_TC1);
        transport
    }

    #[test]
    fn init_sends_create_tc_and_arms_timeout() {
        let mut transport = Transport::new(1);
        let out = transport.init();
        assert_eq!(out.writes, vec![vec![tags::CREATE_T_C, 0x01, 0x01]]);
        assert_eq!(transport.state(), TcState::Creating);
        assert_eq!(out.timer, Some(DEFAULT_REPLY_TIMEOUT));
    }

    #[test]
    fn setup_times_out_to_idle() {
        let mut transport = Transport::new(1);
        transport.init();
        let out = transport.tick(DEFAULT_REPLY_TIMEOUT);
        assert_eq!(out.error, Some(TransportError::SetupTimeout));
        assert_eq!(transport.state(), TcState::Idle);
    }

    #[test]
    fn reply_activates_then_polls_on_interval() {
        let mut transport = Transport::new(1);
        transport.init();
        let out = transport.on_frame(&REPLY_TC1);
        assert_eq!(transport.state(), TcState::Active);
        assert!(out.error.is_none());

        let early = transport.tick(DEFAULT_POLL_INTERVAL / 2);
        assert!(early.writes.is_empty());

        let due = transport.tick(DEFAULT_POLL_INTERVAL);
        assert_eq!(due.writes, vec![vec![tags::DATA_LAST, 0x01, 0x01]]);
    }

    #[test]
    fn reassembles_more_then_last_into_one_spdu() {
        let mut transport = active_transport();

        let first = transport.on_frame(&r_tpdu(tags::DATA_MORE, 1, &[0xAA, 0xBB], false));
        assert!(first.spdus.is_empty());
        assert_eq!(first.writes, vec![vec![tags::RCV, 0x01, 0x01]]);

        let last = transport.on_frame(&r_tpdu(tags::DATA_LAST, 1, &[0xCC], false));
        assert_eq!(last.spdus, vec![vec![0xAA, 0xBB, 0xCC]]);
    }

    #[test]
    fn data_available_triggers_rcv() {
        let mut transport = active_transport();
        let out = transport.on_frame(&r_tpdu(tags::DATA_LAST, 1, &[0x01], true));
        assert_eq!(out.spdus, vec![vec![0x01]]);
        assert_eq!(out.writes, vec![vec![tags::RCV, 0x01, 0x01]]);
    }

    #[test]
    fn two_sends_serialize_one_block_per_module_turn() {
        let mut transport = active_transport();

        let first = transport.send_spdu(&[0x92, 0x07]);
        assert_eq!(first.writes.len(), 1);
        assert_eq!(first.writes[0][0], tags::DATA_LAST);

        let second = transport.send_spdu(&[0x9F, 0x80, 0x10, 0x00]);
        assert!(
            second.writes.is_empty(),
            "second block must wait for the SB"
        );

        let after_sb = transport.on_frame(&[tags::SB, 0x02, 0x01, SbValue::new(false).0]);
        assert_eq!(
            after_sb.writes.len(),
            1,
            "second block flushes after the SB"
        );
        assert_eq!(after_sb.writes[0][0], tags::DATA_LAST);
        assert!(after_sb.writes[0]
            .windows(4)
            .any(|w| w == [0x9F, 0x80, 0x10, 0x00]));
    }

    #[test]
    fn wrong_tcid_is_flagged() {
        let mut transport = Transport::new(1);
        transport.init();
        let out = transport.on_frame(&[tags::C_T_C_REPLY, 0x01, 0x09]);
        assert_eq!(
            out.error,
            Some(TransportError::WrongTcId {
                got: 9,
                expected: 1
            })
        );
    }
}