use std::collections::HashMap;
/// Data carried by either [`Confirm`] variant.
#[derive(Clone, Copy, Debug, PartialEq)]
pub struct ConfirmPayload {
    /// Delivery tag the confirm refers to.
    pub delivery_tag: u64,
    /// When `true`, the confirm covers every not-yet-confirmed tag up to and
    /// including `delivery_tag`; when `false`, it covers `delivery_tag` only.
    pub multiple: bool,
}
/// A confirmation for one or more delivery tags: positive (`Ack`) or
/// negative (`Nack`).
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum Confirm {
    /// Positive confirmation for the tag(s) described by the payload.
    Ack(ConfirmPayload),
    /// Negative confirmation for the tag(s) described by the payload.
    Nack(ConfirmPayload),
}
/// Turns a stream of possibly out-of-order, possibly `multiple` confirms into
/// a stream of single confirms in strictly increasing delivery-tag order.
#[derive(Clone, Debug)]
pub struct ConfirmSmoother {
    /// Delivery tag of the next confirm that may be handed out.
    expected: u64,
    /// Single confirms that arrived for tags greater than `expected`, keyed by
    /// delivery tag, held back until `expected` catches up with them.
    out_of_order: HashMap<u64, Confirm>,
}
impl Default for ConfirmSmoother {
fn default() -> ConfirmSmoother {
ConfirmSmoother::new()
}
}
impl ConfirmSmoother {
    /// Creates a smoother whose first expected delivery tag is 1.
    pub fn new() -> Self {
        Self::with_expected_delivery_tag(1)
    }

    /// Creates a smoother that treats `expected` as the next in-order
    /// delivery tag and starts with nothing buffered.
    pub fn with_expected_delivery_tag(expected: u64) -> Self {
        let out_of_order = HashMap::new();
        Self {
            expected,
            out_of_order,
        }
    }

