use std::sync::Arc;
use std::time::Duration;
use tokio::time::sleep;
use crate::CounterComparator;
use crate::icounter::{
InstanceAwareCounterTrait, LaxInstanceAwareCounter, LaxInstanceAwareCounterOptions,
};
use super::common::{make_connection, run_id};
use crate::DistkitRedisKey;
/// Milliseconds used for both `flush_interval` and `allowed_lag` of every
/// counter built by `make_lax_from_prefix`; tests also sleep multiples of it.
const FLUSH_MS: u64 = 10;
/// Milliseconds passed as `dead_instance_threshold_ms` when building counters.
const THRESHOLD_MS: u64 = 300;
/// Joins `run_id()` and `name` with an underscore to form a key prefix.
fn unique_prefix(name: &str) -> String {
    let run = run_id();
    format!("{run}_{name}")
}
/// Builds a `LaxInstanceAwareCounter` under `prefix`, using a connection from
/// `make_connection` and the timing constants declared in this file.
async fn make_lax_from_prefix(prefix: &str) -> Arc<LaxInstanceAwareCounter> {
    let connection = make_connection().await;
    // The flush interval and the allowed lag are deliberately the same value.
    let flush_every = Duration::from_millis(FLUSH_MS);
    let options = LaxInstanceAwareCounterOptions {
        prefix: DistkitRedisKey::from(prefix.to_owned()),
        connection_manager: connection,
        dead_instance_threshold_ms: THRESHOLD_MS,
        flush_interval: flush_every,
        allowed_lag: flush_every,
    };
    LaxInstanceAwareCounter::new(options)
}
/// Builds a single counter under the prefix `unique_prefix(name)`.
async fn make_lax(name: &str) -> Arc<LaxInstanceAwareCounter> {
    let prefix = unique_prefix(name);
    make_lax_from_prefix(&prefix).await
}
/// Builds two counters that share one prefix and returns them together with
/// that prefix, so a test can attach further counters to the same keys.
async fn make_lax_pair(
    name: &str,
) -> (
    Arc<LaxInstanceAwareCounter>,
    Arc<LaxInstanceAwareCounter>,
    String,
) {
    let shared = unique_prefix(name);
    let first = make_lax_from_prefix(&shared).await;
    let second = make_lax_from_prefix(&shared).await;
    (first, second, shared)
}
/// Converts `name` into an owned `DistkitRedisKey`.
fn key(name: &str) -> DistkitRedisKey {
    let owned = String::from(name);
    DistkitRedisKey::from(owned)
}
/// Two successive `inc` calls on one key report cumulative values 5 and then 8.
#[tokio::test]
async fn inc_returns_local_estimate() {
    let counter = make_lax("inc_returns_local_estimate").await;
    let hits = key("hits");

    let after_first = counter.inc(&hits, 5).await.unwrap();
    let after_second = counter.inc(&hits, 3).await.unwrap();

    assert_eq!(after_first.0, 5);
    assert_eq!(after_second.0, 8);
}
/// `get` called right after two `inc` calls (no sleep in between) reports
/// their sum.
#[tokio::test]
async fn get_returns_local_estimate_including_pending_delta() {
    let counter = make_lax("get_with_pending").await;
    let hits = key("hits");

    counter.inc(&hits, 10).await.unwrap();
    counter.inc(&hits, 5).await.unwrap();

    let total = counter.get(&hits).await.unwrap().0;
    assert_eq!(total, 15);
}
/// After sleeping five flush intervals, a newly built counter on the same
/// prefix reads the value the writer incremented.
#[tokio::test]
async fn flush_sends_delta_to_strict() {
    // `_peer` is a named binding so the second instance lives until the end.
    let (writer, _peer, prefix) = make_lax_pair("flush_sends_delta").await;
    let hits = key("hits");
    let flush_wait = Duration::from_millis(FLUSH_MS * 5);

    writer.inc(&hits, 20).await.unwrap();
    sleep(flush_wait).await;

    let reader = make_lax_from_prefix(&prefix).await;
    let total = reader.get(&hits).await.unwrap().0;
    assert_eq!(total, 20);
}
/// Increments from two instances add up (10 + 7) when read by a third counter
/// after sleeping five flush intervals.
#[tokio::test]
async fn two_instances_cumulate_after_flush() {
    let (first, second, prefix) = make_lax_pair("two_instances_cumulate").await;
    let hits = key("hits");
    let flush_wait = Duration::from_millis(FLUSH_MS * 5);

    first.inc(&hits, 10).await.unwrap();
    second.inc(&hits, 7).await.unwrap();
    sleep(flush_wait).await;

    let reader = make_lax_from_prefix(&prefix).await;
    let total = reader.get(&hits).await.unwrap().0;
    assert_eq!(total, 17);
}
/// A `set` issued right after an `inc` leaves the other instance reading the
/// set value (100), not a value influenced by the earlier increment.
#[tokio::test]
async fn set_flushes_pending_delta_then_sets() {
    let (setter, observer, _shared) = make_lax_pair("set_flushes_delta").await;
    let hits = key("hits");

    setter.inc(&hits, 5).await.unwrap();
    setter.set(&hits, 100).await.unwrap();

    let seen = observer.get(&hits).await.unwrap().0;
    assert_eq!(seen, 100);
}
/// `del` right after an `inc` returns the incremented value (15) as its first
/// element, and the other instance then reads 0.
#[tokio::test]
async fn del_flushes_pending_delta_then_deletes() {
    let (deleter, observer, _shared) = make_lax_pair("del_flushes_delta").await;
    let hits = key("hits");

    deleter.inc(&hits, 15).await.unwrap();
    let previous = deleter.del(&hits).await.unwrap().0;
    assert_eq!(previous, 15);

    let remaining = observer.get(&hits).await.unwrap().0;
    assert_eq!(remaining, 0);
}
/// After sleeping five flush intervals and then past the dead-instance
/// threshold, a newly built counter reads 0 for the key the writer incremented.
#[tokio::test]
async fn dead_instance_cleaned_through_lax_wrapper() {
    // `_peer` is a named binding so the second instance lives until the end.
    let (writer, _peer, prefix) = make_lax_pair("dead_instance_lax").await;
    let hits = key("hits");
    let flush_wait = Duration::from_millis(FLUSH_MS * 5);
    let expiry_wait = Duration::from_millis(THRESHOLD_MS + 50);

    writer.inc(&hits, 30).await.unwrap();
    sleep(flush_wait).await;
    sleep(expiry_wait).await;

    let reader = make_lax_from_prefix(&prefix).await;
    let total = reader.get(&hits).await.unwrap().0;
    assert_eq!(total, 0, "dead instance contribution should be cleaned up");
}
/// `clear` issued right after an `inc` leaves a newly built counter reading 0.
#[tokio::test]
async fn clear_flushes_and_wipes() {
    // `_peer` is a named binding so the second instance lives until the end.
    let (writer, _peer, prefix) = make_lax_pair("clear_flushes_and_wipes").await;
    let hits = key("hits");

    writer.inc(&hits, 50).await.unwrap();
    writer.clear().await.unwrap();

    let reader = make_lax_from_prefix(&prefix).await;
    let total = reader.get(&hits).await.unwrap().0;
    assert_eq!(total, 0);
}
/// With 20 from `c1` and 10 from `c2`, `c1.clear_on_instance()` leaves a new
/// reader seeing only the 10.
#[tokio::test]
async fn clear_on_instance_removes_only_this_instance() {
    let (c1, c2, prefix) = make_lax_pair("clear_on_instance_lax").await;
    let hits = key("hits");
    let flush_wait = Duration::from_millis(FLUSH_MS * 5);

    c1.inc(&hits, 20).await.unwrap();
    c2.inc(&hits, 10).await.unwrap();
    sleep(flush_wait).await;
    c1.clear_on_instance().await.unwrap();

    let reader = make_lax_from_prefix(&prefix).await;
    let total = reader.get(&hits).await.unwrap().0;
    assert_eq!(total, 10, "only c1's contribution should be removed");
}
/// `dec(3)` after `inc(10)` reports 7 for both elements of the returned pair.
#[tokio::test]
async fn dec_returns_local_estimate() {
    let counter = make_lax("dec_returns_local_estimate").await;
    let hits = key("hits");

    counter.inc(&hits, 10).await.unwrap();
    let after_dec = counter.dec(&hits, 3).await.unwrap();

    assert_eq!(after_dec.0, 7);
    assert_eq!(after_dec.1, 7);
}
/// `get` on a key that was never written returns zero for both elements.
#[tokio::test]
async fn get_on_unknown_key_returns_zero() {
    let counter = make_lax("get_unknown_zero").await;
    let ghost = key("ghost");

    let view = counter.get(&ghost).await.unwrap();

    assert_eq!(view.0, 0);
    assert_eq!(view.1, 0);
}
/// With 20 from one instance and 10 from another, `del_on_instance` on the
/// first returns `(10, 20)` and a new reader then sees 10.
#[tokio::test]
async fn del_on_instance_removes_only_this_instance_contribution() {
    let (first, second, prefix) = make_lax_pair("del_on_instance_lax").await;
    let hits = key("hits");
    let flush_wait = Duration::from_millis(FLUSH_MS * 5);

    first.inc(&hits, 20).await.unwrap();
    second.inc(&hits, 10).await.unwrap();
    sleep(flush_wait).await;

    let outcome = first.del_on_instance(&hits).await.unwrap();
    // Second element is the removed amount; first is the new cumulative.
    assert_eq!(outcome.1, 20);
    assert_eq!(outcome.0, 10);

    let reader = make_lax_from_prefix(&prefix).await;
    let total = reader.get(&hits).await.unwrap().0;
    assert_eq!(total, 10);
}
/// `set_on_instance(7)` after `inc(10)` returns `(7, 7)`, and a following
/// `get` agrees.
#[tokio::test]
async fn set_on_instance_adjusts_local_count() {
    let counter = make_lax("set_on_instance_adjusts").await;
    let hits = key("hits");

    counter.inc(&hits, 10).await.unwrap();
    let after_set = counter.set_on_instance(&hits, 7).await.unwrap();
    assert_eq!(after_set.1, 7);
    assert_eq!(after_set.0, 7);

    let view = counter.get(&hits).await.unwrap();
    assert_eq!(view.0, 7);
    assert_eq!(view.1, 7);
}
/// `clear_on_instance` issued right after an `inc` (no sleep) leaves a newly
/// built counter reading 0.
#[tokio::test]
async fn clear_on_instance_flushes_pending_delta() {
    // `_peer` is a named binding so the second instance lives until the end.
    let (writer, _peer, prefix) = make_lax_pair("clear_on_instance_pending_delta").await;
    let hits = key("hits");

    writer.inc(&hits, 50).await.unwrap();
    writer.clear_on_instance().await.unwrap();

    let reader = make_lax_from_prefix(&prefix).await;
    let total = reader.get(&hits).await.unwrap().0;
    assert_eq!(total, 0);
}
/// `c1` increments once, then `c2` increments twice, with a five-interval
/// sleep after each step. Both instances and a new reader must report a
/// cumulative of 3, each with its own instance count (1, 2 and 0).
#[tokio::test]
async fn instances_see_different_cumulatives_after_sequential_flushes() {
    let (c1, c2, prefix) = make_lax_pair("stale_cumulative_divergence").await;
    let hits = key("hits");
    let flush_wait = Duration::from_millis(FLUSH_MS * 5);

    c1.inc(&hits, 1).await.unwrap();
    sleep(flush_wait).await;
    c2.inc(&hits, 1).await.unwrap();
    c2.inc(&hits, 1).await.unwrap();
    sleep(flush_wait).await;

    let c1_view = c1.get(&hits).await.unwrap();
    assert_eq!(c1_view.0, 3, "c1 must re-fetch from strict — cache is stale");
    assert_eq!(c1_view.1, 1, "c1's own instance slice is unchanged");

    let c2_view = c2.get(&hits).await.unwrap();
    assert_eq!(c2_view.0, 3, "c2 sees fresh cumulative from its own flush");
    assert_eq!(c2_view.1, 2);

    let reader = make_lax_from_prefix(&prefix).await;
    let reader_view = reader.get(&hits).await.unwrap();
    assert_eq!(reader_view.0, 3, "fresh reader sees ground-truth total");
    assert_eq!(reader_view.1, 0, "fresh reader has no instance contribution");
}
/// Reversed order of the sequential-flush scenario: `c2` increments twice
/// first, then `c1` once, followed by a longer (fifteen-interval) sleep.
/// Both instances and a new reader must report a cumulative of 3.
#[tokio::test]
async fn early_flusher_sees_stale_cumulative_from_reversed_flush_order() {
    let (c1, c2, prefix) = make_lax_pair("stale_cumulative_reversed").await;
    let hits = key("hits");
    let short_wait = Duration::from_millis(FLUSH_MS * 5);
    let long_wait = Duration::from_millis(FLUSH_MS * 15);

    c2.inc(&hits, 1).await.unwrap();
    c2.inc(&hits, 1).await.unwrap();
    sleep(short_wait).await;
    c1.inc(&hits, 1).await.unwrap();
    sleep(long_wait).await;

    let c2_view = c2.get(&hits).await.unwrap();
    assert_eq!(c2_view.0, 3, "c2 must re-fetch from strict — cache is stale");
    assert_eq!(c2_view.1, 2, "c2's own instance slice is unchanged");

    let c1_view = c1.get(&hits).await.unwrap();
    assert_eq!(c1_view.0, 3, "c1 sees fresh cumulative from its own flush");
    assert_eq!(c1_view.1, 1);

    let reader = make_lax_from_prefix(&prefix).await;
    let reader_view = reader.get(&hits).await.unwrap();
    assert_eq!(reader_view.0, 3, "fresh reader sees ground-truth total");
    assert_eq!(reader_view.1, 0);
}
/// `get_all` with an empty key slice yields an empty vector.
#[tokio::test]
async fn get_all_empty_returns_empty() {
    let counter = make_lax("get_all_empty").await;
    let observed = counter.get_all(&[]).await.unwrap();
    assert_eq!(observed, vec![]);
}
/// `get_all` on two never-written keys yields `(key, 0, 0)` for each.
#[tokio::test]
async fn get_all_unknown_keys_return_zero_zero() {
    let counter = make_lax("get_all_unknown").await;
    let a = key("a");
    let b = key("b");

    let observed = counter.get_all(&[&a, &b]).await.unwrap();

    assert_eq!(observed, vec![(&a, 0, 0), (&b, 0, 0)]);
}
/// After incrementing two keys by 5 and 10, `get_all` reports those amounts
/// in both numeric positions for each key.
#[tokio::test]
async fn get_all_returns_correct_values_after_inc() {
    let counter = make_lax("get_all_after_inc").await;
    let a = key("a");
    let b = key("b");

    counter.inc(&a, 5).await.unwrap();
    counter.inc(&b, 10).await.unwrap();

    let observed = counter.get_all(&[&a, &b]).await.unwrap();
    assert_eq!(observed, vec![(&a, 5, 5), (&b, 10, 10)]);
}
/// Results from `get_all` come back in the order the keys were requested
/// (`c`, `a`, `b`), not in the order they were written.
#[tokio::test]
async fn get_all_preserves_input_order() {
    let counter = make_lax("get_all_order").await;
    let a = key("a");
    let b = key("b");
    let c = key("c");

    counter.inc(&a, 1).await.unwrap();
    counter.inc(&b, 2).await.unwrap();
    counter.inc(&c, 3).await.unwrap();

    let observed = counter.get_all(&[&c, &a, &b]).await.unwrap();
    assert_eq!(observed, vec![(&c, 3, 3), (&a, 1, 1), (&b, 2, 2)]);
}
/// A newly built counter's `get_all` reports 42 for a key another instance
/// incremented five flush intervals earlier.
#[tokio::test]
async fn get_all_fetches_stale_keys_from_redis() {
    // The second counter of the pair is matched with `_`, so it is not kept.
    let (writer, _, prefix) = make_lax_pair("get_all_stale").await;
    let hits = key("hits");
    let flush_wait = Duration::from_millis(FLUSH_MS * 5);

    writer.inc(&hits, 42).await.unwrap();
    sleep(flush_wait).await;

    let reader = make_lax_from_prefix(&prefix).await;
    let observed = reader.get_all(&[&hits]).await.unwrap();
    let cumulative = observed[0].1;
    assert_eq!(cumulative, 42);
}
/// `get_all_on_instance` with an empty key slice yields an empty vector.
#[tokio::test]
async fn get_all_on_instance_empty_returns_empty() {
    let counter = make_lax("goi_empty").await;
    let observed = counter.get_all_on_instance(&[]).await.unwrap();
    assert_eq!(observed, vec![]);
}
/// `get_all_on_instance` on two never-written keys yields `(key, 0)` for each.
#[tokio::test]
async fn get_all_on_instance_unknown_keys_return_zero() {
    let counter = make_lax("goi_unknown").await;
    let a = key("a");
    let b = key("b");

    let observed = counter.get_all_on_instance(&[&a, &b]).await.unwrap();

    assert_eq!(observed, vec![(&a, 0), (&b, 0)]);
}
/// Right after `inc(5)` (no sleep), `get_all_on_instance` reports 5 for the key.
#[tokio::test]
async fn get_all_on_instance_returns_instance_count_plus_delta() {
    let counter = make_lax("goi_delta").await;
    let hits = key("hits");

    counter.inc(&hits, 5).await.unwrap();

    let observed = counter.get_all_on_instance(&[&hits]).await.unwrap();
    assert_eq!(observed, vec![(&hits, 5)]);
}
/// An increment of 100 on one instance does not show up in the other
/// instance's `get_all_on_instance` after five flush intervals.
#[tokio::test]
async fn get_all_on_instance_unaffected_by_other_instances() {
    let (idle, busy, _) = make_lax_pair("goi_isolation").await;
    let hits = key("hits");
    let flush_wait = Duration::from_millis(FLUSH_MS * 5);

    busy.inc(&hits, 100).await.unwrap();
    sleep(flush_wait).await;

    let observed = idle.get_all_on_instance(&[&hits]).await.unwrap();
    assert_eq!(observed, vec![(&hits, 0)]);
}
/// `set_all_on_instance` with an empty slice yields an empty vector.
#[tokio::test]
async fn set_all_on_instance_empty_returns_empty() {
    let counter = make_lax("soi_empty").await;
    let observed = counter.set_all_on_instance(&[]).await.unwrap();
    assert_eq!(observed, vec![]);
}
/// Setting two never-written keys to 7 and 3 via `set_all_on_instance`
/// returns those amounts in both numeric positions.
#[tokio::test]
async fn set_all_on_instance_basic_correctness() {
    let counter = make_lax("soi_basic").await;
    let a = key("a");
    let b = key("b");

    let updates = [(&a, 7), (&b, 3)];
    let observed = counter.set_all_on_instance(&updates).await.unwrap();

    assert_eq!(observed, vec![(&a, 7, 7), (&b, 3, 3)]);
}
/// One instance contributes 10; the other sets its own count to 4 via
/// `set_all_on_instance`. A new reader then sees 14 in total.
#[tokio::test]
async fn set_all_on_instance_preserves_other_slices() {
    let (setter, other, prefix) = make_lax_pair("soi_preserves").await;
    let hits = key("hits");
    let settle = Duration::from_millis(FLUSH_MS * 10);

    other.inc(&hits, 10).await.unwrap();
    sleep(settle).await;
    setter.set_all_on_instance(&[(&hits, 4)]).await.unwrap();
    sleep(settle).await;

    let reader = make_lax_from_prefix(&prefix).await;
    let total = reader.get(&hits).await.unwrap().0;
    assert_eq!(total, 14);
}
/// After another instance has contributed 5 and five flush intervals have
/// passed, `set_all_on_instance` to 3 reports 3 in the third tuple position.
#[tokio::test]
async fn set_all_on_instance_stale_keys_are_batch_refreshed() {
    let (setter, other, _) = make_lax_pair("soi_stale_refresh").await;
    let hits = key("hits");
    let flush_wait = Duration::from_millis(FLUSH_MS * 5);

    other.inc(&hits, 5).await.unwrap();
    sleep(flush_wait).await;

    let observed = setter.set_all_on_instance(&[(&hits, 3)]).await.unwrap();
    let instance_count = observed[0].2;
    assert_eq!(instance_count, 3, "instance slice set correctly");
}
/// `set_all` with an empty slice yields an empty vector.
#[tokio::test]
async fn set_all_empty_returns_empty() {
    let counter = make_lax("sa_empty").await;
    let observed = counter.set_all(&[]).await.unwrap();
    assert_eq!(observed, vec![]);
}
/// Setting two never-written keys to 10 and 20 via `set_all` returns those
/// amounts in both numeric positions.
#[tokio::test]
async fn set_all_basic_correctness() {
    let counter = make_lax("sa_basic").await;
    let a = key("a");
    let b = key("b");

    let updates = [(&a, 10), (&b, 20)];
    let observed = counter.set_all(&updates).await.unwrap();

    assert_eq!(observed, vec![(&a, 10, 10), (&b, 20, 20)]);
}
/// A value written with `set_all` is read by the other instance without any
/// sleep in between.
#[tokio::test]
async fn set_all_visible_to_other_instances() {
    let (setter, observer, _) = make_lax_pair("sa_visible").await;
    let hits = key("hits");

    setter.set_all(&[(&hits, 99)]).await.unwrap();

    let seen = observer.get(&hits).await.unwrap().0;
    assert_eq!(seen, 99);
}
/// `set_all` to 20 right after `inc(5)` leaves the other instance reading
/// exactly 20.
#[tokio::test]
async fn set_all_flushes_pending_delta_first() {
    let (setter, observer, _) = make_lax_pair("sa_flush_first").await;
    let hits = key("hits");

    setter.inc(&hits, 5).await.unwrap();
    setter.set_all(&[(&hits, 20)]).await.unwrap();

    let seen = observer.get(&hits).await.unwrap().0;
    assert_eq!(seen, 20);
}
/// Table-driven check of `inc_if`: each case starts from a value of 10 on its
/// own counter, applies `inc_if(.., 2)` with one comparator, and expects
/// `(12, 12)` when the comparator should hold and `(10, 10)` otherwise.
#[tokio::test]
async fn inc_if_uses_all_comparators_against_local_view() {
    // (prefix suffix, comparator, whether the increment is expected to apply)
    let table = [
        ("eq", CounterComparator::Eq(10), true),
        ("lt", CounterComparator::Lt(11), true),
        ("gt", CounterComparator::Gt(10), false),
        ("ne", CounterComparator::Ne(9), true),
        ("nil", CounterComparator::Nil, true),
    ];

    for (suffix, comparator, applies) in table {
        let prefix_name = format!("lax_inc_if_{suffix}");
        let counter = make_lax(&prefix_name).await;
        let hits = key("hits");
        counter.set(&hits, 10).await.unwrap();

        let (cum, inst) = counter.inc_if(&hits, comparator, 2).await.unwrap();

        let expected = if applies { (12, 12) } else { (10, 10) };
        assert_eq!((cum, inst), expected);
        let view = counter.get(&hits).await.unwrap();
        assert_eq!(view, expected);
    }
}
/// After `inc(5)`, `set_if(Eq(5), 20)` returns `(20, 20)` and the other
/// instance reads 20 without any sleep.
#[tokio::test]
async fn set_if_success_is_visible_to_other_instances_immediately() {
    let (setter, observer, _) = make_lax_pair("lax_set_if_visible").await;
    let hits = key("hits");

    setter.inc(&hits, 5).await.unwrap();
    let outcome = setter
        .set_if(&hits, CounterComparator::Eq(5), 20)
        .await
        .unwrap();
    assert_eq!(outcome, (20, 20));

    let seen = observer.get(&hits).await.unwrap().0;
    assert_eq!(seen, 20);
}
/// Both `inc_all` and `inc_all_if` yield an empty vector for an empty slice.
#[tokio::test]
async fn inc_all_empty_and_inc_all_if_empty_return_empty() {
    let counter = make_lax("lax_inc_all_empty").await;

    let plain = counter.inc_all(&[]).await.unwrap();
    assert_eq!(plain, vec![]);

    let conditional = counter.inc_all_if(&[]).await.unwrap();
    assert_eq!(conditional, vec![]);
}
/// `inc_all` with the same key listed twice (1, then 2) reports running
/// totals 1 and 3, and a following `get` returns `(3, 3)`.
#[tokio::test]
async fn inc_all_updates_local_view_immediately_and_supports_duplicates() {
    let counter = make_lax("lax_inc_all_duplicates").await;
    let hits = key("hits");

    let steps = [(&hits, 1), (&hits, 2)];
    let observed = counter.inc_all(&steps).await.unwrap();
    assert_eq!(observed, vec![(&hits, 1, 1), (&hits, 3, 3)]);

    let view = counter.get(&hits).await.unwrap();
    assert_eq!(view, (3, 3));
}
/// With 5 contributed by the other instance, three conditional increments on
/// one key are evaluated in order: `Eq(5)` and `Eq(6)` apply, `Gt(20)` does
/// not, giving running results `(6, 1)`, `(8, 3)`, `(8, 3)`.
#[tokio::test]
async fn inc_all_if_uses_stale_aware_local_cumulative_and_is_sequential() {
    let (actor, other, _) = make_lax_pair("lax_inc_all_if_ordered").await;
    let hits = key("hits");
    let flush_wait = Duration::from_millis(FLUSH_MS * 5);

    other.inc(&hits, 5).await.unwrap();
    sleep(flush_wait).await;

    let steps = [
        (&hits, CounterComparator::Eq(5), 1),
        (&hits, CounterComparator::Eq(6), 2),
        (&hits, CounterComparator::Gt(20), 4),
    ];
    let observed = actor.inc_all_if(&steps).await.unwrap();
    assert_eq!(observed, vec![(&hits, 6, 1), (&hits, 8, 3), (&hits, 8, 3)]);

    let view = actor.get(&hits).await.unwrap();
    assert_eq!(view, (8, 3));
}
/// Of two conditional increments, only the `Nil` one applies. After five
/// flush intervals the other instance reads 4 for that key and 0 for the key
/// whose `Gt(0)` condition did not hold.
#[tokio::test]
async fn inc_all_if_successes_are_eventually_visible_after_flush() {
    let (actor, observer, _) = make_lax_pair("lax_inc_all_if_flush").await;
    let a = key("a");
    let b = key("b");
    let flush_wait = Duration::from_millis(FLUSH_MS * 5);

    let steps = [
        (&a, CounterComparator::Nil, 4),
        (&b, CounterComparator::Gt(0), 7),
    ];
    let observed = actor.inc_all_if(&steps).await.unwrap();
    assert_eq!(observed, vec![(&a, 4, 4), (&b, 0, 0)]);

    sleep(flush_wait).await;

    let seen_a = observer.get(&a).await.unwrap().0;
    assert_eq!(seen_a, 4);
    let seen_b = observer.get(&b).await.unwrap().0;
    assert_eq!(seen_b, 0);
}
/// Starting from 10 and 20, `set_all_if` applies the `Nil` update (to 11)
/// and skips the `Lt(10)` one. The other instance reads 11 and 20.
#[tokio::test]
async fn set_all_if_supports_partial_success() {
    let (setter, observer, _) = make_lax_pair("lax_set_all_if_partial").await;
    let a = key("a");
    let b = key("b");

    setter.set_all(&[(&a, 10), (&b, 20)]).await.unwrap();

    let steps = [
        (&a, CounterComparator::Nil, 11),
        (&b, CounterComparator::Lt(10), 99),
    ];
    let observed = setter.set_all_if(&steps).await.unwrap();
    assert_eq!(observed, vec![(&a, 11, 11), (&b, 20, 20)]);

    let seen_a = observer.get(&a).await.unwrap().0;
    assert_eq!(seen_a, 11);
    let seen_b = observer.get(&b).await.unwrap().0;
    assert_eq!(seen_b, 20);
}
/// With 5 contributed by the other instance, `set_all_on_instance_if` with
/// `Eq(0)` and target 3 returns `(key, 8, 3)`, and `get` then returns `(8, 3)`.
#[tokio::test]
async fn set_all_on_instance_if_refreshes_stale_state_before_comparing() {
    let (actor, other, _) = make_lax_pair("lax_set_all_on_instance_if_refresh").await;
    let hits = key("hits");
    let flush_wait = Duration::from_millis(FLUSH_MS * 5);

    other.inc(&hits, 5).await.unwrap();
    sleep(flush_wait).await;

    let steps = [(&hits, CounterComparator::Eq(0), 3)];
    let observed = actor.set_all_on_instance_if(&steps).await.unwrap();
    assert_eq!(observed, vec![(&hits, 8, 3)]);

    let view = actor.get(&hits).await.unwrap();
    assert_eq!(view, (8, 3));
}