use alloc::vec::Vec;
use knx_core::address::IndividualAddress;
use knx_core::message::TpduType;
use knx_core::types::Priority;
/// Milliseconds added to the current time whenever the connection timeout is (re)armed.
const CONNECTION_TIMEOUT_MS: u64 = 6_000;
/// Milliseconds added to the current time whenever the acknowledgement timeout is (re)armed.
const ACK_TIMEOUT_MS: u64 = 3_000;
/// Number of retransmissions of an unacknowledged frame before the connection is closed.
const MAX_REP_COUNT: u8 = 3;
/// Keeps the low four bits of a sequence number, so the counters wrap from 15 back to 0.
const SEQ_NO_MASK: u8 = 0b0000_1111;
/// Connection state of a [`TransportLayer`].
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum State {
    /// No connection exists.
    Closed,
    /// A connection is open and no sent frame is awaiting acknowledgement.
    OpenIdle,
    /// A connection is open and a sent data frame is awaiting acknowledgement.
    OpenWait,
    /// A connect request was issued and its confirmation is still pending.
    Connecting,
}
/// Output of the state machine, queued internally and collected by the caller.
///
/// The first two variants describe frames to transmit; the remaining ones are
/// notifications for the user of the transport layer.
#[derive(Debug)]
pub enum Action {
    /// Transmit a control frame (connect, disconnect, ack or nack).
    SendControl {
        /// Address the frame is sent to.
        destination: IndividualAddress,
        /// Kind of control frame.
        tpdu_type: TpduType,
        /// Sequence number carried by the frame (0 for connect/disconnect).
        seq_no: u8,
    },
    /// Transmit a numbered data frame on the open connection.
    SendDataConnected {
        /// Address the frame is sent to.
        destination: IndividualAddress,
        /// Send sequence number of the frame.
        seq_no: u8,
        /// Priority the frame is sent with.
        priority: Priority,
        /// Payload of the frame.
        apdu: Vec<u8>,
    },
    /// An incoming connection from `source` was accepted.
    ConnectIndication {
        /// Address of the peer that opened the connection.
        source: IndividualAddress,
    },
    /// A locally requested connection to `destination` was confirmed.
    ConnectConfirm {
        /// Address of the peer the connection was opened to.
        destination: IndividualAddress,
    },
    /// The connection with `address` is closed.
    DisconnectIndication {
        /// Address of the (former) connection peer.
        address: IndividualAddress,
    },
    /// An in-sequence data frame was received on the connection.
    DataConnectedIndication {
        /// Address of the sending peer.
        source: IndividualAddress,
        /// Priority the frame was received with.
        priority: Priority,
        /// Payload of the frame.
        apdu: Vec<u8>,
    },
    /// The outstanding data frame was acknowledged by the peer.
    DataConnectedConfirm,
}
/// Single-connection transport-layer state machine.
///
/// Events are fed in through the `*_indication`, `*_request`, `connect_confirm`
/// and `poll` methods; the resulting [`Action`]s accumulate internally until
/// they are collected with `take_actions`. Time is supplied by the caller as
/// a millisecond counter.
pub struct TransportLayer {
    /// Current connection state.
    state: State,
    /// Peer of the current (or most recent) connection.
    connection_address: IndividualAddress,
    /// Sequence number used for the next outgoing data frame.
    seq_no_send: u8,
    /// Sequence number expected on the next incoming data frame.
    seq_no_recv: u8,
    /// Retransmissions performed for the frame currently awaiting acknowledgement.
    rep_count: u8,
    /// Time at which the connection times out, if the timer is armed.
    connection_timeout_deadline: Option<u64>,
    /// Time at which the acknowledgement wait times out, if the timer is armed.
    ack_timeout_deadline: Option<u64>,
    /// Copy of the frame awaiting acknowledgement, kept for retransmission.
    saved_frame: Option<SavedFrame>,
    /// A data request that arrived while the layer could not send it yet.
    buffered_request: Option<BufferedRequest>,
    /// Actions produced since they were last collected.
    actions: Vec<Action>,
}
/// Copy of a transmitted data frame, retained so it can be sent again.
struct SavedFrame {
    /// Address the frame was sent to.
    destination: IndividualAddress,
    /// Sequence number the frame was sent with.
    seq_no: u8,
    /// Priority the frame was sent with.
    priority: Priority,
    /// Payload of the frame.
    apdu: Vec<u8>,
}
/// A data request held back until the layer is able to transmit it.
struct BufferedRequest {
    /// Priority requested for the frame.
    priority: Priority,
    /// Payload to send.
    apdu: Vec<u8>,
}
impl TransportLayer {
    /// Creates a layer in [`State::Closed`] with zeroed counters, no armed
    /// timers, nothing buffered and an empty action queue.
    pub const fn new() -> Self {
        Self {
            state: State::Closed,
            connection_address: IndividualAddress::from_raw(0),
            seq_no_send: 0,
            seq_no_recv: 0,
            rep_count: 0,
            connection_timeout_deadline: None,
            ack_timeout_deadline: None,
            saved_frame: None,
            buffered_request: None,
            actions: Vec::new(),
        }
    }

