use super::{AtLeastOnceInfo, EXTEND_PERIOD, MAX_IDS_PER_RPC};
use std::collections::HashMap;
use tokio::time::{Duration, Instant};
/// Extra margin added on top of `EXTEND_PERIOD` when `Leases::retain` decides
/// whether a message's previous extension still reaches far enough ahead to
/// skip extending it again.
const EXTEND_BUFFER: Duration = Duration::from_secs(2);
/// Tracks the messages currently under lease, together with the acks and
/// nacks that have been queued but not yet drained.
#[derive(Debug, Default)]
pub struct Leases {
/// Lease bookkeeping for every message under lease, keyed by ack ID.
under_lease: HashMap<String, AtLeastOnceInfo>,
/// Ack IDs queued for acknowledgement, in the order `ack` was called.
to_ack: Vec<String>,
/// Ack IDs queued for negative acknowledgement, in the order they were queued.
to_nack: Vec<String>,
}
impl Leases {
pub fn add(&mut self, ack_id: String, info: AtLeastOnceInfo) {
self.under_lease.insert(ack_id, info);
}
pub fn ack(&mut self, ack_id: String) {
self.under_lease.remove(&ack_id);
self.to_ack.push(ack_id);
}
pub fn nack(&mut self, ack_id: String) {
if self.under_lease.remove(&ack_id).is_some() {
self.to_nack.push(ack_id);
}
}
pub fn needs_flush(&self) -> bool {
self.to_ack.len() >= MAX_IDS_PER_RPC || self.to_nack.len() >= MAX_IDS_PER_RPC
}
pub fn drain(&mut self) -> (Vec<String>, Vec<String>) {
(
std::mem::take(&mut self.to_ack),
std::mem::take(&mut self.to_nack),
)
}
pub fn retain(
&mut self,
max_lease: Duration,
max_lease_extension: Duration,
) -> Vec<Vec<String>> {
let now = Instant::now();
let mut batches = Vec::new();
let mut batch = Vec::new();
self.under_lease.retain(|ack_id, info| {
if info.receive_time + max_lease < now {
false
} else if info
.last_extension
.is_some_and(|i| i + max_lease_extension > now + EXTEND_PERIOD + EXTEND_BUFFER)
{
true
} else {
batch.push(ack_id.clone());
if batch.len() == MAX_IDS_PER_RPC {
batches.push(std::mem::take(&mut batch));
}
info.last_extension = Some(now);
true
}
});
if !batch.is_empty() {
batches.push(batch);
}
batches
}
pub fn evict_and_drain(mut self) -> (Vec<String>, Vec<Vec<String>>) {
self.to_nack.extend(self.under_lease.into_keys());
(self.to_ack, super::batch(self.to_nack))
}
}
#[cfg(test)]
impl PartialEq<Leases> for super::tests::TestLeases {
    /// Compares the expected state against `leases`, ignoring order on the
    /// `Leases` side.
    ///
    /// The ack IDs under lease, the pending acks and the pending nacks of
    /// `leases` are each copied and sorted before being compared with the
    /// corresponding field of `self`, so `self` must list its IDs in sorted
    /// order to compare equal.
    fn eq(&self, leases: &Leases) -> bool {
        // Returns the given IDs in ascending order.
        fn in_order(mut ids: Vec<String>) -> Vec<String> {
            ids.sort();
            ids
        }

        self.under_lease == in_order(leases.under_lease.keys().cloned().collect())
            && self.to_ack == in_order(leases.to_ack.clone())
            && self.to_nack == in_order(leases.to_nack.clone())
    }
}
// Unit tests for `Leases`. Expected states are written as `TestLeases`
// values and compared through the order-insensitive `PartialEq<Leases>` impl.
#[cfg(test)]
mod tests {
use super::super::tests::{Batches, TestLeases, sorted, test_id, test_ids};
use super::*;
use std::collections::HashSet;
// Signed copy of the per-RPC limit so it can bound the `i32` ranges used to
// generate test IDs below. It shadows the glob-imported constant in this module.
const MAX_IDS_PER_RPC: i32 = super::MAX_IDS_PER_RPC as i32;
// Walks through a sequence of add / ack / nack calls and checks all three
// collections after every step.
#[test]
fn basic_add_ack_nack() {
let mut leases = Leases::default();
assert_eq!(
TestLeases {
under_lease: Vec::new(),
to_ack: Vec::new(),
to_nack: Vec::new(),
},
leases
);
leases.add(test_id(1), AtLeastOnceInfo::new());
assert_eq!(
TestLeases {
under_lease: vec![test_id(1)],
to_ack: Vec::new(),
to_nack: Vec::new(),
},
leases
);
leases.add(test_id(2), AtLeastOnceInfo::new());
assert_eq!(
TestLeases {
under_lease: vec![test_id(1), test_id(2)],
to_ack: Vec::new(),
to_nack: Vec::new(),
},
leases
);
leases.add(test_id(3), AtLeastOnceInfo::new());
assert_eq!(
TestLeases {
under_lease: vec![test_id(1), test_id(2), test_id(3)],
to_ack: Vec::new(),
to_nack: Vec::new(),
},
leases
);
leases.ack(test_id(1));
assert_eq!(
TestLeases {
under_lease: vec![test_id(2), test_id(3)],
to_ack: vec![test_id(1)],
to_nack: Vec::new(),
},
leases
);
leases.nack(test_id(2));
assert_eq!(
TestLeases {
under_lease: vec![test_id(3)],
to_ack: vec![test_id(1)],
to_nack: vec![test_id(2)],
},
leases
);
leases.add(test_id(4), AtLeastOnceInfo::new());
assert_eq!(
TestLeases {
under_lease: vec![test_id(3), test_id(4)],
to_ack: vec![test_id(1)],
to_nack: vec![test_id(2)],
},
leases
);
leases.ack(test_id(4));
assert_eq!(
TestLeases {
under_lease: vec![test_id(3)],
to_ack: vec![test_id(1), test_id(4)],
to_nack: vec![test_id(2)],
},
leases
);
leases.nack(test_id(3));
assert_eq!(
TestLeases {
under_lease: Vec::new(),
to_ack: vec![test_id(1), test_id(4)],
to_nack: vec![test_id(2), test_id(3)],
},
leases
);
}
// `drain` hands back the pending acks and nacks, empties both queues, and
// leaves the messages under lease untouched.
#[test]
fn drain() {
let mut leases = Leases::default();
for i in 0..100 {
leases.add(test_id(i), AtLeastOnceInfo::new());
}
for i in 0..10 {
leases.ack(test_id(i));
}
for i in 10..20 {
leases.nack(test_id(i));
}
assert_eq!(
TestLeases {
under_lease: test_ids(20..100),
to_ack: test_ids(0..10),
to_nack: test_ids(10..20),
},
leases
);
let (to_ack, to_nack) = leases.drain();
assert_eq!(to_ack, test_ids(0..10));
assert_eq!(to_nack, test_ids(10..20));
assert_eq!(
TestLeases {
under_lease: test_ids(20..100),
to_ack: Vec::new(),
to_nack: Vec::new(),
},
leases
);
}
// `evict_and_drain` returns the pending acks unchanged and nacks both the
// already-queued nacks and every message that was still under lease.
#[test]
fn evict() {
let mut leases = Leases::default();
for i in 0..30 {
leases.add(test_id(i), AtLeastOnceInfo::new());
}
for i in 0..10 {
leases.ack(test_id(i));
}
for i in 10..20 {
leases.nack(test_id(i));
}
assert_eq!(
TestLeases {
under_lease: test_ids(20..30),
to_ack: test_ids(0..10),
to_nack: test_ids(10..20),
},
leases
);
let (to_ack, to_nack) = leases.evict_and_drain();
assert_eq!(sorted(&to_ack), test_ids(0..10));
let to_nack = Batches::flatten(to_nack);
assert_eq!(sorted(&to_nack.ack_ids), test_ids(10..30));
}
// When eviction produces more nacks than fit in one RPC, they come back in
// batches of at most MAX_IDS_PER_RPC, with only the last batch partially filled.
#[test]
fn evict_overflow_batches() {
let mut leases = Leases::default();
for i in 0..MAX_IDS_PER_RPC * 3 {
leases.add(test_id(i), AtLeastOnceInfo::new());
}
for i in 0..10 {
leases.ack(test_id(i));
}
for i in 10..MAX_IDS_PER_RPC {
leases.nack(test_id(i));
}
assert_eq!(
TestLeases {
under_lease: test_ids(MAX_IDS_PER_RPC..MAX_IDS_PER_RPC * 3),
to_ack: test_ids(0..10),
to_nack: test_ids(10..MAX_IDS_PER_RPC),
},
leases
);
let (to_ack, to_nack) = leases.evict_and_drain();
assert_eq!(sorted(&to_ack), test_ids(0..10));
let to_nack = Batches::flatten(to_nack);
assert_eq!(
to_nack.counts,
vec![MAX_IDS_PER_RPC, MAX_IDS_PER_RPC, MAX_IDS_PER_RPC - 10]
);
assert_eq!(sorted(&to_nack.ack_ids), test_ids(10..MAX_IDS_PER_RPC * 3));
}
// An ack for an ID that was never added is still queued.
#[test]
fn ack_out_of_lease_included() {
let mut leases = Leases::default();
assert_eq!(
TestLeases {
under_lease: Vec::new(),
to_ack: Vec::new(),
to_nack: Vec::new(),
},
leases
);
leases.ack(test_id(1));
assert_eq!(
TestLeases {
under_lease: Vec::new(),
to_ack: vec![test_id(1)],
to_nack: Vec::new(),
},
leases
);
}
// A nack for an ID that was never added is dropped.
#[test]
fn nack_out_of_lease_ignored() {
let mut leases = Leases::default();
assert_eq!(
TestLeases {
under_lease: Vec::new(),
to_ack: Vec::new(),
to_nack: Vec::new(),
},
leases
);
leases.nack(test_id(1));
assert_eq!(
TestLeases {
under_lease: Vec::new(),
to_ack: Vec::new(),
to_nack: Vec::new(),
},
leases
);
}
// `needs_flush` stays false at 100 pending acks and turns true once
// MAX_IDS_PER_RPC acks are pending (the test relies on the limit exceeding 100).
#[test]
fn needs_flush_ack() {
let mut leases = Leases::default();
for i in 0..100 {
leases.add(test_id(i), AtLeastOnceInfo::new());
leases.ack(test_id(i));
}
assert!(!leases.needs_flush());
for i in 100..MAX_IDS_PER_RPC {
leases.add(test_id(i), AtLeastOnceInfo::new());
leases.ack(test_id(i));
}
assert!(leases.needs_flush());
}
// Same as `needs_flush_ack`, but for pending nacks.
#[test]
fn needs_flush_nack() {
let mut leases = Leases::default();
for i in 0..100 {
leases.add(test_id(i), AtLeastOnceInfo::new());
leases.nack(test_id(i));
}
assert!(!leases.needs_flush());
for i in 100..MAX_IDS_PER_RPC {
leases.add(test_id(i), AtLeastOnceInfo::new());
leases.nack(test_id(i));
}
assert!(leases.needs_flush());
}
// Pending acks and nacks are counted separately: each queue is filled past
// half of the limit, yet no flush is required.
#[test]
fn ack_and_nack_batches_are_independent() {
let mut leases = Leases::default();
let over_half_full = MAX_IDS_PER_RPC / 2 + 100;
for i in 0..over_half_full {
leases.add(test_id(i), AtLeastOnceInfo::new());
leases.ack(test_id(i));
leases.add(test_id(over_half_full + i), AtLeastOnceInfo::new());
leases.nack(test_id(over_half_full + i));
}
assert!(!leases.needs_flush());
}
// `retain` splits the IDs to extend into full batches of MAX_IDS_PER_RPC and
// returns every leased ID exactly once across those batches.
#[test]
fn batching() -> anyhow::Result<()> {
const NUM_BATCHES: i32 = 5;
let mut leases = Leases::default();
let mut want = HashSet::new();
for i in 0..NUM_BATCHES * MAX_IDS_PER_RPC {
leases.add(test_id(i), AtLeastOnceInfo::new());
want.insert(test_id(i));
}
let batches = leases.retain(Duration::from_secs(1), Duration::ZERO);
assert_eq!(batches.len(), NUM_BATCHES as usize);
let mut got = HashSet::new();
for batch in batches {
assert_eq!(batch.len(), MAX_IDS_PER_RPC as usize);
got.extend(batch.into_iter());
}
assert_eq!(got, want);
Ok(())
}
// Messages older than the maximum lease are dropped by `retain`. The tokio
// clock is paused, so time only moves through `tokio::time::advance`.
#[tokio::test(start_paused = true)]
async fn message_expiration() -> anyhow::Result<()> {
const MAX_LEASE: Duration = Duration::from_secs(300);
const DELTA: Duration = Duration::from_secs(1);
let mut leases = Leases::default();
for i in 0..10 {
leases.add(test_id(i), AtLeastOnceInfo::new());
}
// The second group of ten is added two DELTAs after the first.
tokio::time::advance(DELTA * 2).await;
for i in 10..20 {
leases.add(test_id(i), AtLeastOnceInfo::new());
}
// With a zero `max_lease_extension`, every message that is kept is also
// due for an extension.
let batches = leases.retain(MAX_LEASE, Duration::ZERO);
assert_eq!(batches.len(), 1);
assert_eq!(sorted(&batches[0]), test_ids(0..20));
assert_eq!(
TestLeases {
under_lease: test_ids(0..20),
to_ack: Vec::new(),
to_nack: Vec::new(),
},
leases
);
// The clock now reads MAX_LEASE + DELTA. Assuming `AtLeastOnceInfo::new()`
// stamps the receive time at creation (TODO confirm), the first group has
// outlived MAX_LEASE while the second, added at 2 * DELTA, has not.
tokio::time::advance(MAX_LEASE - DELTA).await;
let batches = leases.retain(MAX_LEASE, Duration::ZERO);
assert_eq!(batches.len(), 1);
assert_eq!(sorted(&batches[0]), test_ids(10..20));
assert_eq!(
TestLeases {
under_lease: test_ids(10..20),
to_ack: Vec::new(),
to_nack: Vec::new(),
},
leases
);
// Two more DELTAs take the second group past MAX_LEASE as well, so nothing
// is left to keep or extend.
tokio::time::advance(DELTA * 2).await;
let batches = leases.retain(MAX_LEASE, Duration::ZERO);
assert!(batches.is_empty(), "{}", batches.len());
assert_eq!(
TestLeases {
under_lease: Vec::new(),
to_ack: Vec::new(),
to_nack: Vec::new(),
},
leases
);
Ok(())
}
// `retain` only returns a message for extension when its previous extension
// no longer reaches far enough ahead.
#[tokio::test(start_paused = true)]
async fn necessary_extensions() -> anyhow::Result<()> {
const MAX_LEASE: Duration = Duration::from_secs(900);
const MAX_LEASE_EXTENSION: Duration = Duration::from_secs(10);
let mut leases = Leases::default();
leases.add(test_id(0), AtLeastOnceInfo::new());
// First pass: the message is returned for extension (presumably because a
// fresh `AtLeastOnceInfo` has no previous extension; TODO confirm).
let batches = leases.retain(MAX_LEASE, MAX_LEASE_EXTENSION);
assert_eq!(batches, vec![vec![test_id(0)]]);
assert_eq!(
TestLeases {
under_lease: test_ids(0..1),
to_ack: Vec::new(),
to_nack: Vec::new(),
},
leases
);
// No time has passed: the extension just recorded is expected to still
// cover the look-ahead window, so nothing is returned.
let batches = leases.retain(MAX_LEASE, MAX_LEASE_EXTENSION);
assert!(batches.is_empty(), "{batches:?}");
assert_eq!(
TestLeases {
under_lease: test_ids(0..1),
to_ack: Vec::new(),
to_nack: Vec::new(),
},
leases
);
// One second before the previous extension runs out, the message has to be
// extended again.
tokio::time::advance(MAX_LEASE_EXTENSION - Duration::from_secs(1)).await;
let batches = leases.retain(MAX_LEASE, MAX_LEASE_EXTENSION);
assert_eq!(batches, vec![vec![test_id(0)]]);
assert_eq!(
TestLeases {
under_lease: test_ids(0..1),
to_ack: Vec::new(),
to_nack: Vec::new(),
},
leases
);
Ok(())
}
}