use std::collections::HashMap;
use std::time::{Duration, SystemTime};
use arrayvec::ArrayString;
use crate::frame::AcarsMessage;
/// Maximum age of a pending bucket before it is given up on.
///
/// A bucket whose `first_seen` is strictly older than `now - REASSEMBLY_TIMEOUT`
/// is removed by the timeout sweep and emitted as a partial message.
pub const REASSEMBLY_TIMEOUT: Duration = Duration::from_secs(30);
/// Maximum number of distinct buckets held at once.
///
/// When a block would open a new bucket while this many are already pending,
/// the bucket with the oldest `first_seen` is evicted (dropped, not emitted)
/// to make room.
pub const MAX_PENDING_MESSAGES: usize = 256;
/// Identifies which in-flight message a block belongs to.
///
/// Built by `build_key` from a message's `aircraft` and `message_no` fields;
/// blocks with equal keys are collected into the same pending bucket.
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
struct ReassemblyKey {
/// Aircraft identifier copied from the block (never empty in a built key).
aircraft: ArrayString<8>,
/// Message number copied from the block (never empty in a built key).
message_no: ArrayString<5>,
}
/// One bucket of parked (non-final) blocks waiting for the end-of-message block.
#[derive(Debug)]
struct PendingMessage {
/// The `now` value passed to `observe` when the bucket was created; used for
/// timeout and eviction decisions (not the message's own timestamp).
first_seen: SystemTime,
/// Blocks parked so far, in arrival order; sorted by `block_id` only when merged.
blocks: Vec<AcarsMessage>,
}
/// Reassembles multi-block messages from individually received blocks.
///
/// Blocks are fed in through `observe`; complete, timed-out, or flushed
/// messages come back out as merged `AcarsMessage` values.
#[derive(Debug, Default)]
pub struct MessageAssembler {
/// Buckets of parked blocks, keyed by (aircraft, message number).
pending: HashMap<ReassemblyKey, PendingMessage>,
}
impl MessageAssembler {
#[must_use]
pub fn new() -> Self {
Self::default()
}
#[cfg(test)]
#[must_use]
fn pending_count(&self) -> usize {
self.pending.len()
}
pub fn observe(&mut self, msg: AcarsMessage, now: SystemTime) -> Vec<AcarsMessage> {
let mut out = self.sweep_timeouts(now);
let Some(key) = build_key(&msg) else {
out.push(msg);
return out;
};
if msg.end_of_message {
if let Some(pending) = self.pending.remove(&key) {
out.push(combine(pending.blocks, msg));
} else {
out.push(msg);
}
return out;
}
if let Some(pending) = self.pending.get_mut(&key) {
pending.blocks.push(msg);
} else {
if self.pending.len() >= MAX_PENDING_MESSAGES {
self.evict_oldest_pending();
}
self.pending.insert(
key,
PendingMessage {
first_seen: now,
blocks: vec![msg],
},
);
}
out
}
pub fn flush(&mut self) -> Vec<AcarsMessage> {
self.pending
.drain()
.filter_map(|(_, pending)| combine_partial(pending.blocks))
.collect()
}
pub fn drain_timeouts(&mut self, now: SystemTime) -> Vec<AcarsMessage> {
self.sweep_timeouts(now)
}
fn sweep_timeouts(&mut self, now: SystemTime) -> Vec<AcarsMessage> {
let cutoff = now.checked_sub(REASSEMBLY_TIMEOUT);
let Some(cutoff) = cutoff else {
return Vec::new();
};
let stale_keys: Vec<ReassemblyKey> = self
.pending
.iter()
.filter(|(_, p)| p.first_seen < cutoff)
.map(|(k, _)| k.clone())
.collect();
let mut out = Vec::with_capacity(stale_keys.len());
let mut entries: Vec<(SystemTime, ReassemblyKey, AcarsMessage)> = stale_keys
.into_iter()
.filter_map(|key| {
self.pending
.remove(&key)
.and_then(|p| combine_partial(p.blocks).map(|m| (p.first_seen, key, m)))
})
.collect();
entries.sort_by(|(ta, ka, _), (tb, kb, _)| {
ta.cmp(tb)
.then_with(|| ka.aircraft.cmp(&kb.aircraft))
.then_with(|| ka.message_no.cmp(&kb.message_no))
});
out.extend(entries.into_iter().map(|(_, _, m)| m));
out
}
fn evict_oldest_pending(&mut self) {
let oldest = self
.pending
.iter()
.min_by_key(|(_, p)| p.first_seen)
.map(|(k, _)| k.clone());
if let Some(key) = oldest {
tracing::debug!(
?key,
"ACARS reassembly: evicting oldest pending bucket (cap reached)"
);
self.pending.remove(&key);
}
}
}
/// Derives the reassembly key for `msg`.
///
/// Returns `None` when the message has no message number, or when either the
/// aircraft field or the message number is empty; such a block cannot be
/// matched with others.
fn build_key(msg: &AcarsMessage) -> Option<ReassemblyKey> {
    let message_no = msg.message_no.filter(|number| !number.is_empty())?;
    if msg.aircraft.is_empty() {
        None
    } else {
        Some(ReassemblyKey {
            aircraft: msg.aircraft,
            message_no,
        })
    }
}
/// Merges the parked blocks of a bucket with the end-of-message block `etx`
/// into one complete message.
///
/// Parked blocks are ordered by `block_id`; `etx` is always folded in last,
/// regardless of its own `block_id`, because it is the block that terminates
/// the message. This guarantees the merged message carries the ETX block's
/// metadata and `end_of_message` flag even when a parked block has a higher
/// `block_id` than `etx`.
///
/// Result fields:
/// - `text`: the concatenation of all block texts in the order above;
/// - `reassembled_block_count`: the number of blocks merged, saturating at `u8::MAX`;
/// - `flight_id` / `message_no`: the latest block that has a value wins;
/// - the fields assigned in the loop below: taken from `etx`;
/// - any remaining field (for example `aircraft`, `parsed`): taken from the
///   lowest-numbered parked block.
///
/// With no parked blocks, `etx` is returned with `reassembled_block_count` set to 1.
fn combine(mut pending: Vec<AcarsMessage>, etx: AcarsMessage) -> AcarsMessage {
    pending.sort_by_key(|m| m.block_id);
    // +1 accounts for the ETX block, which is not part of `pending`.
    let block_count: u8 = u8::try_from(pending.len().saturating_add(1)).unwrap_or(u8::MAX);

    let mut parked = pending.into_iter();
    let Some(mut out) = parked.next() else {
        return AcarsMessage {
            reassembled_block_count: 1,
            ..etx
        };
    };

    // Move the text out of the accumulator instead of cloning the whole block.
    let mut combined = std::mem::take(&mut out.text);
    for next in parked.chain(std::iter::once(etx)) {
        out.timestamp = next.timestamp;
        out.channel_idx = next.channel_idx;
        out.freq_hz = next.freq_hz;
        out.level_db = next.level_db;
        out.error_count = next.error_count;
        out.mode = next.mode;
        out.label = next.label;
        out.block_id = next.block_id;
        out.ack = next.ack;
        out.flight_id = next.flight_id.or(out.flight_id);
        out.message_no = next.message_no.or(out.message_no);
        out.end_of_message = next.end_of_message;
        combined.push_str(&next.text);
    }
    out.text = combined;
    out.reassembled_block_count = block_count;
    out
}
/// Merges parked blocks into a partial message when no end-of-message block
/// arrived (timeout or flush).
///
/// Blocks are ordered by `block_id` and their texts concatenated. The fields
/// assigned in the loop below come from the highest-numbered block; for
/// `flight_id` / `message_no` the latest block that has a value wins. Every
/// other field, including `end_of_message`, stays as in the lowest-numbered block.
/// `reassembled_block_count` is the number of blocks, saturating at `u8::MAX`.
///
/// Returns `None` when `pending` is empty.
fn combine_partial(mut pending: Vec<AcarsMessage>) -> Option<AcarsMessage> {
    pending.sort_by_key(|m| m.block_id);
    let block_count: u8 = u8::try_from(pending.len()).unwrap_or(u8::MAX);

    let mut blocks = pending.into_iter();
    let mut out = blocks.next()?;
    // Move the text out of the accumulator instead of cloning the whole block.
    let mut combined = std::mem::take(&mut out.text);
    for next in blocks {
        out.timestamp = next.timestamp;
        out.channel_idx = next.channel_idx;
        out.freq_hz = next.freq_hz;
        out.level_db = next.level_db;
        out.error_count = next.error_count;
        out.mode = next.mode;
        out.label = next.label;
        out.block_id = next.block_id;
        out.ack = next.ack;
        out.flight_id = next.flight_id.or(out.flight_id);
        out.message_no = next.message_no.or(out.message_no);
        combined.push_str(&next.text);
    }
    out.text = combined;
    out.reassembled_block_count = block_count;
    // Only non-final blocks are ever parked, so a partial must not claim completion.
    debug_assert!(!out.end_of_message);
    Some(out)
}
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::similar_names, clippy::panic)]
mod tests {
    use super::*;
    use std::time::SystemTime;