    /// Returns the current connection state.
    pub const fn state(&self) -> State {
        self.state
    }

    /// Returns the peer address of the current (or most recent) connection.
    pub const fn connection_address(&self) -> IndividualAddress {
        self.connection_address
    }

    /// Removes and returns every action queued since the previous call.
    pub fn take_actions(&mut self) -> Vec<Action> {
        core::mem::take(&mut self.actions)
    }

    /// Handles an incoming connect frame from `source`.
    ///
    /// * Closed: the connection is accepted and the layer becomes `OpenIdle`.
    /// * Otherwise, from the current peer: ignored.
    /// * Otherwise, from any other address: a disconnect is sent back to it.
    pub fn connect_indication(&mut self, source: IndividualAddress, now_ms: u64) {
        if self.state == State::Closed {
            self.a1_accept_connection(source, now_ms);
        } else if source != self.connection_address {
            self.a10_reject_foreign(source);
        }
    }

    /// Handles an incoming disconnect frame from `source`.
    ///
    /// Only a disconnect from the current peer while a connection exists has
    /// an effect: the user is notified and the layer becomes `Closed`.
    pub fn disconnect_indication(&mut self, source: IndividualAddress) {
        if self.state == State::Closed || source != self.connection_address {
            return;
        }
        self.a5_passive_disconnect(source);
    }

    /// Handles an incoming numbered data frame.
    ///
    /// Frames from a foreign address are dropped (and answered with a
    /// disconnect while `Connecting`). For frames from the peer:
    ///
    /// * `OpenIdle` / `OpenWait`: the expected sequence number is acknowledged
    ///   and delivered; the previous one is acknowledged again without
    ///   delivery; anything else is answered with a nack.
    /// * `Connecting`: the previous sequence number is acknowledged; anything
    ///   else closes the connection.
    /// * `Closed`: ignored.
    pub fn data_connected_indication(
        &mut self,
        source: IndividualAddress,
        seq_no: u8,
        priority: Priority,
        apdu: Vec<u8>,
        now_ms: u64,
    ) {
        if !self.is_peer_or_reject(source) {
            return;
        }
        let expected = self.seq_no_recv;
        let previous = expected.wrapping_sub(1) & SEQ_NO_MASK;
        match self.state {
            State::Closed => {}
            State::OpenIdle | State::OpenWait => {
                if seq_no == expected {
                    self.a2_receive_data(source, priority, apdu, now_ms);
                } else if seq_no == previous {
                    self.a3_ack_repeated(source, seq_no, now_ms);
                } else {
                    self.a4_nack_wrong_seq(source, seq_no, now_ms);
                }
            }
            State::Connecting => {
                if seq_no == previous {
                    self.a3_ack_repeated(source, seq_no, now_ms);
                } else {
                    self.active_disconnect();
                }
            }
        }
    }

