use std::sync::Arc;
use std::time::Duration;
use tokio::time::sleep;
use crate::icounter::{
InstanceAwareCounterTrait, LaxInstanceAwareCounter, LaxInstanceAwareCounterOptions,
};
use super::common::{make_connection, run_id};
use crate::RedisKey;
const FLUSH_MS: u64 = 10;
const THRESHOLD_MS: u64 = 300;
fn unique_prefix(name: &str) -> String {
format!("{}_{}", run_id(), name)
}
async fn make_lax_from_prefix(prefix: &str) -> Arc<LaxInstanceAwareCounter> {
let conn = make_connection().await;
LaxInstanceAwareCounter::new(LaxInstanceAwareCounterOptions {
prefix: RedisKey::from(prefix.to_string()),
connection_manager: conn,
dead_instance_threshold_ms: THRESHOLD_MS,
flush_interval: Duration::from_millis(FLUSH_MS),
allowed_lag: Duration::from_millis(FLUSH_MS),
})
}
async fn make_lax(name: &str) -> Arc<LaxInstanceAwareCounter> {
make_lax_from_prefix(&unique_prefix(name)).await
}
async fn make_lax_pair(
name: &str,
) -> (Arc<LaxInstanceAwareCounter>, Arc<LaxInstanceAwareCounter>, String) {
let prefix = unique_prefix(name);
let c1 = make_lax_from_prefix(&prefix).await;
let c2 = make_lax_from_prefix(&prefix).await;
(c1, c2, prefix)
}
fn key(name: &str) -> RedisKey {
RedisKey::from(name.to_string())
}
#[tokio::test]
async fn inc_returns_local_estimate() {
let c = make_lax("inc_returns_local_estimate").await;
let k = key("hits");
let (cum1, _) = c.inc(&k, 5).await.unwrap();
let (cum2, _) = c.inc(&k, 3).await.unwrap();
assert_eq!(cum1, 5);
assert_eq!(cum2, 8);
}
#[tokio::test]
async fn get_returns_local_estimate_including_pending_delta() {
let c = make_lax("get_with_pending").await;
let k = key("hits");
c.inc(&k, 10).await.unwrap();
c.inc(&k, 5).await.unwrap();
let (cum, _) = c.get(&k).await.unwrap();
assert_eq!(cum, 15);
}
#[tokio::test]
async fn flush_sends_delta_to_strict() {
let (c1, _c2, prefix) = make_lax_pair("flush_sends_delta").await;
let k = key("hits");
c1.inc(&k, 20).await.unwrap();
sleep(Duration::from_millis(FLUSH_MS * 5)).await;
let reader = make_lax_from_prefix(&prefix).await;
let (cum, _) = reader.get(&k).await.unwrap();
assert_eq!(cum, 20);
}
#[tokio::test]
async fn two_instances_cumulate_after_flush() {
let (c1, c2, prefix) = make_lax_pair("two_instances_cumulate").await;
let k = key("hits");
c1.inc(&k, 10).await.unwrap();
c2.inc(&k, 7).await.unwrap();
sleep(Duration::from_millis(FLUSH_MS * 5)).await;
let reader = make_lax_from_prefix(&prefix).await;
let (cum, _) = reader.get(&k).await.unwrap();
assert_eq!(cum, 17);
}
#[tokio::test]
async fn set_flushes_pending_delta_then_sets() {
let (c1, c2, _prefix) = make_lax_pair("set_flushes_delta").await;
let k = key("hits");
c1.inc(&k, 5).await.unwrap();
c1.set(&k, 100).await.unwrap();
let (cum, _) = c2.get(&k).await.unwrap();
assert_eq!(cum, 100);
}
#[tokio::test]
async fn del_flushes_pending_delta_then_deletes() {
let (c1, c2, _prefix) = make_lax_pair("del_flushes_delta").await;
let k = key("hits");
c1.inc(&k, 15).await.unwrap();
let (old_cum, _) = c1.del(&k).await.unwrap();
assert_eq!(old_cum, 15);
let (cum, _) = c2.get(&k).await.unwrap();
assert_eq!(cum, 0);
}
#[tokio::test]
async fn dead_instance_cleaned_through_lax_wrapper() {
let (c1, _c2, prefix) = make_lax_pair("dead_instance_lax").await;
let k = key("hits");
c1.inc(&k, 30).await.unwrap();
sleep(Duration::from_millis(FLUSH_MS * 5)).await;
sleep(Duration::from_millis(THRESHOLD_MS + 50)).await;
let reader = make_lax_from_prefix(&prefix).await;
let (cum, _) = reader.get(&k).await.unwrap();
assert_eq!(cum, 0, "dead instance contribution should be cleaned up");
}
#[tokio::test]
async fn clear_flushes_and_wipes() {
let (c1, _c2, prefix) = make_lax_pair("clear_flushes_and_wipes").await;
let k = key("hits");
c1.inc(&k, 50).await.unwrap();
c1.clear().await.unwrap();
let reader = make_lax_from_prefix(&prefix).await;
let (cum, _) = reader.get(&k).await.unwrap();
assert_eq!(cum, 0);
}
#[tokio::test]
async fn clear_on_instance_removes_only_this_instance() {
let (c1, c2, prefix) = make_lax_pair("clear_on_instance_lax").await;
let k = key("hits");
c1.inc(&k, 20).await.unwrap();
c2.inc(&k, 10).await.unwrap();
sleep(Duration::from_millis(FLUSH_MS * 5)).await;
c1.clear_on_instance().await.unwrap();
let reader = make_lax_from_prefix(&prefix).await;
let (cum, _) = reader.get(&k).await.unwrap();
assert_eq!(cum, 10, "only c1's contribution should be removed");
}