    /// Builds a block fixture; `etx` sets `end_of_message`, everything not
    /// passed in gets a fixed placeholder value.
    fn make_msg(
        aircraft: &str,
        message_no: &str,
        block_id: u8,
        etx: bool,
        text: &str,
    ) -> AcarsMessage {
        AcarsMessage {
            timestamp: SystemTime::UNIX_EPOCH,
            channel_idx: 0,
            freq_hz: 131_550_000.0,
            level_db: 0.0,
            error_count: 0,
            mode: b'2',
            label: *b"H1",
            block_id,
            ack: 0x15,
            aircraft: ArrayString::from(aircraft)
                .expect("test fixture aircraft fits ArrayString<8>"),
            flight_id: None,
            message_no: Some(
                ArrayString::from(message_no).expect("test fixture message_no fits ArrayString<5>"),
            ),
            text: text.to_string(),
            end_of_message: etx,
            reassembled_block_count: 1,
            parsed: None,
        }
    }

    /// A lone end-of-message block is emitted immediately and unchanged.
    #[test]
    fn passthrough_for_single_block_etx() {
        let mut a = MessageAssembler::new();
        let msg = make_msg(".N12345", "M01A", 1, true, "PART1");
        let out = a.observe(msg, SystemTime::UNIX_EPOCH);
        assert_eq!(out.len(), 1);
        assert_eq!(out[0].text, "PART1");
        assert_eq!(out[0].reassembled_block_count, 1);
        assert!(out[0].end_of_message);
        assert_eq!(a.pending_count(), 0);
    }