    /// Handles an incoming ack frame.
    ///
    /// Acks from a foreign address are dropped (and answered with a disconnect
    /// while `Connecting`). From the peer: in `OpenWait` a matching sequence
    /// number completes the send and returns to `OpenIdle`, a mismatch closes
    /// the connection; in `Connecting` any ack closes the connection; in
    /// `Closed` and `OpenIdle` it is ignored.
    pub fn ack_indication(&mut self, source: IndividualAddress, seq_no: u8, now_ms: u64) {
        if !self.is_peer_or_reject(source) {
            return;
        }
        match self.state {
            State::Closed | State::OpenIdle => {}
            State::OpenWait => {
                if seq_no == self.seq_no_send {
                    self.a8_ack_received(now_ms);
                    self.state = State::OpenIdle;
                } else {
                    self.active_disconnect();
                }
            }
            State::Connecting => self.active_disconnect(),
        }
    }

    /// Handles an incoming nack frame.
    ///
    /// Nacks from a foreign address are dropped (and answered with a
    /// disconnect while `Connecting`). From the peer: in `OpenIdle` and
    /// `Connecting` the connection is closed; in `OpenWait` a nack for the
    /// outstanding frame triggers a retransmission, or closes the connection
    /// once `MAX_REP_COUNT` retransmissions have been made.
    ///
    /// NOTE(review): a nack whose sequence number does not match the
    /// outstanding frame is silently ignored in `OpenWait`, whereas the other
    /// open states close on any nack — confirm this asymmetry is intended.
    pub fn nack_indication(&mut self, source: IndividualAddress, seq_no: u8, now_ms: u64) {
        if !self.is_peer_or_reject(source) {
            return;
        }
        match self.state {
            State::Closed => {}
            State::OpenIdle | State::Connecting => self.active_disconnect(),
            State::OpenWait => {
                if seq_no == self.seq_no_send {
                    self.retransmit_or_disconnect(now_ms);
                }
            }
        }
    }

    /// Reports the outcome of a previously issued connect request.
    ///
    /// Only meaningful while `Connecting`: success notifies the user and opens
    /// the connection; failure notifies the user of the disconnect, discards
    /// any data buffered for this connection and returns to `Closed`.
    pub fn connect_confirm(&mut self, success: bool) {
        if self.state != State::Connecting {
            return;
        }
        let peer = self.connection_address;
        if success {
            self.actions.push(Action::ConnectConfirm { destination: peer });
            self.state = State::OpenIdle;
        } else {
            self.a5_passive_disconnect(peer);
        }
    }

    /// Requests transmission of `apdu` on the connection.
    ///
    /// * `OpenIdle`: sent immediately; the layer waits for the ack (`OpenWait`).
    /// * `OpenWait` / `Connecting`: buffered until [`poll`](Self::poll) finds
    ///   the layer idle. Only one request is buffered; a newer one replaces it.
    /// * `Closed`: dropped.
    pub fn data_connected_request(&mut self, priority: Priority, apdu: Vec<u8>, now_ms: u64) {
        match self.state {
            State::Closed => {}
            State::OpenIdle => {
                self.a7_send_data(priority, apdu, now_ms);
                self.state = State::OpenWait;
            }
            State::OpenWait | State::Connecting => {
                self.buffered_request = Some(BufferedRequest { priority, apdu });
            }
        }
    }

    /// Requests a connection to `destination`.
    ///
    /// From `Closed` a connect frame is sent and the layer enters
    /// `Connecting`. In any other state the existing connection is closed
    /// instead and no new connection is started.
    pub fn connect_request(&mut self, destination: IndividualAddress, now_ms: u64) {
        if self.state == State::Closed {
            self.a12_initiate_connection(destination, now_ms);
            self.state = State::Connecting;
        } else {
            self.active_disconnect();
        }
    }

