use std::time::Duration;
use std::time::Instant;
// Thresholds for the adaptive chunking policy. "Enter" values are compared
// with `>=`, "exit" values with `<=`; the gap between them, together with the
// two hold durations, provides hysteresis between Smooth and CatchUp.

/// Queue depth (in lines) at or above which catch-up mode may be entered.
pub(crate) const ENTER_QUEUE_DEPTH_LINES: usize = 8;
/// Oldest-line age at or above which catch-up mode may be entered.
pub(crate) const ENTER_OLDEST_AGE: Duration = Duration::from_millis(120);
/// Queue depth (in lines) at or below which the exit condition can be met.
pub(crate) const EXIT_QUEUE_DEPTH_LINES: usize = 2;
/// Oldest-line age at or below which the exit condition can be met.
pub(crate) const EXIT_OLDEST_AGE: Duration = Duration::from_millis(40);
/// How long the exit condition must keep holding before a non-empty queue
/// leaves catch-up mode (an empty queue leaves it immediately).
pub(crate) const EXIT_HOLD: Duration = Duration::from_millis(250);
/// After leaving catch-up mode, how long re-entry is suppressed unless the
/// backlog is severe.
pub(crate) const REENTER_CATCH_UP_HOLD: Duration = Duration::from_millis(250);
/// Queue depth (in lines) at or above which a backlog counts as severe.
pub(crate) const SEVERE_QUEUE_DEPTH_LINES: usize = 64;
/// Oldest-line age at or above which a backlog counts as severe.
pub(crate) const SEVERE_OLDEST_AGE: Duration = Duration::from_millis(300);
/// Pacing mode tracked by `AdaptiveChunkingPolicy`.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub enum ChunkingMode {
/// Default mode; `decide` pairs it with `DrainPlan::Single`.
#[default]
Smooth,
/// Backlog mode; `decide` pairs it with `DrainPlan::Batch`.
CatchUp,
}
/// Point-in-time view of the queue that a policy decision is based on.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub struct QueueSnapshot {
/// Number of lines currently queued; `decide` treats `0` as idle.
pub queued_lines: usize,
/// Age of the oldest queued line. `None` never satisfies any of the age
/// checks (enter, exit, or severe), because they all use `is_some_and`.
pub oldest_age: Option<Duration>,
}
/// Drain instruction returned as part of a `ChunkingDecision`.
// NOTE(review): how the consumer acts on each variant is not visible in this
// file; the descriptions below only state when `decide` produces them.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum DrainPlan {
/// Produced whenever the resulting mode is `Smooth`.
Single,
/// Produced whenever the resulting mode is `CatchUp`; carries the snapshot's
/// queue depth (never less than 1).
Batch(usize),
}
/// Result of one `AdaptiveChunkingPolicy::decide` call.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub struct ChunkingDecision {
/// Mode the policy is in after the call.
pub mode: ChunkingMode,
/// `true` only on the call that switched the policy from `Smooth` to
/// `CatchUp`; `false` on every other call.
pub entered_catch_up: bool,
/// Drain instruction matching `mode`.
pub drain_plan: DrainPlan,
}
/// Stateful policy that switches between `Smooth` and `CatchUp` chunking based
/// on queue pressure, with hold timers to avoid rapid mode flapping.
#[derive(Debug, Default, Clone)]
pub struct AdaptiveChunkingPolicy {
// Current mode; starts as `Smooth` via `Default`.
mode: ChunkingMode,
// While in catch-up: the instant the exit condition first started holding.
// Cleared whenever the condition stops holding or the mode changes.
below_exit_threshold_since: Option<Instant>,
// Instant of the most recent catch-up exit; drives the re-entry hold.
// Cleared on re-entry, on `reset`, and when low-motion is enabled.
last_catch_up_exit_at: Option<Instant>,
// When set, `decide` always answers `Smooth` / `Single`.
low_motion: bool,
}
impl AdaptiveChunkingPolicy {
    /// Creates a policy in its default state: `Smooth` mode, no timers armed,
    /// low-motion disabled.
    pub fn new() -> Self {
        Default::default()
    }

    /// Returns the mode the policy is currently in.
    pub fn mode(&self) -> ChunkingMode {
        self.mode
    }

    /// Returns to `Smooth` mode and clears both hold timers.
    ///
    /// The low-motion flag is left as it is.
    pub fn reset(&mut self) {
        self.last_catch_up_exit_at = None;
        self.below_exit_threshold_since = None;
        self.mode = ChunkingMode::Smooth;
    }