    /// Without a message number no key can be built, so even a non-final
    /// block passes straight through instead of being parked.
    #[test]
    fn passthrough_when_message_no_missing() {
        let mut a = MessageAssembler::new();
        let mut msg = make_msg(".N12345", "M01A", 1, false, "X");
        msg.message_no = None;
        let out = a.observe(msg, SystemTime::UNIX_EPOCH);
        assert_eq!(out.len(), 1);
        assert_eq!(a.pending_count(), 0);
    }

    /// The basic two-block case: the first block is parked, the final block
    /// releases the merged message.
    #[test]
    fn etb_then_etx_reassembles_in_order() {
        let mut a = MessageAssembler::new();
        let etb = make_msg(".N12345", "M01A", 1, false, "FIRST_HALF");
        let etx = make_msg(".N12345", "M01A", 2, true, "_SECOND_HALF");
        let now = SystemTime::UNIX_EPOCH;
        let out1 = a.observe(etb, now);
        assert_eq!(out1.len(), 0, "ETB parked, no emission yet");
        assert_eq!(a.pending_count(), 1);
        let out2 = a.observe(etx, now);
        assert_eq!(out2.len(), 1);
        let merged = &out2[0];
        assert_eq!(merged.text, "FIRST_HALF_SECOND_HALF");
        assert_eq!(merged.reassembled_block_count, 2);
        assert!(merged.end_of_message);
        assert_eq!(merged.block_id, 2, "block_id from final ETX");
        assert_eq!(a.pending_count(), 0);
    }