    /// Requests that the connection be closed.
    ///
    /// With an open connection a disconnect frame is sent and the user is
    /// notified; when already `Closed` only the notification is emitted.
    pub fn disconnect_request(&mut self) {
        if self.state == State::Closed {
            self.actions.push(Action::DisconnectIndication {
                address: self.connection_address,
            });
        } else {
            self.active_disconnect();
        }
    }

    /// Advances time-driven behaviour; call regularly with the current time.
    ///
    /// At most one of the following happens per call, checked in this order:
    ///
    /// 1. the connection timeout expired: any open connection is closed;
    /// 2. the ack timeout expired while `OpenWait`: the frame is retransmitted,
    ///    or the connection is closed after `MAX_REP_COUNT` retransmissions;
    /// 3. the layer is `OpenIdle` and a request is buffered: it is sent.
    pub fn poll(&mut self, now_ms: u64) {
        if self
            .connection_timeout_deadline
            .is_some_and(|deadline| now_ms >= deadline)
        {
            self.connection_timeout_deadline = None;
            if self.state != State::Closed {
                self.active_disconnect();
            }
            return;
        }

        let ack_expired = self
            .ack_timeout_deadline
            .is_some_and(|deadline| now_ms >= deadline);
        if ack_expired && self.state == State::OpenWait {
            self.ack_timeout_deadline = None;
            self.retransmit_or_disconnect(now_ms);
            return;
        }

        if self.state == State::OpenIdle {
            if let Some(request) = self.buffered_request.take() {
                self.a7_send_data(request.priority, request.apdu, now_ms);
                self.state = State::OpenWait;
            }
        }
    }

    /// Returns `true` when `source` is the connection peer. Otherwise returns
    /// `false`, after answering with a disconnect if the layer is `Connecting`.
    fn is_peer_or_reject(&mut self, source: IndividualAddress) -> bool {
        if source == self.connection_address {
            return true;
        }
        if self.state == State::Connecting {
            self.a10_reject_foreign(source);
        }
        false
    }

    /// Retransmits the outstanding frame while retries remain, otherwise
    /// closes the connection.
    fn retransmit_or_disconnect(&mut self, now_ms: u64) {
        if self.rep_count < MAX_REP_COUNT {
            self.a9_retransmit(now_ms);
        } else {
            self.active_disconnect();
        }
    }

    /// Drops the retransmission copy and any buffered request.
    ///
    /// Called whenever a connection closes so that data belonging to it can
    /// never be transmitted on a later connection, possibly to another peer.
    fn discard_pending(&mut self) {
        self.saved_frame = None;
        self.buffered_request = None;
    }

    /// Returns the sequence number following `seq_no`, wrapping within the mask.
    const fn next_seq_no(seq_no: u8) -> u8 {
        seq_no.wrapping_add(1) & SEQ_NO_MASK
    }

    /// Accepts an incoming connection: records the peer, resets both sequence
    /// counters, arms the connection timeout, notifies the user and opens.
    fn a1_accept_connection(&mut self, source: IndividualAddress, now_ms: u64) {
        self.connection_address = source;
        self.seq_no_send = 0;
        self.seq_no_recv = 0;
        self.enable_connection_timeout(now_ms);
        self.actions.push(Action::ConnectIndication { source });
        self.state = State::OpenIdle;
    }

    /// Acknowledges an in-sequence frame, advances the receive counter,
    /// re-arms the connection timeout and delivers the payload to the user.
    fn a2_receive_data(
        &mut self,
        source: IndividualAddress,
        priority: Priority,
        apdu: Vec<u8>,
        now_ms: u64,
    ) {
        self.actions.push(Action::SendControl {
            destination: source,
            tpdu_type: TpduType::Ack,
            seq_no: self.seq_no_recv,
        });
        self.seq_no_recv = Self::next_seq_no(self.seq_no_recv);
        self.enable_connection_timeout(now_ms);
        self.actions.push(Action::DataConnectedIndication {
            source,
            priority,
            apdu,
        });
    }