    /// Stores the low-motion flag.
    ///
    /// Enabling it also drops back to `Smooth` and clears both hold timers;
    /// disabling it leaves the current mode and timers untouched.
    pub fn set_low_motion(&mut self, low_motion: bool) {
        self.low_motion = low_motion;
        if !low_motion {
            return;
        }
        self.reset();
    }

    /// Evaluates `snapshot` at time `now`, updates the internal state, and
    /// returns the resulting decision.
    ///
    /// Order of evaluation:
    /// 1. Low-motion enabled: always `Smooth` / `Single`.
    /// 2. Empty queue: leave catch-up immediately (arming the re-entry hold if
    ///    catch-up was active) and answer `Smooth` / `Single`.
    /// 3. Otherwise apply the enter rule (when `Smooth`) or the exit rule
    ///    (when `CatchUp`) and derive the drain plan from the resulting mode.
    pub fn decide(&mut self, snapshot: QueueSnapshot, now: Instant) -> ChunkingDecision {
        if self.low_motion {
            return self.settle_into_smooth();
        }
        if snapshot.queued_lines == 0 {
            // Must run before the mode is overwritten: it only records an
            // exit when the policy is still in catch-up.
            self.note_catch_up_exit(now);
            return self.settle_into_smooth();
        }

        let mut entered_catch_up = false;
        match self.mode {
            ChunkingMode::CatchUp => self.maybe_exit_catch_up(snapshot, now),
            ChunkingMode::Smooth => entered_catch_up = self.maybe_enter_catch_up(snapshot, now),
        }

        // The plan follows the mode *after* the transition attempt above.
        let mode = self.mode;
        let drain_plan = match mode {
            ChunkingMode::CatchUp => DrainPlan::Batch(snapshot.queued_lines.max(1)),
            ChunkingMode::Smooth => DrainPlan::Single,
        };
        ChunkingDecision {
            mode,
            entered_catch_up,
            drain_plan,
        }
    }

    /// Forces `Smooth` mode, disarms the exit-hold timer, and builds the
    /// matching `Smooth` / `Single` decision.
    fn settle_into_smooth(&mut self) -> ChunkingDecision {
        self.below_exit_threshold_since = None;
        self.mode = ChunkingMode::Smooth;
        ChunkingDecision {
            mode: self.mode,
            entered_catch_up: false,
            drain_plan: DrainPlan::Single,
        }
    }

    /// Switches to `CatchUp` when the enter condition is met and the re-entry
    /// hold does not veto it (a severe backlog overrides the hold).
    ///
    /// Returns `true` only when the switch happened.
    fn maybe_enter_catch_up(&mut self, snapshot: QueueSnapshot, now: Instant) -> bool {
        let wants_entry = should_enter_catch_up(snapshot);
        let held_back = self.reentry_hold_active(now) && !is_severe_backlog(snapshot);
        if !wants_entry || held_back {
            return false;
        }

        self.last_catch_up_exit_at = None;
        self.below_exit_threshold_since = None;
        self.mode = ChunkingMode::CatchUp;
        true
    }

    /// Leaves `CatchUp` once the exit condition has held for at least
    /// `EXIT_HOLD` across calls; any call that fails the condition restarts
    /// the wait.
    fn maybe_exit_catch_up(&mut self, snapshot: QueueSnapshot, now: Instant) {
        if !should_exit_catch_up(snapshot) {
            self.below_exit_threshold_since = None;
            return;
        }

        // First call with the condition holding: start the hold timer.
        let Some(held_since) = self.below_exit_threshold_since else {
            self.below_exit_threshold_since = Some(now);
            return;
        };

        if now.saturating_duration_since(held_since) >= EXIT_HOLD {
            self.last_catch_up_exit_at = Some(now);
            self.below_exit_threshold_since = None;
            self.mode = ChunkingMode::Smooth;
        }
    }

    /// Records `now` as the catch-up exit time, but only if the policy is
    /// currently in `CatchUp`.
    fn note_catch_up_exit(&mut self, now: Instant) {
        match self.mode {
            ChunkingMode::CatchUp => self.last_catch_up_exit_at = Some(now),
            ChunkingMode::Smooth => {}
        }
    }

