use crate::clock::Clock;
use crate::command::Command;
use crate::error::Error;
use crate::packet::{Address, ControlByte, CtrlFlags, PacketBuilder, ParsedPacket, Sqn};
use crate::reply::{Reply, ReplyCode};
use crate::transport::Transport;
use alloc::vec::Vec;
/// Per-device session state that the controller keeps for one peripheral.
#[derive(Debug, Clone)]
pub struct PdState {
/// Sequence number that the next outgoing command will carry.
pub next_sqn: Sqn,
/// Whether outgoing packets set the `USE_CRC` control flag.
pub use_crc: bool,
/// Clock reading, in milliseconds, stored by the most recent `mark_seen` call.
pub last_seen_ms: u64,
// Set by `mark_seen`; while it is `false`, `is_offline` always answers `false`.
seen_at_least_once: bool,
}
impl Default for PdState {
fn default() -> Self {
Self {
next_sqn: Sqn::ZERO,
use_crc: true,
last_seen_ms: 0,
seen_at_least_once: false,
}
}
}
impl PdState {
pub fn bump_sqn(&mut self) {
self.next_sqn = self.next_sqn.next();
}
pub fn mark_seen(&mut self, now_ms: u64) {
self.last_seen_ms = now_ms;
self.seen_at_least_once = true;
}
pub fn is_offline(&self, now_ms: u64) -> bool {
self.seen_at_least_once
&& now_ms.saturating_sub(self.last_seen_ms) >= crate::OFFLINE_THRESHOLD_MS as u64
}
pub fn reset(&mut self) {
*self = Self::default();
}
}
/// Non-error result of one `Acu::exchange` call.
#[derive(Debug, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub enum ExchangeOutcome {
/// A reply other than `Reply::Busy` was received and decoded.
Reply(Reply),
/// The device answered with `Reply::Busy`; the sequence number was left unchanged.
Busy,
/// No reply arrived within the permitted retries or the overall time budget.
Timeout,
/// `PdState::is_offline` reported the device as offline.
Offline,
}
/// Retry policy consulted by `Acu::exchange` after a reply timeout.
#[derive(Debug, Clone, Copy)]
pub struct RetryConfig {
    /// How many times a command is re-sent after the first attempt times out.
    pub max_retries: u8,
    /// Total time allowance for one exchange, in milliseconds; `0` disables the check.
    pub overall_budget_ms: u32,
}

