use std::iter::FromIterator;
#[cfg(not(test))]
use log::debug;
#[cfg(test)]
use std::println as debug;
use crate::{Message, MessageResult};
use super::combiner;
use super::output::TransportState;
use super::timeddata::TimedData;
use super::waveform;
pub(crate) const MAX_MESSAGE_LENGTH: usize = 268;
pub(crate) const MAX_INTERBURST_SYMBOLS: u64 = ((1.05 * waveform::BAUD_HZ) + 17.0 * 8.0) as u64;
pub(crate) const MAX_HISTORY_DURATION: u64 =
2 * (MAX_INTERBURST_SYMBOLS + 8 * MAX_MESSAGE_LENGTH as u64);
pub(crate) type Burst = arrayvec::ArrayVec<u8, MAX_MESSAGE_LENGTH>;
#[derive(Clone, Debug)]
pub struct Assembler {
history: BurstHistory,
state: PendingResult,
previous: Option<PreviousMessage>,
}
impl Assembler {
pub fn new() -> Self {
Self {
history: BurstHistory::with_capacity(3),
state: PendingResult::default(),
previous: Option::default(),
}
}
pub fn reset(&mut self) {
self.history.clear();
self.state = PendingResult::default();
self.previous = Option::default();
}
pub fn assemble<B>(&mut self, burst: B, symbol_count: u64) -> TransportState
where
B: AsRef<[u8]>,
{
let burst = burst.as_ref();
if burst.is_empty() {
return self.idle(symbol_count);
}
prune_history(&mut self.history, symbol_count);
prune_previous(&mut self.previous, symbol_count);
self.history.push_back(TimedData::with_deadline(
Burst::from_iter(
burst[0..usize::min(burst.len(), MAX_MESSAGE_LENGTH)]
.iter()
.copied(),
),
symbol_count + MAX_HISTORY_DURATION,
));
if let Some(msg) = self.deduplicate(combiner::combine(self.bursts())) {
self.state.accept(msg, symbol_count);
}
self.idle(symbol_count)
}
pub fn idle(&mut self, symbol_count: u64) -> TransportState {
prune_history(&mut self.history, symbol_count);
match self.state.poll(symbol_count) {
Some(Ok(msg)) => {
debug!(
"assembler: ready ({} voting, {} errors): \"{}\"",
msg.voting_byte_count(),
msg.parity_error_count(),
msg
);
self.previous = Some(TimedData::with_deadline(
msg.clone(),
symbol_count + MAX_HISTORY_DURATION,
));
TransportState::Message(Ok(msg))
}
Some(Err(err)) => {
debug!("assembler: gave up on bad decode: {}", err);
TransportState::Message(Err(err))
}
_ => {
if self.history.is_empty() {
TransportState::Idle
} else {
TransportState::Assembling
}
}
}
}
#[inline]
fn bursts(&self) -> impl ExactSizeIterator<Item = &[u8]> {
self.history.iter().map(|td| td.data.as_slice())
}
fn deduplicate(&self, res: Option<MessageResult>) -> Option<MessageResult> {
let res = res?;
match res {
Ok(msg) if self.is_not_duplicate(&msg) => Some(Ok(msg)),
Ok(msg) => {
debug!("assembler: suppressed duplicate message \"{}\"", msg);
None
}
_ => Some(res),
}
}
fn is_not_duplicate(&self, other: &Message) -> bool {
if let Some(prev) = &self.previous {
prev.as_ref().as_str() != other.as_str()
} else {
true
}
}
}
impl Default for Assembler {
fn default() -> Self {
Self::new()
}
}
#[derive(Clone, Debug, PartialEq, Eq)]
enum PendingResult {
Empty,
Pending(TimedData<MessageResult>),
}
impl PendingResult {
pub fn accept(&mut self, msg: MessageResult, now: u64) -> bool {
let new = match msg {
Ok(Message::EndOfMessage) => TimedData::with_deadline(msg, now),
_ => TimedData::with_deadline(msg, now + MAX_INTERBURST_SYMBOLS),
};
if let PendingResult::Pending(old) = self {
let replace_with_new_msg = match (&old.data, &new.data) {
(Err(_), _) => true,
(Ok(Message::EndOfMessage), Ok(Message::StartOfMessage(_m))) => true,
(Ok(Message::StartOfMessage(old)), Ok(Message::StartOfMessage(new))) => {
new.voting_byte_count() >= old.voting_byte_count()
}
_ => false,
};
if replace_with_new_msg {
debug!(
"assembler: replacing pending \"{:?}\" with \"{:?}\"",
old.data, new.data
);
*old = new;
true
} else {
false
}
} else {
debug!("assembler: pending: \"{:?}\"", new);
*self = PendingResult::Pending(new);
true
}
}
pub fn poll(&mut self, now: u64) -> Option<MessageResult> {
match self {
Self::Pending(timed_result) if timed_result.is_expired_at(now) => {
let out = Some(timed_result.data.clone());
*self = Self::Empty;
out
}
_ => None,
}
}
}
impl Default for PendingResult {
fn default() -> Self {
Self::Empty
}
}
type BurstHistory = std::collections::VecDeque<TimedData<Burst>>;
type PreviousMessage = TimedData<Message>;
#[inline]
fn prune_history(history: &mut BurstHistory, symbol_count: u64) {
history.retain(|entry| !entry.is_expired_at(symbol_count));
while history.len() > 2 {
drop(history.pop_front());
}
}
#[inline]
fn prune_previous(previous: &mut Option<PreviousMessage>, symbol_count: u64) {
match previous {
Some(msg) if msg.is_expired_at(symbol_count) => *previous = None,
_ => {}
}
}
#[cfg(test)]
mod tests {
use std::convert::TryFrom;
use super::super::waveform;
use super::*;
use crate::MessageDecodeErr;
const ONE_SECOND: u64 = waveform::BAUD_HZ as u64;
const BURST_TIMEOUT: u64 = (1.31 * waveform::BAUD_HZ) as u64;
const ALMOST_TIMEOUT: u64 = (1.2 * waveform::BAUD_HZ) as u64;
const TEST_EOM: &[u8] = "NNNN".as_bytes();
const TEST_MSG_GOOD: &[u8] = "ZCZC-EAS-DMO-999000+0015-0011122-NOCALL00-".as_bytes();
const TEST_MSG_ERRS: &[u8] = "ZCZK-EAS-DMF-999!00+0015-0011122-NOCALL00-KXYZ".as_bytes();
const TEST_MSG_LONGEST: &[u8] = "ZCZC-EAS-DMO-372088-091724-919623-645687-745748-175234-039940-955869-091611-304171-931612-334828-179485-569615-809223-830187-611340-014693-472885-084645-977764-466883-406863-390018-701741-058097-752790-311648-820127-255900-581947+0000-0001122-NOCALL00-".as_bytes();
fn history_times(history: &BurstHistory) -> Vec<u64> {
history.iter().map(|elem| elem.deadline).collect()
}
fn simulate_bursts<'a, I>(
time: &'a mut u64,
src: I,
) -> impl Iterator<Item = (usize, (u64, &'a [u8]))>
where
I: IntoIterator<Item = &'a (u64, &'a [u8])>,
{
src.into_iter()
.map(move |(delay, data)| {
*time = *time + 8 * data.len() as u64 + delay;
if !data.is_empty() {
*time += 16 * 8; }
(*time, *data)
})
.enumerate()
}
#[test]
fn test_prune_history() {
let mut history = BurstHistory::default();
history.push_back(TimedData::with_deadline(Burst::default(), 1));
history.push_back(TimedData::with_deadline(Burst::default(), 2144));
history.push_back(TimedData::with_deadline(Burst::default(), 3000));
prune_history(&mut history, 0);
assert_eq!(history.len(), 2);
prune_history(&mut history, 2143);
assert_eq!(&[2144u64, 3000], history_times(&history).as_slice());
prune_history(&mut history, 2999);
assert_eq!(&[3000u64], history_times(&history).as_slice());
prune_history(&mut history, 6000);
assert!(history.is_empty());
}
#[test]
fn test_pending_result() {
const NO_ERRORS: &[u8] = &[0u8; TEST_MSG_GOOD.len()];
const VOTING_NONE: &[u8] = &[2u8; TEST_MSG_GOOD.len()];
const VOTING_ALL: &[u8] = &[3u8; TEST_MSG_GOOD.len()];
let test_message_novoting = Message::try_from((TEST_MSG_GOOD, NO_ERRORS, VOTING_NONE));
assert!(test_message_novoting.is_ok());
let test_message_voting = Message::try_from((TEST_MSG_GOOD, NO_ERRORS, VOTING_ALL));
assert!(test_message_voting.is_ok());
let mut uut = PendingResult::default();
assert!(uut.accept(Err(MessageDecodeErr::NotAscii), 0));
assert!(uut.accept(Err(MessageDecodeErr::UnrecognizedPrefix), 0));
assert_eq!(None, uut.poll(0));
assert!(uut.accept(Ok(Message::EndOfMessage), 0));
assert!(!uut.accept(Ok(Message::EndOfMessage), 0));
assert_eq!(Some(Ok(Message::EndOfMessage)), uut.poll(0));
assert_eq!(PendingResult::default(), uut);
assert!(uut.accept(test_message_novoting.clone(), 0));
assert!(uut.accept(test_message_novoting.clone(), 0));
assert_eq!(None, uut.poll(0));
assert!(uut.accept(test_message_voting.clone(), 5650));
assert!(!uut.accept(Ok(Message::EndOfMessage), 5650));
assert!(!uut.accept(Err(MessageDecodeErr::NotAscii), 5650));
assert!(!uut.accept(test_message_novoting.clone(), 5650));
assert_eq!(None, uut.poll(5650));
let out = uut.poll(2 * 5650);
assert_eq!(Some(test_message_voting), out);
}
#[test]
fn test_assembler_deduplicate() {
const FOUR_EOM: &[(u64, &[u8])] = &[
(999 * ONE_SECOND, &[]),
(0, TEST_EOM),
(ONE_SECOND, TEST_EOM),
(ONE_SECOND, TEST_EOM),
(12 * ONE_SECOND, TEST_EOM),
];
let mut assembler = Assembler::default();
let mut time = 0u64;
let mut ok = 0i32;
for (itr, (tm, data)) in simulate_bursts(&mut time, FOUR_EOM.iter()) {
match (itr, assembler.assemble(data, tm), &assembler.state) {
(0, TransportState::Idle, PendingResult::Empty) => {
ok += 1;
}
(1, TransportState::Message(Ok(Message::EndOfMessage)), PendingResult::Empty) => {
ok += 1;
}
(2, TransportState::Assembling, PendingResult::Empty) => {
ok += 1;
}
(3, TransportState::Assembling, PendingResult::Empty) => {
ok += 1;
}
(4, TransportState::Message(Ok(Message::EndOfMessage)), PendingResult::Empty) => {
ok += 1;
}
_ => {
unreachable!()
}
}
}
assert_eq!(ok, 5);
}
#[test]
fn test_assembler_normal_operation() {
const SOM_EOM: &[(u64, &[u8])] = &[
(0, TEST_MSG_GOOD),
(ONE_SECOND, &[]),
(0, TEST_MSG_GOOD),
(ONE_SECOND, &[]),
(0, TEST_MSG_ERRS),
(BURST_TIMEOUT, &[]),
(15 * ONE_SECOND, TEST_EOM),
(ONE_SECOND, TEST_EOM),
(ONE_SECOND, TEST_EOM),
];
let mut assembler = Assembler::default();
let mut time = 0u64;
let mut ok = 0i32;
for (itr, (tm, data)) in simulate_bursts(&mut time, SOM_EOM.iter()) {
let out = assembler.assemble(data, tm);
match (itr, out, &assembler.state) {
(0, TransportState::Assembling, PendingResult::Empty) => {
ok += 1;
}
(1, TransportState::Assembling, PendingResult::Empty) => {
ok += 1;
}
(2, TransportState::Assembling, PendingResult::Pending(_)) => {
ok += 1;
}
(3, TransportState::Assembling, PendingResult::Pending(_)) => {
ok += 1;
}
(4, TransportState::Assembling, PendingResult::Pending(_)) => {
ok += 1;
}
(
5,
TransportState::Message(Ok(Message::StartOfMessage(hdr))),
PendingResult::Empty,
) => {
assert_eq!(hdr.voting_byte_count(), TEST_MSG_GOOD.len());
ok += 1;
}
(6, TransportState::Message(Ok(Message::EndOfMessage)), PendingResult::Empty) => {
ok += 1;
}
(7, TransportState::Assembling, PendingResult::Empty) => {
ok += 1;
}
(8, TransportState::Assembling, PendingResult::Empty) => {
ok += 1;
}
_ => {
unreachable!()
}
}
}
assert_eq!(ok, 9);
}
#[test]
fn test_assembler_very_long_message() {
const LONG_MSGS: &[(u64, &[u8])] = &[
(0, TEST_MSG_LONGEST),
(ALMOST_TIMEOUT, &[]),
(0, TEST_MSG_LONGEST),
(ALMOST_TIMEOUT, &[]),
(0, TEST_MSG_LONGEST),
(BURST_TIMEOUT, &[]),
];
let mut assembler = Assembler::default();
let mut time = 0u64;
let mut ok = 0i32;
for (itr, (tm, data)) in simulate_bursts(&mut time, LONG_MSGS.iter()) {
let out = assembler.assemble(data, tm);
match (itr, out, &assembler.state) {
(0, TransportState::Assembling, PendingResult::Empty) => {
ok += 1;
}
(1, TransportState::Assembling, PendingResult::Empty) => {
ok += 1;
}
(2, TransportState::Assembling, PendingResult::Pending(_)) => {
ok += 1;
}
(3, TransportState::Assembling, PendingResult::Pending(_)) => {
ok += 1;
}
(4, TransportState::Assembling, PendingResult::Pending(_)) => {
ok += 1;
}
(
5,
TransportState::Message(Ok(Message::StartOfMessage(hdr))),
PendingResult::Empty,
) => {
assert_eq!(hdr.voting_byte_count(), TEST_MSG_LONGEST.len());
assert_eq!(hdr.as_str().as_bytes(), TEST_MSG_LONGEST);
ok += 1;
}
_ => unreachable!(),
}
}
assert_eq!(ok, 6);
}
#[test]
fn test_assembler_very_long_message_missing_middle() {
const LONG_MSGS: &[(u64, &[u8])] = &[
(0, TEST_MSG_LONGEST),
(ALMOST_TIMEOUT, &[]),
(268 * 8, &[]),
(ALMOST_TIMEOUT, &[]),
(0, TEST_MSG_LONGEST),
(BURST_TIMEOUT, &[]),
];
let mut assembler = Assembler::default();
let mut time = 0u64;
let mut ok = 0i32;
for (itr, (tm, data)) in simulate_bursts(&mut time, LONG_MSGS.iter()) {
let out = assembler.assemble(data, tm);
match (itr, out, &assembler.state) {
(4, TransportState::Assembling, PendingResult::Pending(_)) => {
ok += 1;
}
(
5,
TransportState::Message(Ok(Message::StartOfMessage(hdr))),
PendingResult::Empty,
) => {
assert_eq!(hdr.voting_byte_count(), 0);
assert_eq!(hdr.as_str().as_bytes(), TEST_MSG_LONGEST);
ok += 1;
}
(_, TransportState::Assembling, PendingResult::Empty) => {
ok += 1;
}
_ => {
unreachable!()
}
}
}
assert_eq!(ok, 6);
}
#[test]
fn test_assembler_quickly_with_missing() {
const TWO_EOM_AND_MSG: &[(u64, &[u8])] = &[
(0, TEST_EOM),
(ONE_SECOND, TEST_EOM),
(ONE_SECOND, TEST_MSG_GOOD),
((1.1 * ONE_SECOND as f32) as u64, TEST_MSG_GOOD),
(BURST_TIMEOUT, &[]),
(ONE_SECOND, TEST_EOM),
(ONE_SECOND, TEST_EOM),
];
let mut assembler = Assembler::default();
let mut time = 0u64;
let mut ok = 0i32;
for (itr, (tm, data)) in simulate_bursts(&mut time, TWO_EOM_AND_MSG.iter()) {
match (itr, assembler.assemble(data, tm), &assembler.state) {
(0, TransportState::Message(Ok(Message::EndOfMessage)), PendingResult::Empty) => {
ok += 1;
}
(1, TransportState::Assembling, PendingResult::Empty) => {
ok += 1;
}
(2, TransportState::Assembling, PendingResult::Empty) => {
ok += 1;
}
(3, TransportState::Assembling, PendingResult::Pending(_x)) => {
ok += 1;
}
(
4,
TransportState::Message(Ok(Message::StartOfMessage(msg))),
PendingResult::Empty,
) => {
assert_eq!(msg.voting_byte_count(), 4);
ok += 1;
}
(5, TransportState::Assembling, PendingResult::Empty) => {
ok += 1;
}
(6, TransportState::Message(Ok(Message::EndOfMessage)), PendingResult::Empty) => {
ok += 1;
}
_ => {
unreachable!()
}
}
}
assert_eq!(ok, 7);
}
}