    /// Returns `true` while less than `REENTER_CATCH_UP_HOLD` has elapsed
    /// since the last recorded catch-up exit; `false` if none is recorded.
    fn reentry_hold_active(&self, now: Instant) -> bool {
        match self.last_catch_up_exit_at {
            Some(exited_at) => now.saturating_duration_since(exited_at) < REENTER_CATCH_UP_HOLD,
            None => false,
        }
    }
}
/// Returns `true` when the snapshot shows enough pressure to enter catch-up:
/// the queue is at least `ENTER_QUEUE_DEPTH_LINES` deep, or the oldest line
/// is at least `ENTER_OLDEST_AGE` old. A missing age never qualifies.
fn should_enter_catch_up(snapshot: QueueSnapshot) -> bool {
    let deep_enough = snapshot.queued_lines >= ENTER_QUEUE_DEPTH_LINES;
    let old_enough = matches!(snapshot.oldest_age, Some(age) if age >= ENTER_OLDEST_AGE);
    deep_enough || old_enough
}
/// Returns `true` when the snapshot is calm enough to start leaving catch-up:
/// the queue is at most `EXIT_QUEUE_DEPTH_LINES` deep *and* the oldest line is
/// known and at most `EXIT_OLDEST_AGE` old. A missing age never qualifies.
fn should_exit_catch_up(snapshot: QueueSnapshot) -> bool {
    if snapshot.queued_lines > EXIT_QUEUE_DEPTH_LINES {
        return false;
    }
    match snapshot.oldest_age {
        Some(age) => age <= EXIT_OLDEST_AGE,
        None => false,
    }
}
/// Returns `true` when the backlog is severe: the queue is at least
/// `SEVERE_QUEUE_DEPTH_LINES` deep, or the oldest line is at least
/// `SEVERE_OLDEST_AGE` old. A missing age never qualifies.
fn is_severe_backlog(snapshot: QueueSnapshot) -> bool {
    if snapshot.queued_lines >= SEVERE_QUEUE_DEPTH_LINES {
        return true;
    }
    match snapshot.oldest_age {
        Some(age) if age >= SEVERE_OLDEST_AGE => true,
        Some(_) | None => false,
    }
}
// Unit tests for the adaptive chunking policy. Each test drives `decide` with
// hand-built snapshots and explicit instants, so no real time needs to pass.
#[cfg(test)]
mod tests {
use super::*;
// Builds a snapshot with a known oldest-line age given in milliseconds.
fn snap(queued_lines: usize, oldest_age_ms: u64) -> QueueSnapshot {
QueueSnapshot {
queued_lines,
oldest_age: Some(Duration::from_millis(oldest_age_ms)),
}
}
// Builds the idle snapshot: nothing queued and no age.
fn empty_snap() -> QueueSnapshot {
QueueSnapshot {
queued_lines: 0,
oldest_age: None,
}
}
// Pressure below both enter thresholds keeps the policy in Smooth / Single.
#[test]
fn smooth_only_burst_emits_one_per_tick() {
let mut policy = AdaptiveChunkingPolicy::new();
let t0 = Instant::now();
for i in 0..5 {
let decision = policy.decide(snap(1, 10), t0 + Duration::from_millis(50 * i));
assert_eq!(decision.mode, ChunkingMode::Smooth);
assert!(!decision.entered_catch_up);
assert_eq!(decision.drain_plan, DrainPlan::Single);
}
}
// Reaching the depth threshold (8 lines) enters catch-up, and the batch size
// tracks the snapshot depth; `entered_catch_up` is only set on the transition.
#[test]
fn eight_line_burst_flips_to_catch_up_and_drains_full_backlog() {
let mut policy = AdaptiveChunkingPolicy::new();
let now = Instant::now();
let decision = policy.decide(snap(8, 10), now);
assert_eq!(decision.mode, ChunkingMode::CatchUp);
assert!(decision.entered_catch_up);
assert_eq!(decision.drain_plan, DrainPlan::Batch(8));
let decision = policy.decide(snap(20, 30), now + Duration::from_millis(10));
assert_eq!(decision.mode, ChunkingMode::CatchUp);
assert!(!decision.entered_catch_up, "no second transition signal");
assert_eq!(decision.drain_plan, DrainPlan::Batch(20));
}
// A shallow queue still enters catch-up when the oldest line hits 120 ms.
#[test]
fn age_threshold_alone_triggers_catch_up() {
let mut policy = AdaptiveChunkingPolicy::new();
let now = Instant::now();
let decision = policy.decide(snap(2, 120), now);
assert_eq!(decision.mode, ChunkingMode::CatchUp);
assert!(decision.entered_catch_up);
assert_eq!(decision.drain_plan, DrainPlan::Batch(2));
}
// The exit condition first holds at t0+50 ms, which starts the hold timer;
// catch-up is only left once 250 ms have elapsed from that point (t0+320 ms
// is the first sample past it).
#[test]
fn catch_up_exits_after_low_activity_hold() {
let mut policy = AdaptiveChunkingPolicy::new();
let t0 = Instant::now();
let _ = policy.decide(snap(10, 20), t0);
assert_eq!(policy.mode(), ChunkingMode::CatchUp);
let pre_hold = policy.decide(snap(2, 40), t0 + Duration::from_millis(50));
assert_eq!(pre_hold.mode, ChunkingMode::CatchUp);
let mid_hold = policy.decide(snap(2, 40), t0 + Duration::from_millis(200));
assert_eq!(mid_hold.mode, ChunkingMode::CatchUp);
let post_hold = policy.decide(snap(2, 40), t0 + Duration::from_millis(320));
assert_eq!(post_hold.mode, ChunkingMode::Smooth);
assert_eq!(post_hold.drain_plan, DrainPlan::Single);
}
// An empty queue skips the exit hold and returns to Smooth right away.
#[test]
fn idle_resets_to_smooth_immediately() {
let mut policy = AdaptiveChunkingPolicy::new();
let now = Instant::now();
let _ = policy.decide(snap(10, 20), now);
assert_eq!(policy.mode(), ChunkingMode::CatchUp);
let decision = policy.decide(empty_snap(), now + Duration::from_millis(10));
assert_eq!(decision.mode, ChunkingMode::Smooth);
assert_eq!(decision.drain_plan, DrainPlan::Single);
}
// After an idle exit at t0+10 ms, an enter-level (non-severe) backlog is
// refused 90 ms later but accepted once the 250 ms re-entry hold has passed.
#[test]
fn reentry_hold_blocks_immediate_flip_back() {
let mut policy = AdaptiveChunkingPolicy::new();
let t0 = Instant::now();
let _ = policy.decide(snap(10, 20), t0);
let _ = policy.decide(empty_snap(), t0 + Duration::from_millis(10));
let held = policy.decide(snap(8, 20), t0 + Duration::from_millis(100));
assert_eq!(held.mode, ChunkingMode::Smooth);
assert_eq!(held.drain_plan, DrainPlan::Single);
let reentered = policy.decide(snap(8, 20), t0 + Duration::from_millis(400));
assert_eq!(reentered.mode, ChunkingMode::CatchUp);
assert_eq!(reentered.drain_plan, DrainPlan::Batch(8));
}
// Same setup as above, but a severe backlog (64 lines) overrides the hold.
#[test]
fn severe_backlog_bypasses_reentry_hold() {
let mut policy = AdaptiveChunkingPolicy::new();
let t0 = Instant::now();
let _ = policy.decide(snap(10, 20), t0);
let _ = policy.decide(empty_snap(), t0 + Duration::from_millis(10));
let severe = policy.decide(snap(64, 20), t0 + Duration::from_millis(100));
assert_eq!(severe.mode, ChunkingMode::CatchUp);
assert_eq!(severe.drain_plan, DrainPlan::Batch(64));
}
// With low-motion enabled, depth-, age-, and severe-level pressure all still
// yield Smooth / Single.
#[test]
fn low_motion_always_smooth_regardless_of_pressure() {
let mut policy = AdaptiveChunkingPolicy::new();
policy.set_low_motion(true);
let t0 = Instant::now();
let d1 = policy.decide(snap(20, 10), t0);
assert_eq!(d1.mode, ChunkingMode::Smooth);
assert!(!d1.entered_catch_up);
assert_eq!(d1.drain_plan, DrainPlan::Single);
let d2 = policy.decide(snap(5, 500), t0 + Duration::from_millis(100));
assert_eq!(d2.mode, ChunkingMode::Smooth);
assert!(!d2.entered_catch_up);
assert_eq!(d2.drain_plan, DrainPlan::Single);
let d3 = policy.decide(snap(80, 500), t0 + Duration::from_millis(200));
assert_eq!(d3.mode, ChunkingMode::Smooth);
assert_eq!(d3.drain_plan, DrainPlan::Single);
}
// Turning low-motion back off restores the normal enter rule on the next call.
#[test]
fn low_motion_reset_resumes_normal_operation() {
let mut policy = AdaptiveChunkingPolicy::new();
policy.set_low_motion(true);
let t0 = Instant::now();
let d1 = policy.decide(snap(20, 10), t0);
assert_eq!(d1.mode, ChunkingMode::Smooth);
policy.set_low_motion(false);
let d2 = policy.decide(snap(20, 10), t0 + Duration::from_millis(10));
assert_eq!(d2.mode, ChunkingMode::CatchUp);
assert!(d2.entered_catch_up);
assert_eq!(d2.drain_plan, DrainPlan::Batch(20));
}
}