impl Default for RetryConfig {
    /// Two retries and no overall time budget.
    fn default() -> Self {
        let max_retries = 2;
        let overall_budget_ms = 0;
        RetryConfig {
            max_retries,
            overall_budget_ms,
        }
    }
}
/// Bus controller that writes commands to a `Transport`, reads replies back
/// from it, and uses a `Clock` for reply timeouts and liveness tracking.
pub struct Acu<T: Transport, C: Clock> {
// Byte-level link used to write commands and read replies.
transport: T,
// Millisecond time source.
clock: C,
/// Upper bound, in milliseconds, on how long a receive waits for a reply.
/// Initialised from `crate::REPLY_DELAY_MS`.
pub reply_delay_ms: u32,
/// Retry policy applied by `exchange`.
pub retry: RetryConfig,
// Bytes already read from the transport but not yet consumed as a packet.
rx_buf: Vec<u8>,
}
impl<T: Transport, C: Clock> Acu<T, C> {
/// Creates a controller around `transport` and `clock`.
///
/// The reply delay starts at `crate::REPLY_DELAY_MS`, the retry policy at
/// `RetryConfig::default()`, and the receive buffer is pre-sized to
/// `crate::MAX_BUS_PACKET` bytes.
pub fn new(transport: T, clock: C) -> Self {
    let reply_delay_ms = crate::REPLY_DELAY_MS;
    let retry = RetryConfig::default();
    let rx_buf = Vec::with_capacity(crate::MAX_BUS_PACKET);
    Self {
        transport,
        clock,
        reply_delay_ms,
        retry,
        rx_buf,
    }
}
pub fn transport(&mut self) -> &mut T {
&mut self.transport
}
pub fn clock(&self) -> &C {
&self.clock
}
/// Encodes `command` for the device at `pd_addr` and writes it to the transport.
///
/// The packet uses the CRC setting and the current sequence number stored in
/// `pd`; `pd` itself is not modified here.
///
/// Returns the bytes that were written.
///
/// # Errors
/// Propagates any error from address validation, command encoding, packet
/// encoding, or the transport write.
pub fn send_to(
    &mut self,
    pd_addr: u8,
    pd: &mut PdState,
    command: &Command,
) -> Result<Vec<u8>, Error> {
    let (crc_enabled, current_sqn) = (pd.use_crc, pd.next_sqn);
    self.send_with_sqn(pd_addr, crc_enabled, current_sqn, command)
}
/// Builds a plain packet for `command` with an explicit sequence number and
/// writes it to the transport.
///
/// `use_crc` controls whether the `USE_CRC` control flag is set.
/// Returns the encoded bytes after they have been written.
///
/// # Errors
/// Fails if `pd_addr` is rejected by `Address::pd`, if the command data or the
/// packet cannot be encoded, or if the transport write fails.
fn send_with_sqn(
    &mut self,
    pd_addr: u8,
    use_crc: bool,
    sqn: Sqn,
    command: &Command,
) -> Result<Vec<u8>, Error> {
    // Validate the address first so a bad address wins over any encoding error.
    let destination = Address::pd(pd_addr)?;
    let flags = if use_crc {
        CtrlFlags::USE_CRC
    } else {
        CtrlFlags::empty()
    };
    let control = ControlByte::new(sqn, flags);
    let payload = command.encode_data()?;
    let code_byte = command.code().as_byte();
    let builder = PacketBuilder::plain(destination, control, code_byte, payload);
    let frame = builder.encode()?;
    self.transport.write_all(&frame)?;
    Ok(frame)
}
/// Waits for one reply packet and decodes it.
///
/// As soon as a complete packet is parsed, `pd` is marked as seen at the
/// current clock reading. The sequence number in `pd` is then advanced,
/// except when the reply decodes to `Reply::Busy`. This matches `exchange`,
/// which keeps the sequence number on a busy reply so the same command can
/// be sent again.
///
/// # Errors
/// * `Error::Timeout` when `reply_delay_ms` has elapsed without a full packet,
///   or after four consecutive reads that returned no bytes.
/// * Any error from the transport read, packet parsing, or `Reply::decode`.
pub fn receive(&mut self, pd: &mut PdState) -> Result<Reply, Error> {
    let start = self.clock.now_ms();
    let mut empty_reads = 0u8;
    loop {
        if let Some((reply_code, data)) = self.try_parse_packet()? {
            let now = self.clock.now_ms();
            pd.mark_seen(now);
            let decoded = Reply::decode(reply_code, &data);
            // A busy reply means the command still has to be repeated with the
            // same sequence number, so only advance it for anything else.
            if !matches!(decoded, Ok(Reply::Busy(_))) {
                pd.bump_sqn();
            }
            return decoded;
        }
        let mut tmp = [0u8; 64];
        let n = self.transport.read(&mut tmp)?;
        if n > 0 {
            self.rx_buf.extend_from_slice(&tmp[..n]);
            empty_reads = 0;
            continue;
        }
        if self.clock.now_ms().saturating_sub(start) >= self.reply_delay_ms as u64 {
            return Err(Error::Timeout);
        }
        // Bound the number of back-to-back empty reads so the loop ends even
        // when the clock does not advance between polls.
        empty_reads = empty_reads.saturating_add(1);
        if empty_reads >= 4 {
            return Err(Error::Timeout);
        }
    }
}
/// Sends `command` to the device at `pd_addr` and waits for its reply,
/// re-sending with the same sequence number after each timeout as allowed by
/// `self.retry`.
///
/// Outcomes:
/// * `Offline` — `pd.is_offline` was true on entry (nothing is sent in that
///   case) or became true after a timeout.
/// * `Reply` — a non-busy reply was decoded; `pd` is marked seen and its
///   sequence number advanced.
/// * `Busy` — the device replied busy; `pd` is marked seen but the sequence
///   number is kept.
/// * `Timeout` — more than `max_retries` timeouts occurred, or the non-zero
///   `overall_budget_ms` was used up.
///
/// # Errors
/// Any send, transport, parse, or decode error other than `Error::Timeout`
/// is returned unchanged and ends the exchange.
///
/// NOTE(review): bytes left in the receive buffer by an earlier call are not
/// discarded before sending, and the reply's sequence number is not checked.
/// Confirm callers cannot observe a stale reply here.
pub fn exchange(
    &mut self,
    pd_addr: u8,
    pd: &mut PdState,
    command: &Command,
) -> Result<ExchangeOutcome, Error> {
    let started = self.clock.now_ms();
    if pd.is_offline(started) {
        return Ok(ExchangeOutcome::Offline);
    }
    let sqn = pd.next_sqn;
    // Count timeouts in a wider type than `max_retries`: with a `u8` counter
    // and `max_retries == 255` the increment overflowed (panic in debug
    // builds, wrap-around and endless retries in release builds).
    let max_retries = u16::from(self.retry.max_retries);
    let budget_ms = u64::from(self.retry.overall_budget_ms);
    let mut timeouts: u16 = 0;
    loop {
        self.send_with_sqn(pd_addr, pd.use_crc, sqn, command)?;
        match self.recv_one_with_sqn(sqn) {
            Ok(reply) => {
                let now = self.clock.now_ms();
                pd.mark_seen(now);
                if matches!(reply, Reply::Busy(_)) {
                    // Keep the sequence number so the command can be repeated.
                    return Ok(ExchangeOutcome::Busy);
                }
                pd.bump_sqn();
                return Ok(ExchangeOutcome::Reply(reply));
            }
            Err(Error::Timeout) => {
                timeouts += 1;
                let now = self.clock.now_ms();
                if pd.is_offline(now) {
                    return Ok(ExchangeOutcome::Offline);
                }
                if timeouts > max_retries {
                    return Ok(ExchangeOutcome::Timeout);
                }
                // A budget of zero means "no overall limit".
                if budget_ms != 0 && now.saturating_sub(started) >= budget_ms {
                    return Ok(ExchangeOutcome::Timeout);
                }
            }
            Err(other) => return Err(other),
        }
    }
}
/// Waits for one complete packet and returns its decoded reply, without
/// touching any `PdState`.
///
/// NOTE(review): `_expected_sqn` is currently unused, so the reply's sequence
/// number is not compared with the one that was sent.
///
/// # Errors
/// * `Error::Timeout` once `reply_delay_ms` has elapsed with no full packet,
///   or after four consecutive reads that produced no bytes.
/// * Any error from the transport read, packet parsing, or `Reply::decode`.
fn recv_one_with_sqn(&mut self, _expected_sqn: Sqn) -> Result<Reply, Error> {
    let wait_limit_ms = u64::from(self.reply_delay_ms);
    let began = self.clock.now_ms();
    let mut idle_polls: u8 = 0;
    loop {
        // Drain anything already buffered before touching the transport again.
        if let Some((code, payload)) = self.try_parse_packet()? {
            return Reply::decode(code, &payload);
        }
        let mut chunk = [0u8; 64];
        match self.transport.read(&mut chunk)? {
            0 => {
                let waited_ms = self.clock.now_ms().saturating_sub(began);
                idle_polls = idle_polls.saturating_add(1);
                if waited_ms >= wait_limit_ms || idle_polls >= 4 {
                    return Err(Error::Timeout);
                }
            }
            received => {
                self.rx_buf.extend_from_slice(&chunk[..received]);
                idle_polls = 0;
            }
        }
    }
}
fn try_parse_packet(&mut self) -> Result<Option<(ReplyCode, Vec<u8>)>, Error> {
while let Some(som_pos) = self.rx_buf.iter().position(|&b| b == crate::SOM) {
self.rx_buf.drain(..som_pos);
match ParsedPacket::parse(&self.rx_buf) {
Ok((parsed, used)) => {
let code = ReplyCode::from_byte(parsed.code)?;
let data = parsed.data.to_vec();
self.rx_buf.drain(..used);
return Ok(Some((code, data)));
}
Err(Error::Truncated { .. }) => return Ok(None),
Err(Error::BadSom(_)) => {
self.rx_buf.remove(0);
continue;
}
Err(Error::BadCrc { .. }) | Err(Error::BadChecksum { .. }) => {
self.rx_buf.remove(0);
continue;
}
Err(other) => return Err(other),
}
}
Ok(None)
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::clock::MockClock;
    use crate::command::Poll;
    use crate::transport::VecTransport;

    /// Device address used by every test in this module.
    const PD_ADDR: u8 = 0x05;

    /// Retry policy with no retries and no overall time budget.
    const SINGLE_ATTEMPT: RetryConfig = RetryConfig {
        max_retries: 0,
        overall_budget_ms: 0,
    };

    /// A poll command must start with the start-of-message byte, carry the
    /// device address next, and have `0x04` in the low nibble of byte 4.
    #[test]
    fn send_poll_emits_correct_bytes() {
        let mut acu = Acu::new(VecTransport::new(), MockClock::new());
        let mut pd = PdState::default();
        let frame = acu
            .send_to(PD_ADDR, &mut pd, &Command::Poll(Poll))
            .unwrap();
        assert_eq!(frame[0], crate::SOM);
        assert_eq!(frame[1], PD_ADDR);
        // NOTE(review): byte 4 is presumably the control byte (sequence 0 with
        // the CRC flag set) — confirm against the packet layout.
        assert_eq!(frame[4] & 0x0F, 0x04);
    }

    /// A device last seen longer ago than the offline threshold is reported
    /// as offline.
    #[test]
    fn exchange_offline_when_silent() {
        let clock = MockClock::new();
        let mut acu = Acu::new(VecTransport::new(), clock.clone());
        acu.retry = SINGLE_ATTEMPT;
        let mut pd = PdState::default();
        pd.mark_seen(0);
        let past_threshold = crate::OFFLINE_THRESHOLD_MS as u64 + 1;
        clock.set(past_threshold);
        let outcome = acu
            .exchange(PD_ADDR, &mut pd, &Command::Poll(Poll))
            .unwrap();
        assert_eq!(outcome, ExchangeOutcome::Offline);
    }

    /// With retries disabled, a single unanswered command yields `Timeout`.
    #[test]
    fn timeout_then_no_more_retries_returns_timeout() {
        let clock = MockClock::new();
        let mut acu = Acu::new(VecTransport::new(), clock.clone());
        acu.retry = SINGLE_ATTEMPT;
        let mut pd = PdState::default();
        pd.mark_seen(0);
        let just_after_reply_delay = crate::REPLY_DELAY_MS as u64 + 1;
        clock.set(just_after_reply_delay);
        let outcome = acu
            .exchange(PD_ADDR, &mut pd, &Command::Poll(Poll))
            .unwrap();
        assert_eq!(outcome, ExchangeOutcome::Timeout);
    }
}