    /// Acknowledges a repeated frame again without delivering it, and re-arms
    /// the connection timeout.
    fn a3_ack_repeated(&mut self, source: IndividualAddress, seq_no: u8, now_ms: u64) {
        self.actions.push(Action::SendControl {
            destination: source,
            tpdu_type: TpduType::Ack,
            seq_no,
        });
        self.enable_connection_timeout(now_ms);
    }

    /// Answers a frame with an unexpected sequence number with a nack, and
    /// re-arms the connection timeout.
    fn a4_nack_wrong_seq(&mut self, source: IndividualAddress, seq_no: u8, now_ms: u64) {
        self.actions.push(Action::SendControl {
            destination: source,
            tpdu_type: TpduType::Nack,
            seq_no,
        });
        self.enable_connection_timeout(now_ms);
    }

    /// Closes the connection without sending anything: stops both timers,
    /// discards pending data, notifies the user and enters `Closed`.
    fn a5_passive_disconnect(&mut self, address: IndividualAddress) {
        self.connection_timeout_deadline = None;
        self.ack_timeout_deadline = None;
        self.discard_pending();
        self.actions.push(Action::DisconnectIndication { address });
        self.state = State::Closed;
    }

    /// Closes the connection actively: sends a disconnect to the peer,
    /// notifies the user, stops both timers, discards pending data and enters
    /// `Closed`.
    fn active_disconnect(&mut self) {
        let peer = self.connection_address;
        self.actions.push(Action::SendControl {
            destination: peer,
            tpdu_type: TpduType::Disconnect,
            seq_no: 0,
        });
        self.actions
            .push(Action::DisconnectIndication { address: peer });
        self.connection_timeout_deadline = None;
        self.ack_timeout_deadline = None;
        self.discard_pending();
        self.state = State::Closed;
    }

    /// Sends a data frame with the current send sequence number, keeps a copy
    /// for retransmission, resets the retry counter and arms both timers.
    fn a7_send_data(&mut self, priority: Priority, apdu: Vec<u8>, now_ms: u64) {
        let destination = self.connection_address;
        let seq_no = self.seq_no_send;
        // The payload is needed twice: once for the wire, once for retries.
        self.saved_frame = Some(SavedFrame {
            destination,
            seq_no,
            priority,
            apdu: apdu.clone(),
        });
        self.actions.push(Action::SendDataConnected {
            destination,
            seq_no,
            priority,
            apdu,
        });
        self.rep_count = 0;
        self.enable_ack_timeout(now_ms);
        self.enable_connection_timeout(now_ms);
    }

    /// Completes a send: stops the ack timer, advances the send counter, drops
    /// the retransmission copy, re-arms the connection timeout and confirms
    /// the transfer to the user.
    fn a8_ack_received(&mut self, now_ms: u64) {
        self.ack_timeout_deadline = None;
        self.seq_no_send = Self::next_seq_no(self.seq_no_send);
        self.saved_frame = None;
        self.enable_connection_timeout(now_ms);
        self.actions.push(Action::DataConnectedConfirm);
    }

    /// Sends the saved frame again (if there is one), counts the attempt and
    /// re-arms both timers.
    fn a9_retransmit(&mut self, now_ms: u64) {
        if let Some(frame) = &self.saved_frame {
            self.actions.push(Action::SendDataConnected {
                destination: frame.destination,
                seq_no: frame.seq_no,
                priority: frame.priority,
                apdu: frame.apdu.clone(),
            });
        }
        self.rep_count += 1;
        self.enable_ack_timeout(now_ms);
        self.enable_connection_timeout(now_ms);
    }