    /// Parked blocks that arrive out of order are merged in `block_id` order.
    #[test]
    fn out_of_order_blocks_sort_by_block_id() {
        let mut a = MessageAssembler::new();
        let now = SystemTime::UNIX_EPOCH;
        // Deliver block 2 before block 1 so arrival order differs from block order.
        let out_mid = a.observe(make_msg(".N12345", "M02B", 2, false, "MIDDLE_"), now);
        assert_eq!(out_mid.len(), 0);
        let out_first = a.observe(make_msg(".N12345", "M02B", 1, false, "FIRST_"), now);
        assert_eq!(out_first.len(), 0);
        assert_eq!(a.pending_count(), 1, "both ETBs share one bucket");
        let out = a.observe(make_msg(".N12345", "M02B", 3, true, "LAST"), now);
        assert_eq!(out.len(), 1);
        assert_eq!(out[0].text, "FIRST_MIDDLE_LAST");
        assert_eq!(out[0].reassembled_block_count, 3);
        assert!(out[0].end_of_message);
        assert_eq!(a.pending_count(), 0);
    }

    /// A final block with nothing parked is emitted alone; a block for the
    /// same key arriving afterwards is parked as a new bucket.
    #[test]
    fn out_of_order_etx_before_etb() {
        let mut a = MessageAssembler::new();
        let now = SystemTime::UNIX_EPOCH;
        let etx = make_msg(".N12345", "M03C", 3, true, "FINAL");
        let out_etx = a.observe(etx, now);
        assert_eq!(out_etx.len(), 1, "ETX emits even without prior ETBs");
        assert_eq!(out_etx[0].text, "FINAL");
        assert_eq!(out_etx[0].reassembled_block_count, 1);
        let etb = make_msg(".N12345", "M03C", 1, false, "EARLY");
        let out_etb = a.observe(etb, now);
        assert_eq!(out_etb.len(), 0, "late ETB parked, will time out");
        assert_eq!(a.pending_count(), 1);
    }

    /// A bucket older than the timeout is emitted as a partial, ahead of the
    /// message produced by the block that triggered the sweep.
    #[test]
    fn timeout_emits_partial_reassembly() {
        let mut a = MessageAssembler::new();
        let t0 = SystemTime::UNIX_EPOCH;
        let etb = make_msg(".N12345", "M04D", 1, false, "ONLY_BLOCK");
        let _ = a.observe(etb, t0);
        assert_eq!(a.pending_count(), 1);
        let later = t0 + REASSEMBLY_TIMEOUT + Duration::from_secs(1);
        let new = make_msg(".N99999", "M99Z", 1, true, "OTHER");
        let out = a.observe(new, later);
        assert_eq!(out.len(), 2);
        assert_eq!(out[0].text, "ONLY_BLOCK");
        assert_eq!(out[0].reassembled_block_count, 1);
        assert!(!out[0].end_of_message, "partial keeps ETB flag");
        assert_eq!(out[1].text, "OTHER");
        assert_eq!(a.pending_count(), 0);
    }

    /// `flush` empties every bucket and merges each one into a partial.
    #[test]
    fn flush_drains_all_pending() {
        let mut a = MessageAssembler::new();
        let now = SystemTime::UNIX_EPOCH;
        let _ = a.observe(make_msg(".A", "M1", 1, false, "AAA"), now);
        let _ = a.observe(make_msg(".B", "M2", 1, false, "BBB"), now);
        let _ = a.observe(make_msg(".A", "M1", 2, false, "_more"), now);
        assert_eq!(a.pending_count(), 2);
        let mut flushed = a.flush();
        // Sort so the assertions do not depend on the order `flush` returns.
        flushed.sort_by(|x, y| x.aircraft.as_str().cmp(y.aircraft.as_str()));
        assert_eq!(flushed.len(), 2);
        assert_eq!(flushed[0].aircraft.as_str(), ".A");
        assert_eq!(flushed[0].text, "AAA_more");
        assert_eq!(flushed[0].reassembled_block_count, 2);
        assert!(!flushed[0].end_of_message);
        assert_eq!(flushed[1].aircraft.as_str(), ".B");
        assert_eq!(flushed[1].text, "BBB");
        assert_eq!(flushed[1].reassembled_block_count, 1);
        assert_eq!(a.pending_count(), 0);
    }

