use crate::backpressure::{
config::BackPressureConfig,
controller::{BackPressureController, BackPressureTimeout, BackPressureVersionTracker},
};
use time::OffsetDateTime;
fn diff_seconds(time_ns: i128, diff: i128) -> i128 {
time_ns - (diff * 1_000_000_000)
}
#[test]
fn test_backpressure_when_version_trackers_not_set() {
let config = BackPressureConfig::builder()
.max_timeout_ms(100)
.min_timeout_ms(10)
.rate_delta_threshold(100.0)
.suffix_fill_threshold(0.7)
.suffix_rate_threshold(0.5)
.suffix_max_size(300)
.build();
let bp_controller = BackPressureController::with_config(config);
let timeout_ms = bp_controller.calculate_timeout(20);
assert_eq!(timeout_ms, 0);
}
#[test]
fn test_backpressure_within_threshold_limit() {
let config = BackPressureConfig::builder()
.max_timeout_ms(100)
.min_timeout_ms(10)
.rate_delta_threshold(100.0)
.suffix_fill_threshold(0.7)
.suffix_rate_threshold(0.5)
.suffix_max_size(300)
.build();
let mut bp_controller = BackPressureController::with_config(config);
let test_start_time_ns = OffsetDateTime::now_utc().unix_timestamp_nanos();
bp_controller.update_suffix_head_trackers(BackPressureVersionTracker {
version: 1,
time_ns: diff_seconds(test_start_time_ns, 30),
});
bp_controller.update_candidate_received_tracker(BackPressureVersionTracker {
version: 1,
time_ns: diff_seconds(test_start_time_ns, 30),
});
bp_controller.update_suffix_head_trackers(BackPressureVersionTracker {
version: 10,
time_ns: diff_seconds(test_start_time_ns, 5),
});
bp_controller.update_candidate_received_tracker(BackPressureVersionTracker {
version: 10,
time_ns: diff_seconds(test_start_time_ns, 5),
});
let timeout_ms = bp_controller.calculate_timeout(20);
assert_eq!(timeout_ms, 0);
}
#[test]
fn test_backpressure_suffix_size_based() {
let config = BackPressureConfig::builder()
.max_timeout_ms(100)
.min_timeout_ms(10)
.rate_delta_threshold(100.0)
.suffix_fill_threshold(0.7)
.suffix_rate_threshold(0.5)
.suffix_max_size(300)
.build();
let bp_controller = BackPressureController::with_config(config);
let timeout_ms = bp_controller.calculate_timeout(20);
assert_eq!(timeout_ms, 0);
let timeout_ms = bp_controller.calculate_timeout(210);
assert_eq!(timeout_ms, 0);
let timeout_ms = bp_controller.calculate_timeout(213);
assert_eq!(timeout_ms, 11);
let timeout_ms = bp_controller.calculate_timeout(225);
assert_eq!(timeout_ms, 16);
let timeout_ms = bp_controller.calculate_timeout(240);
assert_eq!(timeout_ms, 27);
let timeout_ms = bp_controller.calculate_timeout(255);
assert_eq!(timeout_ms, 42);
let timeout_ms = bp_controller.calculate_timeout(270);
assert_eq!(timeout_ms, 59);
let timeout_ms = bp_controller.calculate_timeout(285);
assert_eq!(timeout_ms, 78);
let timeout_ms = bp_controller.calculate_timeout(294);
assert_eq!(timeout_ms, 91);
let timeout_ms = bp_controller.calculate_timeout(300);
assert_eq!(timeout_ms, 100);
let timeout_ms = bp_controller.calculate_timeout(300);
assert_eq!(timeout_ms, 100);
}
#[test]
fn test_backpressure_delta_rate_based() {
let config = BackPressureConfig::builder()
.max_timeout_ms(100)
.min_timeout_ms(10)
.suffix_fill_threshold(0.7)
.suffix_max_size(300)
.suffix_rate_threshold(0.5)
.rate_delta_threshold(50.0)
.build();
let mut bp_controller = BackPressureController::with_config(config);
let test_start_time_ns = OffsetDateTime::now_utc().unix_timestamp_nanos();
let current_suffix_size_50pc = 150;
let timeout_ms = bp_controller.calculate_timeout(current_suffix_size_50pc);
assert_eq!(timeout_ms, 0);
bp_controller.update_suffix_head_trackers(BackPressureVersionTracker {
version: 1,
time_ns: diff_seconds(test_start_time_ns, 300),
});
bp_controller.update_candidate_received_tracker(BackPressureVersionTracker {
version: 1,
time_ns: diff_seconds(test_start_time_ns, 300),
});
let timeout_ms = bp_controller.calculate_timeout(current_suffix_size_50pc);
assert_eq!(timeout_ms, 0);
bp_controller.update_suffix_head_trackers(BackPressureVersionTracker {
version: 11,
time_ns: diff_seconds(test_start_time_ns, 280),
});
bp_controller.update_candidate_received_tracker(BackPressureVersionTracker {
version: 1001,
time_ns: diff_seconds(test_start_time_ns, 280),
});
let timeout_ms = bp_controller.calculate_timeout(current_suffix_size_50pc);
assert_eq!(timeout_ms, 0);
bp_controller.update_suffix_head_trackers(BackPressureVersionTracker {
version: 21,
time_ns: diff_seconds(test_start_time_ns, 280),
});
bp_controller.update_candidate_received_tracker(BackPressureVersionTracker {
version: 1041,
time_ns: diff_seconds(test_start_time_ns, 280),
});
let timeout_ms = bp_controller.calculate_timeout(current_suffix_size_50pc);
assert_eq!(timeout_ms, 11);
bp_controller.update_suffix_head_trackers(BackPressureVersionTracker {
version: 21,
time_ns: diff_seconds(test_start_time_ns, 280),
});
bp_controller.update_candidate_received_tracker(BackPressureVersionTracker {
version: 1121,
time_ns: diff_seconds(test_start_time_ns, 280),
});
let timeout_ms = bp_controller.calculate_timeout(current_suffix_size_50pc);
assert_eq!(timeout_ms, 14);
bp_controller.update_suffix_head_trackers(BackPressureVersionTracker {
version: 21,
time_ns: diff_seconds(test_start_time_ns, 280),
});
bp_controller.update_candidate_received_tracker(BackPressureVersionTracker {
version: 1321,
time_ns: diff_seconds(test_start_time_ns, 280),
});
let timeout_ms = bp_controller.calculate_timeout(current_suffix_size_50pc);
assert_eq!(timeout_ms, 20);
bp_controller.update_suffix_head_trackers(BackPressureVersionTracker {
version: 21,
time_ns: diff_seconds(test_start_time_ns, 280),
});
bp_controller.update_candidate_received_tracker(BackPressureVersionTracker {
version: 1421,
time_ns: diff_seconds(test_start_time_ns, 280),
});
let timeout_ms = bp_controller.calculate_timeout(current_suffix_size_50pc);
assert_eq!(timeout_ms, 23);
bp_controller.update_suffix_head_trackers(BackPressureVersionTracker {
version: 21,
time_ns: diff_seconds(test_start_time_ns, 280),
});
bp_controller.update_candidate_received_tracker(BackPressureVersionTracker {
version: 1421,
time_ns: diff_seconds(test_start_time_ns, 280),
});
let current_suffix_size_71pc = 213;
let timeout_ms = bp_controller.calculate_timeout(current_suffix_size_71pc);
assert_eq!(timeout_ms, 24);
bp_controller.update_suffix_head_trackers(BackPressureVersionTracker {
version: 21,
time_ns: diff_seconds(test_start_time_ns, 280),
});
bp_controller.update_candidate_received_tracker(BackPressureVersionTracker {
version: 6021,
time_ns: diff_seconds(test_start_time_ns, 280),
});
let timeout_ms = bp_controller.calculate_timeout(current_suffix_size_50pc);
assert_eq!(timeout_ms, 80);
}
#[test]
fn test_backpressure_timeouts_no_timeout_scenarios() {
let config = BackPressureConfig::builder()
.max_timeout_ms(100)
.min_timeout_ms(10)
.suffix_fill_threshold(0.7)
.suffix_max_size(300)
.suffix_rate_threshold(0.5)
.rate_delta_threshold(50.0)
.build();
let mut bp_controller = BackPressureController::with_config(config);
let test_start_time_ns = OffsetDateTime::now_utc().unix_timestamp_nanos();
let timeout = bp_controller.compute_backpressure(20);
assert_eq!(timeout, BackPressureTimeout::NoTimeout);
bp_controller.update_suffix_head_trackers(BackPressureVersionTracker::new(10, diff_seconds(test_start_time_ns, 200)));
bp_controller.update_candidate_received_tracker(BackPressureVersionTracker::new(10, diff_seconds(test_start_time_ns, 200)));
bp_controller.update_suffix_head_trackers(BackPressureVersionTracker::new(20, diff_seconds(test_start_time_ns, 100)));
bp_controller.update_candidate_received_tracker(BackPressureVersionTracker::new(10_000, diff_seconds(test_start_time_ns, 100)));
let timeout = bp_controller.compute_backpressure(20);
assert_eq!(timeout, BackPressureTimeout::NoTimeout);
bp_controller.update_suffix_head_trackers(BackPressureVersionTracker::new(
1,
diff_seconds(test_start_time_ns, 2 * 1_000_000_000 ),
));
bp_controller.update_candidate_received_tracker(BackPressureVersionTracker::new(1, diff_seconds(test_start_time_ns, 2 * 1_000_000_000)));
bp_controller.update_suffix_head_trackers(BackPressureVersionTracker::new(
921,
diff_seconds(test_start_time_ns, 1_000_000_000 ),
));
bp_controller.update_candidate_received_tracker(BackPressureVersionTracker::new(
1_001,
diff_seconds(test_start_time_ns, 1_000_000_000 ),
));
let current_suffix_size = 180;
let timeout = bp_controller.compute_backpressure(current_suffix_size);
assert_eq!(timeout, BackPressureTimeout::NoTimeout);
}
#[test]
fn test_backpressure_timeouts_valid_timeout_scenarios() {
let config = BackPressureConfig::builder()
.max_timeout_ms(100)
.min_timeout_ms(10)
.suffix_fill_threshold(0.7)
.suffix_max_size(300)
.suffix_rate_threshold(0.5)
.rate_delta_threshold(50.0)
.max_head_stale_timeout_ms(2 * 1_000)
.build();
let mut bp_controller = BackPressureController::with_config(config);
let test_start_time_ns = OffsetDateTime::now_utc().unix_timestamp_nanos();
let current_suffix_size = 240;
let timeout = bp_controller.compute_backpressure(current_suffix_size);
assert!(matches!(timeout, BackPressureTimeout::Timeout(..)));
bp_controller.update_suffix_head_trackers(BackPressureVersionTracker::new(1, diff_seconds(test_start_time_ns, 300)));
bp_controller.update_candidate_received_tracker(BackPressureVersionTracker::new(1, diff_seconds(test_start_time_ns, 300)));
bp_controller.update_suffix_head_trackers(BackPressureVersionTracker::new(21, diff_seconds(test_start_time_ns, 280 )));
bp_controller.update_candidate_received_tracker(BackPressureVersionTracker::new(1421, diff_seconds(test_start_time_ns, 280 )));
let current_suffix_size = 180;
let timeout = bp_controller.compute_backpressure(current_suffix_size);
assert!(matches!(timeout, BackPressureTimeout::Timeout(..)));
bp_controller.update_suffix_head_trackers(BackPressureVersionTracker::new(1, diff_seconds(test_start_time_ns, 300)));
bp_controller.update_candidate_received_tracker(BackPressureVersionTracker::new(1, diff_seconds(test_start_time_ns, 300)));
bp_controller.update_suffix_head_trackers(BackPressureVersionTracker::new(21, diff_seconds(test_start_time_ns, 280 )));
bp_controller.update_candidate_received_tracker(BackPressureVersionTracker::new(1421, diff_seconds(test_start_time_ns, 280 )));
let current_suffix_size = 290;
let timeout = bp_controller.compute_backpressure(current_suffix_size);
assert!(matches!(timeout, BackPressureTimeout::Timeout(..)));
}
#[test]
fn test_backpressure_timeouts_critical_timeout_scenarios() {
let config = BackPressureConfig::builder()
.max_timeout_ms(100)
.min_timeout_ms(10)
.suffix_fill_threshold(0.7)
.suffix_max_size(300)
.suffix_rate_threshold(0.5)
.rate_delta_threshold(50.0)
.max_head_stale_timeout_ms(1)
.build();
let mut bp_controller = BackPressureController::with_config(config);
let test_start_time_ns = OffsetDateTime::now_utc().unix_timestamp_nanos();
bp_controller.update_suffix_head_trackers(BackPressureVersionTracker::new(1, diff_seconds(test_start_time_ns, 300)));
bp_controller.update_candidate_received_tracker(BackPressureVersionTracker::new(1, diff_seconds(test_start_time_ns, 300)));
bp_controller.update_suffix_head_trackers(BackPressureVersionTracker::new(21, diff_seconds(test_start_time_ns, 280)));
bp_controller.update_candidate_received_tracker(BackPressureVersionTracker::new(1421, diff_seconds(test_start_time_ns, 280)));
let current_suffix_size = 300;
let timeout = bp_controller.compute_backpressure(current_suffix_size);
assert!(matches!(timeout, BackPressureTimeout::CriticalStop(..)));
}