use std::collections::{BTreeMap, BTreeSet};
use crate::envelope::RemoteEnvelope;
use crate::pdu::AckInfo;
/// A 64-bit message sequence counter.
///
/// The inner value is public; a `Default` instance starts at `0`.
#[derive(Debug, Default, Clone, Copy)]
pub struct SeqNo(pub u64);

impl SeqNo {
    /// Steps the counter to its successor and returns the new value.
    ///
    /// The increment wraps: advancing from `u64::MAX` yields `0` instead of
    /// panicking.
    pub fn advance(&mut self) -> u64 {
        let Self(current) = self;
        *current = current.wrapping_add(1);
        *current
    }
}
/// Sender-side buffer of envelopes that have been pushed but not yet
/// cumulatively acknowledged.
///
/// Envelopes are keyed by their `seq_no`. Sequence numbers reported as
/// negatively acknowledged are remembered until the next call to
/// [`AckedSendBuffer::drain_resend`].
pub struct AckedSendBuffer {
    /// Upper bound on the number of pending envelopes accepted by `push`.
    capacity: u32,
    /// Envelopes awaiting acknowledgement, ordered by sequence number.
    pending: BTreeMap<u64, RemoteEnvelope>,
    /// Sequence numbers received in `AckInfo::nacks` since the last drain.
    nacks: BTreeSet<u64>,
}

impl AckedSendBuffer {
    /// Creates an empty buffer that holds at most `capacity` pending envelopes.
    pub fn new(capacity: u32) -> Self {
        Self {
            capacity,
            pending: BTreeMap::default(),
            nacks: BTreeSet::default(),
        }
    }

    /// Returns the capacity this buffer was created with.
    pub fn capacity(&self) -> u32 {
        self.capacity
    }

    /// Returns how many envelopes are currently pending.
    pub fn pending_len(&self) -> usize {
        self.pending.len()
    }

    /// Returns `true` once the number of pending envelopes has reached the
    /// capacity.
    pub fn is_full(&self) -> bool {
        let limit = self.capacity as usize;
        self.pending_len() >= limit
    }

    /// Stores `env` under its `seq_no`.
    ///
    /// # Errors
    ///
    /// When the buffer is full the envelope is not stored and is handed back
    /// to the caller, boxed, as the `Err` value.
    pub fn push(&mut self, env: RemoteEnvelope) -> Result<(), Box<RemoteEnvelope>> {
        if self.is_full() {
            Err(Box::new(env))
        } else {
            // An existing entry with the same sequence number is replaced.
            let key = env.seq_no;
            self.pending.insert(key, env);
            Ok(())
        }
    }

    /// Applies an acknowledgement received from the peer.
    ///
    /// Every pending envelope whose sequence number is at or below
    /// `ack.cumulative_ack` is dropped, and every entry of `ack.nacks` is
    /// recorded for the next [`AckedSendBuffer::drain_resend`].
    pub fn apply_ack(&mut self, ack: &AckInfo) {
        let floor = ack.cumulative_ack;
        self.pending.retain(|&seq, _| seq > floor);
        for missing in ack.nacks.iter() {
            self.nacks.insert(*missing);
        }
    }

    /// Clears the recorded nacks and returns clones of the matching pending
    /// envelopes, in ascending sequence order.
    ///
    /// Nacked sequence numbers that are no longer (or never were) pending are
    /// skipped. The returned envelopes stay in the pending set; only a later
    /// cumulative ack removes them.
    pub fn drain_resend(&mut self) -> Vec<RemoteEnvelope> {
        let requested = std::mem::take(&mut self.nacks);
        let pending = &self.pending;
        requested
            .into_iter()
            .filter_map(|seq| pending.get(&seq).cloned())
            .collect()
    }
}
/// Receiver-side tracker of which sequence numbers have been observed.
///
/// Sequence numbers are expected to start at `1`: a fresh buffer has a
/// cumulative position of `0`, and anything at or below the cumulative
/// position is treated as already seen.
pub struct AckedReceiveBuffer {
    /// Highest sequence number up to which every number has been observed
    /// without gaps (`0` before anything has arrived).
    cumulative: u64,
    /// Sequence numbers observed out of order; all are strictly greater than
    /// `cumulative` and not yet contiguous with it.
    delivered: BTreeSet<u64>,
}

impl Default for AckedReceiveBuffer {
    /// Equivalent to [`AckedReceiveBuffer::new`].
    fn default() -> Self {
        Self::new()
    }
}

impl AckedReceiveBuffer {
    /// Creates a buffer that has observed nothing yet.
    pub fn new() -> Self {
        Self { cumulative: 0, delivered: BTreeSet::new() }
    }