    /// Opening one bucket more than the cap evicts the oldest bucket, whose
    /// parked text is then no longer available to a later final block.
    #[test]
    fn cap_evicts_oldest_pending() {
        let mut a = MessageAssembler::new();
        let base = SystemTime::UNIX_EPOCH;
        // `0..=MAX` opens MAX + 1 distinct buckets, each 1 ms newer than the last.
        for i in 0..=MAX_PENDING_MESSAGES {
            let aircraft = format!(".N{i:05}");
            let msg_no = format!("M{i:03}");
            let m = make_msg(&aircraft, &msg_no, 1, false, "X");
            let now = base + Duration::from_millis(u64::try_from(i).expect("loop bound fits u64"));
            let _ = a.observe(m, now);
        }
        assert_eq!(a.pending_count(), MAX_PENDING_MESSAGES);
        let etx = make_msg(".N00000", "M000", 2, true, "_etx");
        let out = a.observe(etx, base);
        assert_eq!(out.len(), 1);
        assert_eq!(out[0].text, "_etx", "evicted ETB body did NOT come through");
    }

    /// `drain_timeouts` removes only buckets strictly older than the timeout;
    /// a bucket exactly `REASSEMBLY_TIMEOUT` old stays.
    #[test]
    fn drain_timeouts_emits_only_stale_buckets() {
        let mut a = MessageAssembler::new();
        let t0 = SystemTime::UNIX_EPOCH;
        let _ = a.observe(make_msg(".A", "M1", 1, false, "AAA"), t0);
        let _ = a.observe(
            make_msg(".B", "M2", 1, false, "BBB"),
            t0 + REASSEMBLY_TIMEOUT,
        );
        let later = t0 + REASSEMBLY_TIMEOUT + REASSEMBLY_TIMEOUT;
        let drained = a.drain_timeouts(later);
        assert_eq!(drained.len(), 1);
        assert_eq!(drained[0].aircraft.as_str(), ".A");
        assert_eq!(drained[0].text, "AAA");
        assert!(!drained[0].end_of_message, "partial keeps ETB flag");
        assert_eq!(a.pending_count(), 1, "fresh .B bucket survives");
    }

    /// Bucket age is measured from the `now` passed to `observe`: a block
    /// timestamped an hour before `now` is still fresh half a timeout later.
    #[test]
    fn first_seen_uses_observation_clock_not_msg_timestamp() {
        const ONE_HOUR_SECS: u64 = 60 * 60;
        let mut a = MessageAssembler::new();
        let now = SystemTime::UNIX_EPOCH + Duration::from_secs(ONE_HOUR_SECS);
        let mut etb = make_msg(".A", "M1", 1, false, "X");
        etb.timestamp = SystemTime::UNIX_EPOCH;
        let _ = a.observe(etb, now);
        assert_eq!(a.pending_count(), 1);
        let drained = a.drain_timeouts(now + REASSEMBLY_TIMEOUT / 2);
        assert_eq!(drained.len(), 0);
        assert_eq!(a.pending_count(), 1);
    }

    /// Two aircraft using the same message number are reassembled separately.
    #[test]
    fn distinct_keys_dont_cross_pollute() {
        let mut a = MessageAssembler::new();
        let now = SystemTime::UNIX_EPOCH;
        let _ = a.observe(make_msg(".A", "M1", 1, false, "AAA"), now);
        let _ = a.observe(make_msg(".B", "M1", 1, false, "BBB"), now);
        let out_a = a.observe(make_msg(".A", "M1", 2, true, "_a_end"), now);
        let out_b = a.observe(make_msg(".B", "M1", 2, true, "_b_end"), now);
        assert_eq!(out_a.len(), 1);
        assert_eq!(out_a[0].text, "AAA_a_end");
        assert_eq!(out_b.len(), 1);
        assert_eq!(out_b[0].text, "BBB_b_end");
    }
}