    /// Sends a disconnect to an address that is not the connection peer; the
    /// layer's own state is left untouched.
    fn a10_reject_foreign(&mut self, source: IndividualAddress) {
        self.actions.push(Action::SendControl {
            destination: source,
            tpdu_type: TpduType::Disconnect,
            seq_no: 0,
        });
    }

    /// Starts an outgoing connection: records the peer, resets both sequence
    /// counters, sends the connect frame and arms the connection timeout.
    fn a12_initiate_connection(&mut self, destination: IndividualAddress, now_ms: u64) {
        self.connection_address = destination;
        self.seq_no_send = 0;
        self.seq_no_recv = 0;
        self.actions.push(Action::SendControl {
            destination,
            tpdu_type: TpduType::Connect,
            seq_no: 0,
        });
        self.enable_connection_timeout(now_ms);
    }

    /// (Re)arms the connection timeout relative to `now_ms`.
    ///
    /// Saturates instead of overflowing, so a huge `now_ms` can neither panic
    /// nor wrap around into a deadline that is already in the past.
    const fn enable_connection_timeout(&mut self, now_ms: u64) {
        self.connection_timeout_deadline = Some(now_ms.saturating_add(CONNECTION_TIMEOUT_MS));
    }

    /// (Re)arms the acknowledgement timeout relative to `now_ms`, saturating
    /// instead of overflowing.
    const fn enable_ack_timeout(&mut self, now_ms: u64) {
        self.ack_timeout_deadline = Some(now_ms.saturating_add(ACK_TIMEOUT_MS));
    }
}
impl Default for TransportLayer {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;

    const ADDR_1001: IndividualAddress = IndividualAddress::from_raw(0x1001);
    const ADDR_2002: IndividualAddress = IndividualAddress::from_raw(0x2002);

    /// Returns a layer that accepted a connection from `peer` at time 0, with
    /// the resulting actions already drained.
    fn connected(peer: IndividualAddress) -> TransportLayer {
        let mut tl = TransportLayer::new();
        tl.connect_indication(peer, 0);
        tl.take_actions();
        tl
    }

    /// Returns a layer connected to `peer` that sent `[0x01]` at time 100 and
    /// is waiting for the acknowledgement, with all actions drained.
    fn awaiting_ack(peer: IndividualAddress) -> TransportLayer {
        let mut tl = connected(peer);
        tl.data_connected_request(Priority::Low, alloc::vec![0x01], 100);
        tl.take_actions();
        tl
    }

    /// True when `actions` contains a disconnect control frame.
    fn sends_disconnect(actions: &[Action]) -> bool {
        actions.iter().any(|a| {
            matches!(
                a,
                Action::SendControl {
                    tpdu_type: TpduType::Disconnect,
                    ..
                }
            )
        })
    }

    #[test]
    fn initial_state_is_closed() {
        let tl = TransportLayer::new();
        assert_eq!(tl.state(), State::Closed);
        assert_eq!(tl.connection_address().raw(), 0);
    }

    #[test]
    fn accept_incoming_connection() {
        let mut tl = TransportLayer::new();
        tl.connect_indication(ADDR_1001, 0);
        assert_eq!(tl.state(), State::OpenIdle);
        assert_eq!(tl.connection_address(), ADDR_1001);

        let actions = tl.take_actions();
        assert_eq!(actions.len(), 1);
        assert!(matches!(
            actions[0],
            Action::ConnectIndication { source } if source == ADDR_1001
        ));
    }

    #[test]
    fn reject_foreign_connection_when_connected() {
        let mut tl = connected(ADDR_1001);
        tl.connect_indication(ADDR_2002, 100);
        assert_eq!(tl.state(), State::OpenIdle);

        let actions = tl.take_actions();
        assert_eq!(actions.len(), 1);
        assert!(matches!(
            actions[0],
            Action::SendControl {
                destination,
                tpdu_type: TpduType::Disconnect,
                ..
            } if destination == ADDR_2002
        ));
    }

