use std::collections::BTreeSet;
pub(crate) struct PartWindow {
chunk: u64,
total: u64,
max_in_flight: usize,
next_dispatch: u64,
completed: BTreeSet<u64>,
watermark: u64,
in_flight: usize,
}
impl PartWindow {
pub(crate) fn new(start_offset: u64, chunk: u64, total: u64, max_in_flight: usize) -> Self {
let start = start_offset.min(total);
Self {
chunk: chunk.max(1),
total,
max_in_flight: max_in_flight.max(1),
next_dispatch: start,
completed: BTreeSet::new(),
watermark: start,
in_flight: 0,
}
}
pub(crate) fn take_dispatch(&mut self) -> Option<u64> {
if self.in_flight >= self.max_in_flight || self.next_dispatch >= self.total {
return None;
}
let off = self.next_dispatch;
self.next_dispatch = (off + self.chunk).min(self.total);
self.in_flight += 1;
Some(off)
}
pub(crate) fn on_done(&mut self, offset: u64) -> Option<u64> {
debug_assert!(self.in_flight > 0, "on_done without a matching dispatch");
self.in_flight = self.in_flight.saturating_sub(1);
self.completed.insert(offset);
let before = self.watermark;
while self.watermark < self.total && self.completed.remove(&self.watermark) {
self.watermark = (self.watermark + self.chunk).min(self.total);
}
if self.watermark != before {
Some(self.watermark)
} else {
None
}
}
pub(crate) fn on_settled_without_progress(&mut self) {
self.in_flight = self.in_flight.saturating_sub(1);
}
pub(crate) fn watermark(&self) -> u64 {
self.watermark
}
pub(crate) fn is_complete(&self) -> bool {
self.watermark >= self.total && self.in_flight == 0
}
}
#[cfg(test)]
mod tests {
use super::PartWindow;
#[test]
fn dispatch_hands_out_aligned_offsets_up_to_total() {
let mut w = PartWindow::new(0, 10, 25, 8);
assert_eq!(w.take_dispatch(), Some(0));
assert_eq!(w.take_dispatch(), Some(10));
assert_eq!(w.take_dispatch(), Some(20));
assert_eq!(w.take_dispatch(), None);
}
#[test]
fn dispatch_is_bounded_by_max_in_flight() {
let mut w = PartWindow::new(0, 10, 1000, 2);
assert_eq!(w.take_dispatch(), Some(0));
assert_eq!(w.take_dispatch(), Some(10));
assert_eq!(w.take_dispatch(), None);
assert_eq!(w.on_done(0), Some(10));
assert_eq!(w.take_dispatch(), Some(20));
}
#[test]
fn in_order_completion_advances_watermark_each_step() {
let mut w = PartWindow::new(0, 10, 30, 8);
w.take_dispatch();
w.take_dispatch();
w.take_dispatch();
assert_eq!(w.on_done(0), Some(10));
assert_eq!(w.on_done(10), Some(20));
assert_eq!(w.on_done(20), Some(30));
assert!(w.is_complete());
}
#[test]
fn out_of_order_high_part_first_does_not_advance_past_a_hole() {
let mut w = PartWindow::new(0, 10, 30, 8);
w.take_dispatch(); w.take_dispatch(); w.take_dispatch(); assert_eq!(w.on_done(20), None);
assert_eq!(w.watermark(), 0);
assert!(!w.is_complete());
assert_eq!(w.on_done(10), None);
assert_eq!(w.watermark(), 0);
assert_eq!(w.on_done(0), Some(30));
assert_eq!(w.watermark(), 30);
assert!(w.is_complete());
}
#[test]
fn short_last_part_clamps_watermark_to_total() {
let mut w = PartWindow::new(0, 10, 25, 8);
w.take_dispatch();
w.take_dispatch();
w.take_dispatch();
w.on_done(0);
w.on_done(10);
assert_eq!(w.on_done(20), Some(25));
assert_eq!(w.watermark(), 25);
assert!(w.is_complete());
}
#[test]
fn is_complete_requires_watermark_total_and_nothing_in_flight() {
let mut w = PartWindow::new(0, 10, 20, 8);
w.take_dispatch(); w.take_dispatch(); w.on_done(0);
assert!(!w.is_complete());
w.on_done(10);
assert!(w.is_complete());
}
#[test]
fn settled_without_progress_frees_a_slot_but_does_not_advance() {
let mut w = PartWindow::new(0, 10, 30, 1);
assert_eq!(w.take_dispatch(), Some(0));
assert_eq!(w.take_dispatch(), None); w.on_settled_without_progress();
assert_eq!(w.watermark(), 0);
assert_eq!(w.take_dispatch(), Some(10));
}
#[test]
fn resume_starts_dispatch_and_watermark_at_start_offset() {
let mut w = PartWindow::new(10, 10, 30, 8);
assert_eq!(w.watermark(), 10);
assert_eq!(w.take_dispatch(), Some(10));
assert_eq!(w.take_dispatch(), Some(20));
assert_eq!(w.take_dispatch(), None);
assert_eq!(w.on_done(10), Some(20));
assert_eq!(w.on_done(20), Some(30));
assert!(w.is_complete());
}
#[test]
fn already_complete_resume_is_complete_with_no_dispatch() {
let mut w = PartWindow::new(30, 10, 30, 8);
assert_eq!(w.take_dispatch(), None);
assert!(w.is_complete());
}
}