use std::time::Duration;
use crate::CounterComparator;
use crate::icounter::InstanceAwareCounterTrait;
use super::common::{
key, make_counter, make_n_counters, make_n_counters_with_opts, make_pair, make_pair_with_opts,
};
#[tokio::test]
async fn inc_returns_cumulative() {
let c = make_counter("inc_returns_cumulative").await;
let k = key("x");
assert_eq!(c.inc(&k, 5).await.unwrap().0, 5);
assert_eq!(c.inc(&k, 3).await.unwrap().0, 8);
c.clear().await.unwrap();
}
#[tokio::test]
async fn inc_by_zero_is_noop() {
let c = make_counter("inc_by_zero").await;
let k = key("x");
c.inc(&k, 10).await.unwrap();
assert_eq!(c.inc(&k, 0).await.unwrap().0, 10);
c.clear().await.unwrap();
}
#[tokio::test]
async fn set_returns_count_as_cumulative() {
let c = make_counter("set_returns_count").await;
let k = key("x");
assert_eq!(c.set(&k, 99).await.unwrap().0, 99);
c.clear().await.unwrap();
}
#[tokio::test]
async fn set_on_instance_new_key() {
let c = make_counter("set_on_instance_new").await;
let k = key("x");
let (cum, inst) = c.set_on_instance(&k, 42).await.unwrap();
assert_eq!(cum, 42);
assert_eq!(inst, 42);
c.clear().await.unwrap();
}
#[tokio::test]
async fn get_returns_zero_for_unknown_key() {
let c = make_counter("get_zero_unknown").await;
let (cum, inst) = c.get(&key("never_set")).await.unwrap();
assert_eq!(cum, 0);
assert_eq!(inst, 0);
c.clear().await.unwrap();
}
#[tokio::test]
async fn get_matches_cumulative_and_instance_count() {
let c = make_counter("get_matches").await;
let k = key("x");
c.inc(&k, 7).await.unwrap();
let (cum, inst) = c.get(&k).await.unwrap();
assert_eq!(cum, 7);
assert_eq!(inst, 7);
c.clear().await.unwrap();
}
#[tokio::test]
async fn del_returns_old_cumulative() {
let c = make_counter("del_returns_old").await;
let k = key("x");
c.inc(&k, 42).await.unwrap();
assert_eq!(c.del(&k).await.unwrap().0, 42);
c.clear().await.unwrap();
}
#[tokio::test]
async fn del_resets_key() {
let c = make_counter("del_resets").await;
let k = key("x");
c.inc(&k, 10).await.unwrap();
c.del(&k).await.unwrap();
let (cum, inst) = c.get(&k).await.unwrap();
assert_eq!(cum, 0);
assert_eq!(inst, 0);
c.clear().await.unwrap();
}
#[tokio::test]
async fn del_on_instance_removes_contribution() {
let c = make_counter("del_on_inst").await;
let k = key("x");
c.inc(&k, 15).await.unwrap();
let (new_cum, removed) = c.del_on_instance(&k).await.unwrap();
assert_eq!(removed, 15);
assert_eq!(new_cum, 0);
c.clear().await.unwrap();
}
#[tokio::test]
async fn del_on_instance_on_unknown_key_returns_zeros() {
let c = make_counter("del_on_inst_unknown").await;
let (cum, removed) = c.del_on_instance(&key("nonexistent")).await.unwrap();
assert_eq!(cum, 0);
assert_eq!(removed, 0);
c.clear().await.unwrap();
}
#[tokio::test]
async fn clear_wipes_all_keys() {
let c = make_counter("clear_wipes").await;
let (k1, k2) = (key("a"), key("b"));
c.inc(&k1, 5).await.unwrap();
c.inc(&k2, 10).await.unwrap();
c.clear().await.unwrap();
assert_eq!(c.get(&k1).await.unwrap().0, 0);
assert_eq!(c.get(&k2).await.unwrap().0, 0);
}
#[tokio::test]
async fn clear_on_instance_removes_contribution() {
let c = make_counter("clear_on_inst").await;
let k = key("x");
c.inc(&k, 20).await.unwrap();
c.clear_on_instance().await.unwrap();
let (cum, inst) = c.get(&k).await.unwrap();
assert_eq!(cum, 0);
assert_eq!(inst, 0);
c.clear().await.unwrap();
}
#[tokio::test]
async fn independent_keys_do_not_interfere() {
let c = make_counter("independent_keys").await;
let (k1, k2) = (key("a"), key("b"));
c.inc(&k1, 5).await.unwrap();
c.inc(&k2, 10).await.unwrap();
assert_eq!(c.get(&k1).await.unwrap().0, 5);
assert_eq!(c.get(&k2).await.unwrap().0, 10);
c.clear().await.unwrap();
}
#[tokio::test]
async fn prefix_isolation() {
let c1 = make_counter("prefix_iso_a").await;
let c2 = make_counter("prefix_iso_b").await;
let k = key("x");
c1.inc(&k, 100).await.unwrap();
assert_eq!(c2.get(&k).await.unwrap().0, 0);
c1.clear().await.unwrap();
c2.clear().await.unwrap();
}
#[tokio::test]
async fn three_instances_cumulate_via_inc() {
let cs = make_n_counters("three_inc", 3).await;
let k = key("x");
cs[0].inc(&k, 10).await.unwrap();
cs[1].inc(&k, 20).await.unwrap();
let (cum, _) = cs[2].inc(&k, 30).await.unwrap();
assert_eq!(cum, 60);
cs[0].clear().await.unwrap();
}
#[tokio::test]
async fn five_instances_set_on_instance_sum() {
let cs = make_n_counters("five_set_on_inst", 5).await;
let k = key("x");
for (i, c) in cs.iter().enumerate() {
c.set_on_instance(&k, (i + 1) as i64).await.unwrap();
}
let (cum, _) = cs[0].get(&k).await.unwrap();
assert_eq!(cum, 15);
for (i, c) in cs.iter().enumerate() {
let (_, inst) = c.get(&k).await.unwrap();
assert_eq!(inst, (i + 1) as i64);
}
cs[0].clear().await.unwrap();
}
#[tokio::test]
async fn ten_instances_each_inc_by_one() {
let cs = make_n_counters("ten_inc_one", 10).await;
let k = key("x");
for c in &cs {
c.inc(&k, 1).await.unwrap();
}
let (cum, _) = cs[0].get(&k).await.unwrap();
assert_eq!(cum, 10);
cs[0].clear().await.unwrap();
}
#[tokio::test]
async fn n_instances_del_on_instance_progressively_reaches_zero() {
let cs = make_n_counters("del_on_inst_progress", 3).await;
let k = key("x");
let amounts = [10i64, 20, 30];
for (c, &amt) in cs.iter().zip(amounts.iter()) {
c.inc(&k, amt).await.unwrap();
}
let expected_after = [50i64, 30, 0]; for (c, &expected) in cs.iter().zip(expected_after.iter()) {
let (new_cum, _) = c.del_on_instance(&k).await.unwrap();
assert_eq!(new_cum, expected);
}
cs[0].clear().await.unwrap();
}
#[tokio::test]
async fn n_instances_clear_on_instance_one_at_a_time() {
let cs = make_n_counters("clear_on_inst_seq", 3).await;
let k = key("x");
let amounts = [10i64, 20, 30];
for (c, &amt) in cs.iter().zip(amounts.iter()) {
c.inc(&k, amt).await.unwrap();
}
let expected_after = [50i64, 30, 0];
for (c, &expected) in cs.iter().zip(expected_after.iter()) {
c.clear_on_instance().await.unwrap();
let (cum, _) = cs[2].get(&k).await.unwrap(); assert_eq!(cum, expected);
}
cs[0].clear().await.unwrap();
}
#[tokio::test]
async fn set_by_one_instance_makes_all_others_stale() {
let cs = make_n_counters("set_makes_stale", 3).await;
let k = key("x");
cs[0].set_on_instance(&k, 100).await.unwrap(); cs[1].set_on_instance(&k, 100).await.unwrap(); cs[2].set_on_instance(&k, 100).await.unwrap();
assert_eq!(cs[0].set(&k, 50).await.unwrap().0, 50);
assert_eq!(cs[1].inc(&k, 5).await.unwrap().0, 55);
assert_eq!(cs[2].inc(&k, 5).await.unwrap().0, 60);
cs[0].clear().await.unwrap();
}
#[tokio::test]
async fn del_by_one_instance_makes_all_others_stale() {
let cs = make_n_counters("del_makes_stale", 3).await;
let k = key("x");
cs[0].inc(&k, 100).await.unwrap();
cs[1].inc(&k, 100).await.unwrap();
cs[2].inc(&k, 100).await.unwrap();
cs[0].del(&k).await.unwrap();
cs[1].inc(&k, 10).await.unwrap();
let (cum, _) = cs[2].inc(&k, 10).await.unwrap();
assert_eq!(cum, 20);
cs[0].clear().await.unwrap();
}
#[tokio::test]
async fn sequential_epoch_bumps_from_different_instances() {
let cs = make_n_counters("seq_epoch_bumps", 3).await;
let k = key("x");
assert_eq!(cs[0].set(&k, 10).await.unwrap().0, 10);
assert_eq!(cs[1].set(&k, 20).await.unwrap().0, 20);
assert_eq!(cs[2].set(&k, 30).await.unwrap().0, 30);
let (cum, _) = cs[0].get(&k).await.unwrap();
assert_eq!(cum, 30);
cs[0].clear().await.unwrap();
}
#[tokio::test]
async fn instance_missed_multiple_epochs_still_resyncs() {
let (c1, c2) = make_pair("missed_epochs").await;
let k = key("x");
c2.inc(&k, 50).await.unwrap();
c1.set(&k, 10).await.unwrap(); c1.set(&k, 20).await.unwrap(); c1.set(&k, 30).await.unwrap();
let (cum, _) = c2.inc(&k, 5).await.unwrap();
assert_eq!(cum, 35);
c1.clear().await.unwrap();
}
#[tokio::test]
async fn brand_new_instance_after_epoch_bump_starts_fresh() {
let cs = make_n_counters("new_inst_after_bump", 2).await;
let k = key("x");
assert_eq!(cs[0].set(&k, 100).await.unwrap().0, 100);
let (cum, inst) = cs[1].set_on_instance(&k, 50).await.unwrap();
assert_eq!(inst, 50);
assert_eq!(cum, 150);
cs[0].clear().await.unwrap();
}
#[tokio::test]
async fn interleaved_inc_and_set_across_three_instances() {
let cs = make_n_counters("interleaved", 3).await;
let k = key("x");
cs[0].inc(&k, 10).await.unwrap(); cs[1].inc(&k, 5).await.unwrap();
assert_eq!(cs[0].set(&k, 20).await.unwrap().0, 20);
assert_eq!(cs[1].inc(&k, 3).await.unwrap().0, 23);
assert_eq!(cs[2].inc(&k, 7).await.unwrap().0, 30);
cs[0].clear().await.unwrap();
}
#[tokio::test]
async fn all_five_instances_del_on_instance_reaches_zero() {
let cs = make_n_counters("all_del_on_inst", 5).await;
let k = key("x");
let amounts = [5i64, 10, 15, 20, 25];
for (c, &amt) in cs.iter().zip(amounts.iter()) {
c.inc(&k, amt).await.unwrap();
}
for c in &cs {
c.del_on_instance(&k).await.unwrap();
}
let (cum, _) = cs[0].get(&k).await.unwrap();
assert_eq!(cum, 0);
cs[0].clear().await.unwrap();
}
#[tokio::test]
async fn set_then_all_stale_instances_inc() {
let cs = make_n_counters("set_then_stale_inc", 3).await;
let k = key("x");
cs[1].set_on_instance(&k, 200).await.unwrap();
cs[2].set_on_instance(&k, 300).await.unwrap();
assert_eq!(cs[0].set(&k, 100).await.unwrap().0, 100);
cs[1].inc(&k, 10).await.unwrap();
let (cum, _) = cs[2].inc(&k, 10).await.unwrap();
assert_eq!(cum, 120);
let (_, inst1) = cs[1].get(&k).await.unwrap();
let (_, inst2) = cs[2].get(&k).await.unwrap();
assert_eq!(inst1, 10);
assert_eq!(inst2, 10);
cs[0].clear().await.unwrap();
}
#[tokio::test]
async fn del_then_fresh_inc_from_all() {
let cs = make_n_counters("del_fresh_inc_all", 3).await;
let k = key("x");
cs[0].inc(&k, 50).await.unwrap(); cs[1].inc(&k, 30).await.unwrap();
cs[2].del(&k).await.unwrap();
cs[0].inc(&k, 5).await.unwrap();
cs[1].inc(&k, 7).await.unwrap();
let (cum, _) = cs[2].inc(&k, 3).await.unwrap();
assert_eq!(cum, 15);
cs[0].clear().await.unwrap();
}
#[tokio::test]
async fn round_robin_inc_across_five_instances() {
let cs = make_n_counters("round_robin", 5).await;
let k = key("x");
for _ in 0..10 {
for c in &cs {
c.inc(&k, 1).await.unwrap();
}
}
let (cum, _) = cs[0].get(&k).await.unwrap();
assert_eq!(cum, 50);
for c in &cs {
let (_, inst) = c.get(&k).await.unwrap();
assert_eq!(inst, 10);
}
cs[0].clear().await.unwrap();
}
#[tokio::test]
async fn multiple_dead_instances_all_cleaned_up() {
let mut cs = make_n_counters_with_opts("multi_dead", 5, 100).await;
let k = key("x");
for c in &cs {
c.inc(&k, 10).await.unwrap(); }
let survivor = cs.pop().unwrap();
drop(cs);
tokio::time::sleep(Duration::from_millis(200)).await;
let (cum, inst) = survivor.get(&k).await.unwrap();
assert_eq!(inst, 10);
assert_eq!(cum, 10);
survivor.clear().await.unwrap();
}
#[tokio::test]
async fn dead_instance_cleaned_then_new_instance_joins() {
let (c1, c2) = make_pair_with_opts("dead_then_new", 100).await;
let k = key("x");
c1.inc(&k, 40).await.unwrap(); c2.inc(&k, 60).await.unwrap();
drop(c1);
tokio::time::sleep(Duration::from_millis(200)).await;
let (cum, _) = c2.get(&k).await.unwrap();
assert_eq!(cum, 60);
let mut opts = super::common::make_n_counters_with_opts("dead_then_new", 1, 100).await;
let c3 = opts.pop().unwrap();
let (cum, _) = c3.inc(&k, 20).await.unwrap();
assert_eq!(cum, 80);
c2.clear().await.unwrap();
}
#[tokio::test]
async fn dead_instance_with_multiple_keys_fully_cleaned() {
let (c1, c2) = make_pair_with_opts("dead_multi_key", 100).await;
let (k1, k2, k3) = (key("a"), key("b"), key("c"));
c1.inc(&k1, 10).await.unwrap();
c1.inc(&k2, 20).await.unwrap();
c1.inc(&k3, 30).await.unwrap();
c2.get(&k1).await.unwrap();
drop(c1);
tokio::time::sleep(Duration::from_millis(200)).await;
assert_eq!(c2.get(&k1).await.unwrap().0, 0);
assert_eq!(c2.get(&k2).await.unwrap().0, 0);
assert_eq!(c2.get(&k3).await.unwrap().0, 0);
c2.clear().await.unwrap();
}
#[tokio::test]
async fn live_instance_not_cleaned_at_boundary() {
let (c1, c2) = make_pair_with_opts("live_boundary", 500).await;
let k = key("x");
c1.inc(&k, 40).await.unwrap();
c2.inc(&k, 60).await.unwrap();
tokio::time::sleep(Duration::from_millis(100)).await;
let (cum, _) = c2.get(&k).await.unwrap();
assert_eq!(cum, 100);
c1.clear().await.unwrap();
}
#[tokio::test]
async fn three_instances_two_keys_independent() {
let cs = make_n_counters("three_two_keys", 3).await;
let (ka, kb) = (key("a"), key("b"));
cs[0].inc(&ka, 1).await.unwrap();
cs[1].inc(&ka, 2).await.unwrap();
cs[2].inc(&ka, 3).await.unwrap();
cs[0].inc(&kb, 10).await.unwrap();
cs[1].inc(&kb, 20).await.unwrap();
cs[2].inc(&kb, 30).await.unwrap();
let (cum_a, _) = cs[0].get(&ka).await.unwrap();
let (cum_b, _) = cs[0].get(&kb).await.unwrap();
assert_eq!(cum_a, 6);
assert_eq!(cum_b, 60);
cs[0].clear().await.unwrap();
}
#[tokio::test]
async fn epoch_bump_on_one_key_does_not_affect_other() {
let cs = make_n_counters("epoch_key_isolation", 3).await;
let (k1, k2) = (key("k1"), key("k2"));
for c in &cs {
c.inc(&k1, 10).await.unwrap();
c.inc(&k2, 10).await.unwrap();
}
cs[0].set(&k1, 5).await.unwrap();
let (cum_k2_after_c1, _) = cs[1].inc(&k2, 1).await.unwrap();
let (cum_k2_after_c2, _) = cs[2].inc(&k2, 1).await.unwrap();
assert_eq!(cum_k2_after_c1, 31);
assert_eq!(cum_k2_after_c2, 32);
let (cum_k1, _) = cs[1].inc(&k1, 7).await.unwrap();
assert_eq!(cum_k1, 12);
cs[0].clear().await.unwrap();
}
#[tokio::test]
async fn clear_on_instance_removes_all_keys_for_that_instance() {
let (c1, c2) = make_pair("clear_on_inst_multi_key").await;
let (ka, kb, kc) = (key("a"), key("b"), key("c"));
c1.inc(&ka, 10).await.unwrap();
c1.inc(&kb, 20).await.unwrap();
c1.inc(&kc, 30).await.unwrap();
c2.inc(&ka, 5).await.unwrap();
c2.inc(&kb, 5).await.unwrap();
c2.inc(&kc, 5).await.unwrap();
c1.clear_on_instance().await.unwrap();
assert_eq!(c2.get(&ka).await.unwrap().0, 5);
assert_eq!(c2.get(&kb).await.unwrap().0, 5);
assert_eq!(c2.get(&kc).await.unwrap().0, 5);
c1.clear().await.unwrap();
}
#[tokio::test]
async fn stale_del_on_instance_does_not_double_subtract() {
let (c1, c2) = make_pair("stale_del_on_inst").await;
let k = key("x");
c1.inc(&k, 40).await.unwrap(); c2.inc(&k, 60).await.unwrap();
c1.set(&k, 10).await.unwrap();
let (new_cum, _removed) = c2.del_on_instance(&k).await.unwrap();
assert_eq!(new_cum, 10);
c1.clear().await.unwrap();
}
#[tokio::test]
async fn set_on_instance_from_stale_uses_full_count_as_delta() {
let (c1, c2) = make_pair("stale_set_on_inst_delta").await;
let k = key("x");
c2.set_on_instance(&k, 50).await.unwrap();
c1.set(&k, 100).await.unwrap();
let (cum, inst) = c2.set_on_instance(&k, 30).await.unwrap();
assert_eq!(inst, 30);
assert_eq!(cum, 130);
c1.clear().await.unwrap();
}
#[tokio::test]
async fn inc_stale_ignores_old_instance_count_in_cumulative() {
let (c1, c2) = make_pair("stale_inc_ignore_old").await;
let k = key("x");
c2.set_on_instance(&k, 100).await.unwrap();
c1.set(&k, 5).await.unwrap();
let (cum, _) = c2.inc(&k, 3).await.unwrap();
assert_eq!(cum, 8);
c1.clear().await.unwrap();
}
#[tokio::test]
async fn two_instances_cumulate_via_inc() {
let (c1, c2) = make_pair("two_inc").await;
let k = key("x");
c1.inc(&k, 10).await.unwrap();
assert_eq!(c2.inc(&k, 20).await.unwrap().0, 30);
c1.clear().await.unwrap();
}
#[tokio::test]
async fn two_instances_cumulate_via_set_on_instance() {
let (c1, c2) = make_pair("two_set_on_inst").await;
let k = key("x");
assert_eq!(c1.set_on_instance(&k, 5).await.unwrap().0, 5);
assert_eq!(c2.set_on_instance(&k, 10).await.unwrap().0, 15);
c1.clear().await.unwrap();
}
#[tokio::test]
async fn del_on_instance_only_removes_caller_contribution() {
let (c1, c2) = make_pair("del_on_inst_caller").await;
let k = key("x");
c1.inc(&k, 30).await.unwrap();
c2.inc(&k, 50).await.unwrap();
let (new_cum, removed) = c1.del_on_instance(&k).await.unwrap();
assert_eq!(removed, 30);
assert_eq!(new_cum, 50);
c1.clear().await.unwrap();
}
#[tokio::test]
async fn stale_instance_resets_to_delta_after_epoch_bump() {
let (c1, c2) = make_pair("stale_epoch").await;
let k = key("x");
c1.set_on_instance(&k, 5).await.unwrap(); c2.set_on_instance(&k, 10).await.unwrap();
assert_eq!(c1.set(&k, 3).await.unwrap().0, 3);
assert_eq!(c2.inc(&k, 5).await.unwrap().0, 8);
c1.clear().await.unwrap();
}
#[tokio::test]
async fn set_increments_epoch_on_each_call() {
let c = make_counter("epoch_monotonic").await;
let k = key("x");
c.set(&k, 10).await.unwrap();
assert_eq!(c.set(&k, 20).await.unwrap().0, 20);
assert_eq!(c.get(&k).await.unwrap().0, 20);
c.clear().await.unwrap();
}
#[tokio::test]
async fn dead_instance_counts_removed_from_cumulative() {
let (c1, c2) = make_pair_with_opts("dead_test", 100).await;
let k = key("hits");
c1.inc(&k, 40).await.unwrap();
c2.inc(&k, 60).await.unwrap();
tokio::time::sleep(Duration::from_millis(200)).await;
drop(c1);
let (cum, _) = c2.get(&k).await.unwrap();
assert_eq!(cum, 60);
c2.clear().await.unwrap();
}
#[tokio::test]
async fn recovery_via_inc_after_downtime() {
let (c1, c2) = make_pair_with_opts("recovery_inc", 100).await;
let k = key("x");
c1.inc(&k, 20).await.unwrap();
tokio::time::sleep(Duration::from_millis(200)).await;
let (cum_after_cleanup, _) = c2.get(&k).await.unwrap();
assert_eq!(cum_after_cleanup, 0);
let (cum, _) = c1.inc(&k, 5).await.unwrap();
assert_eq!(cum, 25);
let (total, c1_inst) = c1.get(&k).await.unwrap();
assert_eq!(total, 25);
assert_eq!(c1_inst, 25);
c1.clear().await.unwrap();
}
#[tokio::test]
async fn recovery_via_get_after_downtime() {
let (c1, c2) = make_pair_with_opts("recovery_get", 100).await;
let k = key("x");
c1.inc(&k, 30).await.unwrap();
tokio::time::sleep(Duration::from_millis(200)).await;
let (cum_after_cleanup, _) = c2.get(&k).await.unwrap();
assert_eq!(cum_after_cleanup, 0);
let (cum, inst) = c1.get(&k).await.unwrap();
assert_eq!(cum, 30);
assert_eq!(inst, 30);
c1.clear().await.unwrap();
}
#[tokio::test]
async fn no_recovery_when_epoch_bumped_during_downtime() {
let (c1, c2) = make_pair_with_opts("no_recovery_epoch", 100).await;
let k = key("x");
c1.inc(&k, 20).await.unwrap();
tokio::time::sleep(Duration::from_millis(200)).await;
let (cum, _) = c2.set(&k, 100).await.unwrap();
assert_eq!(cum, 100);
let (cum, _) = c1.inc(&k, 5).await.unwrap();
assert_eq!(cum, 105);
let (_, c1_inst) = c1.get(&k).await.unwrap();
assert_eq!(c1_inst, 5);
c1.clear().await.unwrap();
}
#[tokio::test]
async fn mark_alive_recovery_restores_contribution() {
let (c1, c2) = make_pair_with_opts("mark_alive_recovery", 100).await;
let k = key("x");
c1.inc(&k, 20).await.unwrap();
tokio::time::sleep(Duration::from_millis(200)).await;
let (cum_after_cleanup, _) = c2.get(&k).await.unwrap();
assert_eq!(cum_after_cleanup, 0);
c1.trigger_mark_alive().await.unwrap();
let (cum, _) = c2.get(&k).await.unwrap();
assert_eq!(cum, 20);
c1.clear().await.unwrap();
}
#[tokio::test]
async fn mark_alive_no_recovery_when_epoch_bumped() {
let (c1, c2) = make_pair_with_opts("mark_alive_no_recovery_epoch", 100).await;
let k = key("x");
c1.inc(&k, 20).await.unwrap();
tokio::time::sleep(Duration::from_millis(200)).await;
let (cum, _) = c2.set(&k, 100).await.unwrap();
assert_eq!(cum, 100);
c1.trigger_mark_alive().await.unwrap();
let (cum, _) = c2.get(&k).await.unwrap();
assert_eq!(cum, 100);
c2.clear().await.unwrap();
}
#[tokio::test]
async fn no_recovery_for_live_instance() {
let (c1, c2) = make_pair_with_opts("no_recovery_live", 1_000).await;
let k = key("x");
c1.inc(&k, 20).await.unwrap(); c1.inc(&k, 5).await.unwrap();
let (cum, c1_inst) = c2.get(&k).await.unwrap();
assert_eq!(cum, 25);
assert_eq!(c1_inst, 0);
let (_, c1_from_c1) = c1.get(&k).await.unwrap();
assert_eq!(c1_from_c1, 25);
c1.clear().await.unwrap();
}
#[tokio::test]
async fn inc_if_uses_all_comparators_against_cumulative() {
let cases = [
("eq", CounterComparator::Eq(10), true),
("lt", CounterComparator::Lt(11), true),
("gt", CounterComparator::Gt(10), false),
("ne", CounterComparator::Ne(9), true),
("nil", CounterComparator::Nil, true),
];
for (suffix, comparator, should_apply) in cases {
let c = make_counter(&format!("strict_inc_if_{suffix}")).await;
let k = key("hits");
c.set(&k, 10).await.unwrap();
let result = c.inc_if(&k, comparator, 2).await.unwrap();
let expected = if should_apply {
((12, 12), (10, 10))
} else {
((10, 10), (10, 10))
};
assert_eq!(result, expected);
assert_eq!(c.get(&k).await.unwrap(), expected.0);
c.clear().await.unwrap();
}
}
#[tokio::test]
async fn inc_all_empty_and_inc_all_if_empty_return_empty() {
let c = make_counter("strict_inc_all_empty").await;
assert_eq!(c.inc_all(&[]).await.unwrap(), vec![]);
assert_eq!(c.inc_all_if(&[]).await.unwrap(), vec![]);
c.clear().await.unwrap();
}
#[tokio::test]
async fn inc_all_returns_ordered_results_and_supports_duplicates() {
let c = make_counter("strict_inc_all_duplicates").await;
let k = key("hits");
let results = c.inc_all(&[(&k, 1), (&k, 2)]).await.unwrap();
assert_eq!(results, vec![(&k, 1, 1), (&k, 3, 3)]);
assert_eq!(c.get(&k).await.unwrap(), (3, 3));
c.clear().await.unwrap();
}
#[tokio::test]
async fn inc_all_if_compares_against_cumulative_and_processes_duplicates_sequentially() {
let (c1, c2) = make_pair("strict_inc_all_if_ordered").await;
let k1 = key("a");
let k2 = key("b");
c2.inc(&k2, 5).await.unwrap();
c1.set(&k1, 0).await.unwrap();
let results = c1
.inc_all_if(&[
(&k1, CounterComparator::Eq(0), 1),
(&k1, CounterComparator::Eq(1), 2),
(&k2, CounterComparator::Gt(10), 4),
(&k2, CounterComparator::Eq(5), 3),
])
.await
.unwrap();
assert_eq!(
results,
vec![
(&k1, (1, 1), (0, 0)),
(&k1, (3, 3), (1, 1)),
(&k2, (5, 0), (5, 0)),
(&k2, (8, 3), (5, 0)),
]
);
assert_eq!(c1.get(&k1).await.unwrap(), (3, 3));
assert_eq!(c1.get(&k2).await.unwrap(), (8, 3));
assert_eq!(c2.get(&k2).await.unwrap(), (8, 5));
c1.clear().await.unwrap();
}
#[tokio::test]
async fn set_on_instance_if_compares_against_instance_slice() {
let (c1, c2) = make_pair("strict_set_on_instance_if").await;
let k = key("hits");
c1.set_on_instance(&k, 7).await.unwrap();
c2.set_on_instance(&k, 5).await.unwrap();
let result = c1
.set_on_instance_if(&k, CounterComparator::Gt(6), 9)
.await
.unwrap();
assert_eq!(result, ((14, 9), (12, 7)));
let failed = c1
.set_on_instance_if(&k, CounterComparator::Eq(8), 50)
.await
.unwrap();
assert_eq!(failed, ((14, 9), (14, 9)));
let unconditional = c1
.set_on_instance_if(&k, CounterComparator::Nil, 11)
.await
.unwrap();
assert_eq!(unconditional, ((16, 11), (14, 9)));
c1.clear().await.unwrap();
}
#[tokio::test]
async fn set_all_if_supports_partial_success_and_missing_keys() {
let c = make_counter("strict_set_all_if_partial").await;
let k1 = key("a");
let k2 = key("b");
let k3 = key("c");
c.set(&k1, 10).await.unwrap();
c.set(&k2, 20).await.unwrap();
let results = c
.set_all_if(&[
(&k3, CounterComparator::Nil, 30),
(&k1, CounterComparator::Gt(5), 11),
(&k2, CounterComparator::Lt(10), 99),
])
.await
.unwrap();
assert_eq!(
results,
vec![
(&k3, (30, 30), (0, 0)),
(&k1, (11, 11), (10, 10)),
(&k2, (20, 20), (20, 20))
]
);
assert_eq!(c.get(&k1).await.unwrap(), (11, 11));
assert_eq!(c.get(&k2).await.unwrap(), (20, 20));
assert_eq!(c.get(&k3).await.unwrap(), (30, 30));
c.clear().await.unwrap();
}
#[tokio::test]
async fn set_all_on_instance_if_supports_partial_success() {
let (c1, c2) = make_pair("strict_set_all_on_instance_if").await;
let k1 = key("a");
let k2 = key("b");
c1.set_on_instance(&k1, 4).await.unwrap();
c2.set_on_instance(&k1, 5).await.unwrap();
c2.set_on_instance(&k2, 3).await.unwrap();
let results = c1
.set_all_on_instance_if(&[
(&k2, CounterComparator::Eq(1), 10),
(&k1, CounterComparator::Nil, 7),
])
.await
.unwrap();
assert_eq!(results, vec![(&k2, (3, 0), (3, 0)), (&k1, (12, 7), (9, 4))]);
assert_eq!(c2.get(&k1).await.unwrap().0, 12);
assert_eq!(c2.get(&k2).await.unwrap(), (3, 3));
c1.clear().await.unwrap();
}