    #[test]
    fn send_and_ack_data() {
        let mut tl = connected(ADDR_1001);
        tl.data_connected_request(Priority::Low, alloc::vec![0x01, 0x02], 100);
        assert_eq!(tl.state(), State::OpenWait);

        let actions = tl.take_actions();
        assert_eq!(actions.len(), 1);
        assert!(matches!(
            actions[0],
            Action::SendDataConnected { seq_no: 0, .. }
        ));

        tl.ack_indication(ADDR_1001, 0, 200);
        assert_eq!(tl.state(), State::OpenIdle);
        let actions = tl.take_actions();
        assert!(actions
            .iter()
            .any(|a| matches!(a, Action::DataConnectedConfirm)));
    }

    #[test]
    fn receive_data_sends_ack() {
        let mut tl = connected(ADDR_1001);
        tl.data_connected_indication(ADDR_1001, 0, Priority::Low, alloc::vec![0xAA], 100);

        let actions = tl.take_actions();
        assert!(actions.iter().any(|a| matches!(
            a,
            Action::SendControl {
                tpdu_type: TpduType::Ack,
                seq_no: 0,
                ..
            }
        )));
        assert!(actions
            .iter()
            .any(|a| matches!(a, Action::DataConnectedIndication { .. })));
    }

    #[test]
    fn nack_triggers_retransmit() {
        let mut tl = awaiting_ack(ADDR_1001);
        tl.nack_indication(ADDR_1001, 0, 200);
        assert_eq!(tl.state(), State::OpenWait);

        let actions = tl.take_actions();
        assert_eq!(actions.len(), 1);
        assert!(matches!(
            actions[0],
            Action::SendDataConnected { seq_no: 0, .. }
        ));
    }

    #[test]
    fn max_retries_disconnects() {
        let mut tl = awaiting_ack(ADDR_1001);
        for i in 0..MAX_REP_COUNT {
            tl.nack_indication(ADDR_1001, 0, 200 + u64::from(i) * 100);
            tl.take_actions();
        }
        // Every allowed retry has been used, but the connection is still up.
        assert_eq!(tl.state(), State::OpenWait);

        tl.nack_indication(ADDR_1001, 0, 600);
        assert_eq!(tl.state(), State::Closed);
    }

    #[test]
    fn connection_timeout_disconnects() {
        let mut tl = connected(ADDR_1001);
        tl.poll(CONNECTION_TIMEOUT_MS + 1);
        assert_eq!(tl.state(), State::Closed);
        assert!(sends_disconnect(&tl.take_actions()));
    }

    #[test]
    fn ack_timeout_retransmits() {
        let mut tl = awaiting_ack(ADDR_1001);
        tl.poll(100 + ACK_TIMEOUT_MS + 1);
        assert_eq!(tl.state(), State::OpenWait);

        let actions = tl.take_actions();
        assert_eq!(actions.len(), 1);
        assert!(matches!(actions[0], Action::SendDataConnected { .. }));
    }

    #[test]
    fn buffered_request_sent_after_ack() {
        let mut tl = awaiting_ack(ADDR_1001);

        // A second request while waiting for the ack is held back.
        tl.data_connected_request(Priority::Low, alloc::vec![0x02], 150);
        assert!(tl.take_actions().is_empty());

        tl.ack_indication(ADDR_1001, 0, 200);
        tl.take_actions();
        assert_eq!(tl.state(), State::OpenIdle);

        tl.poll(250);
        assert_eq!(tl.state(), State::OpenWait);
        let actions = tl.take_actions();
        assert!(matches!(
            actions[0],
            Action::SendDataConnected { seq_no: 1, .. }
        ));
    }