    /// Feeds one incoming confirm into the smoother.
    ///
    /// The returned iterator yields the single (`multiple: false`) confirms
    /// that become deliverable as a result, in increasing delivery-tag order.
    /// It may yield nothing. The iterator finishes its work when dropped, so
    /// the smoother's state is updated even if the caller stops iterating
    /// early.
    pub fn process<'a>(&'a mut self, confirm: Confirm) -> impl Iterator<Item = Confirm> + 'a {
        type Wrap = fn(ConfirmPayload) -> Confirm;

        // Split the confirm into its payload and the constructor of its
        // variant, so generated confirms keep the same Ack/Nack kind.
        let (payload, wrap) = match confirm {
            Confirm::Ack(inner) => (inner, Confirm::Ack as Wrap),
            Confirm::Nack(inner) => (inner, Confirm::Nack as Wrap),
        };
        self.new_iter(payload, wrap)
    }

    /// Builds the iterator that applies `payload` to this smoother.
    ///
    /// `to_confirm` is the variant constructor used to wrap every confirm the
    /// iterator generates; each generated confirm has `multiple: false`.
    fn new_iter<'a>(
        &'a mut self,
        payload: ConfirmPayload,
        to_confirm: fn(ConfirmPayload) -> Confirm,
    ) -> impl Iterator<Item = Confirm> + 'a {
        let wrap_single = move |delivery_tag: u64| {
            to_confirm(ConfirmPayload {
                delivery_tag,
                multiple: false,
            })
        };

        Iter {
            parent: self,
            payload,
            next: None,
            to_confirm: wrap_single,
            done: false,
        }
    }
}
/// Iterator returned by `ConfirmSmoother::process`; applies one incoming
/// confirm to the parent smoother and yields the resulting single confirms.
struct Iter<'a, F>
where
    F: Fn(u64) -> Confirm,
{
    /// Smoother whose `expected` counter and out-of-order buffer are updated
    /// as the iterator advances.
    parent: &'a mut ConfirmSmoother,
    /// The incoming confirm being applied.
    payload: ConfirmPayload,
    /// Buffered confirm for the parent's current `expected` tag, taken out of
    /// the out-of-order buffer and waiting to be yielded by the next call.
    next: Option<Confirm>,
    /// Wraps a delivery tag into a single confirm of the incoming confirm's
    /// variant.
    to_confirm: F,
    /// Set once the iterator has nothing more to do; `next` then keeps
    /// returning `None`.
    done: bool,
}
impl<'a, F> Drop for Iter<'a, F>
where
    F: Fn(u64) -> Confirm,
{
    /// Finishes applying the confirm to the parent smoother if the caller
    /// dropped the iterator before exhausting it, so the parent's state never
    /// reflects a half-processed confirm.
    fn drop(&mut self) {
        // `next` returns `None` only once `done` is set, so draining the
        // remaining items is the same as stepping until `done`.
        for _ in self.by_ref() {}
    }
}
impl<'a, F> Iterator for Iter<'a, F>
where
F: Fn(u64) -> Confirm,
{
type Item = Confirm;
fn next(&mut self) -> Option<Confirm> {
if self.done {
return None;
}
let payload = self.payload;
if payload.delivery_tag == self.parent.expected {
self.parent.expected += 1;
self.next = self.parent.out_of_order.remove(&self.parent.expected);
return Some((self.to_confirm)(payload.delivery_tag));
}
if payload.delivery_tag > self.parent.expected {
if payload.multiple {
let ret = (self.to_confirm)(self.parent.expected);
self.parent.expected += 1;
return Some(ret);
} else {
self.parent.out_of_order.insert(
payload.delivery_tag,
(self.to_confirm)(payload.delivery_tag),
);
self.done = true;
return None;
}
}
match self.next.take() {
Some(next) => {
self.parent.expected += 1;
self.next = self.parent.out_of_order.remove(&self.parent.expected);
Some(next)
}
None => {
self.done = true;
None
}
}
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a confirm of the variant `f` that covers only `delivery_tag`.
    fn single(delivery_tag: u64, f: fn(ConfirmPayload) -> Confirm) -> Confirm {
        let payload = ConfirmPayload {
            delivery_tag,
            multiple: false,
        };
        f(payload)
    }

    /// Builds a confirm of the variant `f` with the `multiple` flag set.
    fn multiple(delivery_tag: u64, f: fn(ConfirmPayload) -> Confirm) -> Confirm {
        let payload = ConfirmPayload {
            delivery_tag,
            multiple: true,
        };
        f(payload)
    }

    /// Runs `confirm` through `smoother` and gathers everything it yields.
    fn emitted(smoother: &mut ConfirmSmoother, confirm: Confirm) -> Vec<Confirm> {
        smoother.process(confirm).collect()
    }

    /// An in-order single confirm is passed straight through.
    #[test]
    fn simple_single() {
        let mut smoother = ConfirmSmoother::new();
        let got = emitted(&mut smoother, single(1, Confirm::Ack));
        assert_eq!(vec![single(1, Confirm::Ack)], got);
    }

    /// A `multiple` confirm is expanded into one single confirm per tag.
    #[test]
    fn simple_multiple() {
        let mut smoother = ConfirmSmoother::new();
        let got = emitted(&mut smoother, multiple(3, Confirm::Ack));
        let want = vec![
            single(1, Confirm::Ack),
            single(2, Confirm::Ack),
            single(3, Confirm::Ack),
        ];
        assert_eq!(want, got);
    }

    /// Singles that arrive early are held back until the gap is filled.
    #[test]
    fn single_then_single() {
        let mut smoother = ConfirmSmoother::new();
        for &tag in &[3u64, 2] {
            let held_back = emitted(&mut smoother, single(tag, Confirm::Ack));
            assert!(held_back.is_empty());
        }

        let got = emitted(&mut smoother, single(1, Confirm::Nack));
        let want = vec![
            single(1, Confirm::Nack),
            single(2, Confirm::Ack),
            single(3, Confirm::Ack),
        ];
        assert_eq!(want, got);
    }

    /// Confirms for tags that were already handed out yield nothing.
    #[test]
    fn redelivery() {
        let mut smoother = ConfirmSmoother::new();
        assert_eq!(emitted(&mut smoother, multiple(2, Confirm::Ack)).len(), 2);
        assert!(emitted(&mut smoother, single(2, Confirm::Ack)).is_empty());
        assert!(emitted(&mut smoother, single(1, Confirm::Nack)).is_empty());
    }

    /// A `multiple` confirm releases the buffered single that directly
    /// follows it; a later single then releases the next buffered one.
    #[test]
    fn single_single_multiple_single() {
        let mut smoother = ConfirmSmoother::new();
        for &tag in &[5u64, 3] {
            let held_back = emitted(&mut smoother, single(tag, Confirm::Nack));
            assert!(held_back.is_empty());
        }

        let first_batch = emitted(&mut smoother, multiple(2, Confirm::Ack));
        assert_eq!(
            vec![
                single(1, Confirm::Ack),
                single(2, Confirm::Ack),
                single(3, Confirm::Nack),
            ],
            first_batch
        );

        let second_batch = emitted(&mut smoother, single(4, Confirm::Ack));
        assert_eq!(
            vec![single(4, Confirm::Ack), single(5, Confirm::Nack)],
            second_batch
        );
    }

    /// Dropping the iterator without consuming it still applies the confirm.
    #[test]
    fn drop_without_running_iter_to_completion() {
        let mut smoother = ConfirmSmoother::new();
        drop(smoother.process(multiple(2, Confirm::Ack)));

        let got = emitted(&mut smoother, single(3, Confirm::Ack));
        assert_eq!(vec![single(3, Confirm::Ack)], got);
    }
}