    /// Records the arrival of `seq_no`.
    ///
    /// Returns `true` if this is the first time the sequence number is seen,
    /// and `false` for a duplicate (already covered by the cumulative
    /// position, or already recorded out of order). After a new arrival the
    /// cumulative position is advanced over every contiguous successor that
    /// has been recorded.
    pub fn observe(&mut self, seq_no: u64) -> bool {
        if seq_no <= self.cumulative {
            return false;
        }
        if !self.delivered.insert(seq_no) {
            return false;
        }
        // `checked_add` stops the walk once the cumulative position reaches
        // `u64::MAX`; a plain `+ 1` there overflows and panics in debug builds.
        while let Some(next) = self.cumulative.checked_add(1) {
            if !self.delivered.remove(&next) {
                break;
            }
            self.cumulative = next;
        }
        true
    }

    /// Builds the acknowledgement describing the current state.
    ///
    /// `cumulative_ack` is the current cumulative position. `nacks` is
    /// whatever `missing_below` reports for that position and the
    /// out-of-order sequence numbers, which are passed in ascending order.
    pub fn ack(&self) -> AckInfo {
        let delivered_above: Vec<u64> = self.delivered.iter().copied().collect();
        AckInfo {
            cumulative_ack: self.cumulative,
            nacks: missing_below(self.cumulative, &delivered_above),
        }
    }
}
/// Lists the sequence numbers that are still missing between the cumulative
/// position and the out-of-order deliveries.
///
/// `delivered_above` must be in ascending order. Entries at or below
/// `cumulative`, and repeated entries, are ignored. The result contains, in
/// ascending order, every sequence number greater than `cumulative` and
/// smaller than the largest delivered entry that is not itself in
/// `delivered_above`. It is empty when there are no out-of-order deliveries
/// or no gaps.
///
/// NOTE(review): the result has one element per missing sequence number, so
/// its length equals the combined width of all gaps. This assumes the peer
/// never runs far ahead of the cumulative position — confirm that callers
/// bound how far ahead a sequence number may be accepted.
fn missing_below(cumulative: u64, delivered_above: &[u64]) -> Vec<u64> {
    let mut missing = Vec::new();
    // Lowest sequence number not yet accounted for. If `cumulative` is
    // already `u64::MAX`, nothing can be missing above it.
    let mut expected = match cumulative.checked_add(1) {
        Some(next) => next,
        None => return missing,
    };
    for &seq in delivered_above {
        if seq < expected {
            // Already covered: at/below the cumulative position or a repeat.
            continue;
        }
        // Everything in `expected..seq` was skipped over by the peer.
        missing.extend(expected..seq);
        expected = match seq.checked_add(1) {
            Some(next) => next,
            // `seq` is `u64::MAX`; there is no higher number to look for.
            None => break,
        };
    }
    missing
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Test fixture: builds an envelope through `RemoteEnvelope::user` where
    /// `seq` is the only argument that varies between calls.
    fn env(seq: u64) -> RemoteEnvelope {
        RemoteEnvelope::user("akka.tcp://X@h:1/user/a", None, 0, 0, seq, 1, "u32", vec![])
    }

    /// A cumulative ack drops every pending envelope at or below it.
    #[test]
    fn send_buffer_acks_remove_pending() {
        let mut buffer = AckedSendBuffer::new(8);
        (1..=5).for_each(|seq| buffer.push(env(seq)).unwrap());
        assert_eq!(buffer.pending_len(), 5);

        let ack = AckInfo { cumulative_ack: 3, nacks: vec![] };
        buffer.apply_ack(&ack);
        assert_eq!(buffer.pending_len(), 2);
    }

    /// Duplicates are rejected and the cumulative position only moves once
    /// gaps are filled.
    #[test]
    fn receive_buffer_dedups_and_advances() {
        let mut receiver = AckedReceiveBuffer::new();

        // In-order arrivals are new; repeating one is not.
        assert!(receiver.observe(1));
        assert!(receiver.observe(2));
        assert!(!receiver.observe(2));
        assert_eq!(receiver.ack().cumulative_ack, 2);

        // An out-of-order arrival is new but leaves the cumulative position alone.
        assert!(receiver.observe(4));
        assert_eq!(receiver.ack().cumulative_ack, 2);

        // Filling the gap advances past both 3 and 4.
        assert!(receiver.observe(3));
        assert_eq!(receiver.ack().cumulative_ack, 4);
    }

    /// Pushing into a full buffer hands the envelope back to the caller.
    #[test]
    fn send_buffer_full_returns_envelope() {
        let mut buffer = AckedSendBuffer::new(2);
        for seq in 1..=2 {
            buffer.push(env(seq)).unwrap();
        }

        let rejected = buffer.push(env(3)).unwrap_err();
        assert_eq!(rejected.seq_no, 3);
    }
}