    #[test]
    fn disconnect_from_remote() {
        let mut tl = connected(ADDR_1001);
        tl.disconnect_indication(ADDR_1001);
        assert_eq!(tl.state(), State::Closed);

        let actions = tl.take_actions();
        assert!(actions.iter().any(
            |a| matches!(a, Action::DisconnectIndication { address } if *address == ADDR_1001)
        ));
    }

    #[test]
    fn wrong_ack_seq_disconnects() {
        let mut tl = awaiting_ack(ADDR_1001);
        tl.ack_indication(ADDR_1001, 5, 200);
        assert_eq!(tl.state(), State::Closed);
    }

    #[test]
    fn connect_request_transitions_to_connecting() {
        let mut tl = TransportLayer::new();
        tl.connect_request(ADDR_1001, 0);
        assert_eq!(tl.state(), State::Connecting);
        assert_eq!(tl.connection_address(), ADDR_1001);

        let actions = tl.take_actions();
        assert!(actions.iter().any(|a| matches!(
            a,
            Action::SendControl {
                destination,
                tpdu_type: TpduType::Connect,
                ..
            } if *destination == ADDR_1001
        )));
    }

    #[test]
    fn disconnect_request_from_open_idle() {
        let mut tl = connected(ADDR_1001);
        assert_eq!(tl.state(), State::OpenIdle);

        tl.disconnect_request();
        assert_eq!(tl.state(), State::Closed);
        assert!(sends_disconnect(&tl.take_actions()));
    }

    #[test]
    fn data_in_connecting_state_is_buffered() {
        let mut tl = TransportLayer::new();
        tl.connect_request(ADDR_1001, 0);
        tl.take_actions();
        assert_eq!(tl.state(), State::Connecting);

        tl.data_connected_request(Priority::Low, alloc::vec![0xAB], 100);
        assert!(tl.take_actions().is_empty());

        tl.connect_confirm(true);
        tl.take_actions();
        assert_eq!(tl.state(), State::OpenIdle);

        tl.poll(200);
        assert_eq!(tl.state(), State::OpenWait);
        let actions = tl.take_actions();
        assert!(matches!(
            actions[0],
            Action::SendDataConnected { seq_no: 0, .. }
        ));
    }

    #[test]
    fn sequence_numbers_wrap() {
        let mut tl = connected(ADDR_1001);
        for i in 0u8..16 {
            let sent_at = u64::from(i) * 100;
            tl.data_connected_request(Priority::Low, alloc::vec![i], sent_at);
            tl.take_actions();
            tl.ack_indication(ADDR_1001, i & SEQ_NO_MASK, sent_at + 50);
            tl.take_actions();
        }

        // After sixteen acknowledged frames the counter is back at zero.
        tl.data_connected_request(Priority::Low, alloc::vec![0xFF], 2000);
        let actions = tl.take_actions();
        assert!(matches!(
            actions[0],
            Action::SendDataConnected { seq_no: 0, .. }
        ));
    }

    #[test]
    fn connect_confirm_failure_returns_to_closed() {
        let mut tl = TransportLayer::new();
        tl.connect_request(ADDR_1001, 0);
        assert_eq!(tl.state(), State::Connecting);
        tl.take_actions();

        tl.connect_confirm(false);
        assert_eq!(tl.state(), State::Closed);
        let actions = tl.take_actions();
        assert!(actions.iter().any(
            |a| matches!(a, Action::DisconnectIndication { address } if *address == ADDR_1001)
        ));
    }

    /// Data from an address other than the peer must neither produce any
    /// action nor disturb the open connection.
    #[test]
    fn foreign_data_in_open_idle_is_ignored() {
        let mut tl = connected(ADDR_1001);
        assert_eq!(tl.state(), State::OpenIdle);

        tl.data_connected_indication(ADDR_2002, 0, Priority::Low, alloc::vec![0xAA], 100);
        assert!(tl.take_actions().is_empty());
        assert_eq!(tl.state(), State::OpenIdle);
    }
}