use std::ops::Range;
use std::sync::Arc;
use async_channel::{Receiver, RecvError, Sender, unbounded};
use humansize::make_format;
use tracing::trace;
use crate::mem_limiter::{BufferArea, MemoryLimiter};
use super::PrefetchReadError;
/// Feedback events sent from the consumer side of the prefetcher to the
/// [BackpressureController] (see `BackpressureController::send_feedback`).
#[derive(Debug)]
pub enum BackpressureFeedbackEvent {
    /// The consumer has read `length` bytes starting at `offset`; their memory
    /// can be released and the read window may need topping up.
    DataRead { offset: u64, length: usize },
    /// The consumer stalled waiting on the part queue; handled by scaling up
    /// the preferred read window size.
    PartQueueStall,
}
/// Controls how read-window end offsets are rounded before being advertised.
#[derive(Copy, Clone, Debug)]
pub enum ReadWindowAlignmentConfig {
    /// Round window ends up so their distance from `from_offset` is a whole
    /// number of `part_size` bytes.
    AlignToPartSize { from_offset: u64, part_size: u64 },
    /// Pass window ends through unchanged.
    Disable,
}

impl ReadWindowAlignmentConfig {
    /// Returns `read_window_end` rounded up according to this configuration.
    ///
    /// With `AlignToPartSize`, offsets at or before `from_offset` are returned
    /// unchanged; offsets past it are rounded up to the next part boundary
    /// relative to `from_offset` (already-aligned offsets are left as-is).
    fn align(&self, read_window_end: u64) -> u64 {
        match *self {
            Self::AlignToPartSize { from_offset, part_size } if read_window_end > from_offset => {
                // Round the distance past `from_offset` up to a whole number of parts.
                let distance = read_window_end - from_offset;
                from_offset + distance.next_multiple_of(part_size)
            }
            // Alignment disabled, or the offset hasn't reached the alignment base yet.
            _ => read_window_end,
        }
    }
}
/// Configuration for a [BackpressureController]/[BackpressureLimiter] pair.
pub struct BackpressureConfig {
    /// Size (bytes) of the read window advertised before any feedback arrives.
    pub initial_read_window_size: usize,
    /// Lower bound for the preferred read window size when scaling down.
    pub min_read_window_size: usize,
    /// Upper bound for the preferred read window size when scaling up.
    pub max_read_window_size: usize,
    /// Factor applied when scaling the preferred window up/down; values below 2
    /// are raised to 2 at construction time.
    pub read_window_size_multiplier: usize,
    /// Byte range of the request this pair governs; the window never extends
    /// past `request_range.end`.
    pub request_range: Range<u64>,
    /// Controls rounding of read window end offsets (e.g. to part boundaries).
    pub read_window_alignment_config: ReadWindowAlignmentConfig,
}
/// Consumer-side handle: tracks read progress, accounts prefetch memory with
/// the [MemoryLimiter], and decides when and by how much to extend the read window.
#[derive(Debug)]
pub struct BackpressureController {
    /// Sends window increments (in bytes) to the paired [BackpressureLimiter].
    read_window_updater: Sender<usize>,
    /// Target window size; adjusted by `scale_up`/`scale_down` within
    /// `[min_read_window_size, max_read_window_size]`.
    preferred_read_window_size: usize,
    min_read_window_size: usize,
    max_read_window_size: usize,
    /// Scaling factor; guaranteed >= 2 by `new_backpressure_controller`.
    read_window_size_multiplier: usize,
    /// End offset (exclusive) of the window advertised so far.
    read_window_end_offset: u64,
    /// Offset of the first byte the consumer has not yet reported reading.
    next_read_offset: u64,
    /// End offset of the overall request; the window never grows past this.
    request_end_offset: u64,
    /// Limiter that prefetch memory is reserved from and released back to.
    mem_limiter: Arc<MemoryLimiter>,
    read_window_alignment_config: ReadWindowAlignmentConfig,
}
/// Producer-side handle: blocks the request stream until the paired
/// [BackpressureController] grants more read window.
#[derive(Debug)]
pub struct BackpressureLimiter {
    /// Receiving end of the increment channel, with coalescing drain helpers.
    read_window_increment_queue: ReadWindowIncrementQueue,
    /// The limiter's view of the window end; advanced as increments arrive.
    read_window_end_offset: u64,
    /// End offset of the overall request; used to avoid blocking once the
    /// window already covers the whole request.
    request_end_offset: u64,
}
/// Builds a connected [BackpressureController]/[BackpressureLimiter] pair from `config`.
///
/// Memory for the initial read window is reserved from `mem_limiter` up front;
/// the controller's `Drop` impl releases whatever part of the window the
/// consumer never read.
pub fn new_backpressure_controller(
    config: BackpressureConfig,
    mem_limiter: Arc<MemoryLimiter>,
) -> (BackpressureController, BackpressureLimiter) {
    // `scale_down` divides by the multiplier, so it must be at least 2 to shrink.
    const MIN_WINDOW_SIZE_MULTIPLIER: usize = 2;

    let initial_window_end = config.request_range.start + config.initial_read_window_size as u64;
    // Reserve the initial window before advertising it.
    mem_limiter.reserve(BufferArea::Prefetch, config.initial_read_window_size as u64);

    let (increment_sender, increment_receiver) = unbounded();
    let controller = BackpressureController {
        read_window_updater: increment_sender,
        preferred_read_window_size: config.initial_read_window_size,
        min_read_window_size: config.min_read_window_size,
        max_read_window_size: config.max_read_window_size,
        read_window_size_multiplier: config.read_window_size_multiplier.max(MIN_WINDOW_SIZE_MULTIPLIER),
        read_window_end_offset: initial_window_end,
        next_read_offset: config.request_range.start,
        request_end_offset: config.request_range.end,
        mem_limiter,
        read_window_alignment_config: config.read_window_alignment_config,
    };
    trace!(?controller, "initialising backpressure controller");

    let limiter = BackpressureLimiter {
        read_window_increment_queue: ReadWindowIncrementQueue::new(increment_receiver),
        read_window_end_offset: initial_window_end,
        request_end_offset: config.request_range.end,
    };
    (controller, limiter)
}
impl BackpressureController {
    /// End offset (exclusive) of the read window advertised so far.
    pub fn read_window_end_offset(&self) -> u64 {
        self.read_window_end_offset
    }

    /// Handles a feedback event from the consumer.
    ///
    /// On [BackpressureFeedbackEvent::DataRead]: records the consumer's progress,
    /// releases the memory for the consumed bytes, and — once less than half the
    /// preferred window remains — reserves memory and sends a window increment.
    /// If the memory limiter cannot satisfy the reservation, the preferred window
    /// is scaled down and the attempt retried; at the minimum window size the
    /// reservation is forced so the stream always makes progress.
    ///
    /// On [BackpressureFeedbackEvent::PartQueueStall]: scales up the preferred
    /// window size.
    ///
    /// Currently always returns `Ok(())`; the `Result` keeps the signature open
    /// for fallible feedback handling.
    pub async fn send_feedback<E>(&mut self, event: BackpressureFeedbackEvent) -> Result<(), PrefetchReadError<E>> {
        match event {
            BackpressureFeedbackEvent::DataRead { offset, length } => {
                self.next_read_offset = offset + length as u64;
                // The consumer is done with these bytes; hand their memory back.
                self.mem_limiter.release(BufferArea::Prefetch, length as u64);
                let remaining_window = self.read_window_end_offset.saturating_sub(self.next_read_offset) as usize;
                // Top up the window when it drains below half the preferred size.
                // Every path that extends the window breaks out; only a failed
                // reservation loops again, with a smaller preferred size.
                while remaining_window < (self.preferred_read_window_size / 2)
                    && self.read_window_end_offset < self.request_end_offset
                {
                    let new_read_window_end_offset_preferred = self
                        .next_read_offset
                        .saturating_add(self.preferred_read_window_size as u64);
                    // Optionally round up to a part boundary, but never past the request end.
                    let new_read_window_end_offset_aligned = self
                        .read_window_alignment_config
                        .align(new_read_window_end_offset_preferred);
                    let new_read_window_end_offset = new_read_window_end_offset_aligned.min(self.request_end_offset);
                    if new_read_window_end_offset <= self.read_window_end_offset {
                        // The window already extends at least this far; nothing to grant.
                        break;
                    }
                    let to_increase = new_read_window_end_offset.saturating_sub(self.read_window_end_offset) as usize;
                    if self.preferred_read_window_size <= self.min_read_window_size {
                        // Already at (or below) the minimum window: force the
                        // reservation rather than stalling the stream entirely.
                        trace!(new_read_window_end_offset, "sending a read window increment");
                        self.mem_limiter.reserve(BufferArea::Prefetch, to_increase as u64);
                        self.increment_read_window(to_increase).await;
                        break;
                    }
                    if self.mem_limiter.try_reserve(BufferArea::Prefetch, to_increase as u64) {
                        trace!(new_read_window_end_offset, "sending a read window increment");
                        self.increment_read_window(to_increase).await;
                        break;
                    } else {
                        // Not enough memory for the preferred window; shrink and retry.
                        self.scale_down();
                    }
                }
            }
            BackpressureFeedbackEvent::PartQueueStall => self.scale_up(),
        }
        Ok(())
    }

    /// Sends a window increment of `len` bytes to the paired [BackpressureLimiter].
    ///
    /// The caller must have reserved memory for `len` bytes beforehand.
    /// `read_window_end_offset` advances even if the channel is closed, keeping
    /// the memory accounting in `Drop` consistent with that reservation.
    async fn increment_read_window(&mut self, len: usize) {
        let prev_window_end_offset = self.read_window_end_offset;
        let next_window_end_offset = prev_window_end_offset + len as u64;
        trace!(
            next_read_offset = self.next_read_offset,
            prev_window_end_offset, next_window_end_offset, len, "incrementing read window",
        );
        // A closed queue just means the limiter side is gone; log and move on.
        let _ = self
            .read_window_updater
            .send(len)
            .await
            .inspect_err(|_| trace!("read window incrementing queue is already closed"));
        self.read_window_end_offset = next_window_end_offset;
    }

    /// Multiplies the preferred window size by the multiplier, clamped to
    /// `[min_read_window_size, max_read_window_size]`. The scale-up is skipped
    /// (silently) when the limiter lacks spare capacity for the growth; no
    /// reservation is made here — that happens when the window is extended.
    fn scale_up(&mut self) {
        if self.preferred_read_window_size < self.max_read_window_size {
            let new_read_window_size = (self.preferred_read_window_size * self.read_window_size_multiplier)
                .max(self.min_read_window_size)
                .min(self.max_read_window_size);
            let to_increase = (new_read_window_size - self.preferred_read_window_size) as u64;
            let available_mem = self.mem_limiter.available_mem();
            if available_mem >= to_increase {
                let formatter = make_format(humansize::BINARY);
                trace!(
                    prev_size = formatter(self.preferred_read_window_size),
                    new_size = formatter(new_read_window_size),
                    "scaled up preferred read window"
                );
                self.preferred_read_window_size = new_read_window_size;
                metrics::histogram!("prefetch.window_after_increase_mib")
                    .record((self.preferred_read_window_size / 1024 / 1024) as f64);
            }
        }
    }

    /// Divides the preferred window size by the multiplier, clamped to
    /// `[min_read_window_size, max_read_window_size]`.
    fn scale_down(&mut self) {
        if self.preferred_read_window_size > self.min_read_window_size {
            // Guaranteed by the constructor (multiplier raised to at least 2);
            // required for the division below to actually shrink the window.
            assert!(self.read_window_size_multiplier > 1);
            let new_read_window_size = (self.preferred_read_window_size / self.read_window_size_multiplier)
                .max(self.min_read_window_size)
                .min(self.max_read_window_size);
            let formatter = make_format(humansize::BINARY);
            trace!(
                current_size = formatter(self.preferred_read_window_size),
                new_size = formatter(new_read_window_size),
                "scaled down read window"
            );
            self.preferred_read_window_size = new_read_window_size;
            metrics::histogram!("prefetch.window_after_decrease_mib")
                .record((self.preferred_read_window_size / 1024 / 1024) as f64);
        }
    }
}
impl Drop for BackpressureController {
    /// Releases the memory reserved for any window space the consumer never read.
    fn drop(&mut self) {
        debug_assert!(
            self.next_read_offset <= self.request_end_offset,
            "invariant: the next read offset should never be larger than the request end offset",
        );
        // Whatever lies between the last-read offset and the advertised window
        // end was reserved but never consumed; give it back to the limiter.
        let unread_window = self.read_window_end_offset.saturating_sub(self.next_read_offset);
        self.mem_limiter.release(BufferArea::Prefetch, unread_window);
    }
}
impl BackpressureLimiter {
    /// End offset (exclusive) of the window as seen by the limiter.
    pub fn read_window_end_offset(&self) -> u64 {
        self.read_window_end_offset
    }

    /// Waits until the read window extends beyond `offset`, applying any queued
    /// increments along the way.
    ///
    /// Returns `Ok(Some(new_end))` when the window end changed (or after
    /// blocking), `Ok(None)` when the window was already past `offset` and no
    /// increments were queued, and an error if the controller side has closed
    /// the increment channel.
    pub async fn wait_for_read_window_increment<E>(
        &mut self,
        offset: u64,
    ) -> Result<Option<u64>, PrefetchReadError<E>> {
        // Fast path: the window is already ahead of the reader, so just apply
        // any increments that happen to be queued, without blocking.
        if self.read_window_end_offset > offset {
            return match self.read_window_increment_queue.try_recv_drain() {
                Some(increment) => {
                    self.read_window_end_offset += increment as u64;
                    Ok(Some(self.read_window_end_offset))
                }
                None => Ok(None),
            };
        }

        // Slow path: block until the window passes `offset` (or it already
        // covers the whole request, in which case there is nothing to wait for).
        while self.read_window_end_offset <= offset && self.read_window_end_offset < self.request_end_offset {
            trace!(
                desired_offset = offset,
                current_offset = self.read_window_end_offset,
                "blocking for read window increment",
            );
            let increment = self
                .read_window_increment_queue
                .recv_drain()
                .await
                .map_err(|RecvError| PrefetchReadError::ReadWindowIncrement)?;
            self.read_window_end_offset += increment as u64;
        }
        Ok(Some(self.read_window_end_offset))
    }
}
/// Wrapper around the increment channel's receiver that coalesces all queued
/// increments into a single total.
#[derive(Debug)]
struct ReadWindowIncrementQueue(Receiver<usize>);

impl ReadWindowIncrementQueue {
    pub fn new(receiver: Receiver<usize>) -> Self {
        Self(receiver)
    }

    /// Blocks for at least one increment, then drains everything else already
    /// queued and returns the combined total.
    pub async fn recv_drain(&self) -> Result<usize, RecvError> {
        let first = self.0.recv().await?;
        let queued: usize = std::iter::from_fn(|| self.0.try_recv().ok()).sum();
        Ok(first + queued)
    }

    /// Drains all currently queued increments without blocking; `None` when the
    /// queue is empty (or closed).
    pub fn try_recv_drain(&self) -> Option<usize> {
        let total: usize = std::iter::from_fn(|| self.0.try_recv().ok()).sum();
        (total > 0).then_some(total)
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;
    use futures::executor::block_on;
    use mountpoint_s3_client::mock_client::MockClientError;
    use test_case::test_case;
    use crate::mem_limiter::MemoryLimiter;
    use crate::memory::PagedPool;
    use crate::prefetch::INITIAL_REQUEST_SIZE;

    // Repeated `scale_up` calls must stay within [min, max] at every step and
    // converge exactly on the configured maximum window size.
    #[test_case(INITIAL_REQUEST_SIZE, 2)]
    #[test_case(3 * 1024 * 1024, 4)]
    #[test_case(8 * 1024 * 1024, 8)]
    #[test_case(2 * 1024 * 1024 * 1024, 2)]
    fn test_read_window_scale_up(initial_read_window_size: usize, read_window_size_multiplier: usize) {
        let request_range = 0..(5 * 1024 * 1024 * 1024);
        let backpressure_config = BackpressureConfig {
            initial_read_window_size,
            min_read_window_size: 8 * 1024 * 1024,
            max_read_window_size: 2 * 1024 * 1024 * 1024,
            read_window_size_multiplier,
            request_range,
            read_window_alignment_config: ReadWindowAlignmentConfig::Disable,
        };
        let (mut backpressure_controller, _backpressure_limiter) =
            new_backpressure_controller_for_test(backpressure_config);
        while backpressure_controller.preferred_read_window_size < backpressure_controller.max_read_window_size {
            backpressure_controller.scale_up();
            assert!(backpressure_controller.preferred_read_window_size >= backpressure_controller.min_read_window_size);
            assert!(backpressure_controller.preferred_read_window_size <= backpressure_controller.max_read_window_size);
        }
        assert_eq!(
            backpressure_controller.preferred_read_window_size, backpressure_controller.max_read_window_size,
            "should have scaled up to max read window size"
        );
    }

    // Mirror of the scale-up test: repeated `scale_down` calls must stay within
    // bounds and converge exactly on the configured minimum window size.
    #[test_case(2 * 1024 * 1024 * 1024, 2)]
    #[test_case(15 * 1024 * 1024 * 1024, 2)]
    #[test_case(2 * 1024 * 1024 * 1024, 8)]
    #[test_case(8 * 1024 * 1024, 8)]
    fn test_read_window_scale_down(initial_read_window_size: usize, read_window_size_multiplier: usize) {
        let request_range = 0..(5 * 1024 * 1024 * 1024);
        let backpressure_config = BackpressureConfig {
            initial_read_window_size,
            min_read_window_size: 8 * 1024 * 1024,
            max_read_window_size: 2 * 1024 * 1024 * 1024,
            read_window_size_multiplier,
            request_range,
            read_window_alignment_config: ReadWindowAlignmentConfig::Disable,
        };
        let (mut backpressure_controller, _backpressure_limiter) =
            new_backpressure_controller_for_test(backpressure_config);
        while backpressure_controller.preferred_read_window_size > backpressure_controller.min_read_window_size {
            backpressure_controller.scale_down();
            assert!(backpressure_controller.preferred_read_window_size <= backpressure_controller.max_read_window_size);
            assert!(backpressure_controller.preferred_read_window_size >= backpressure_controller.min_read_window_size);
        }
        assert_eq!(
            backpressure_controller.preferred_read_window_size, backpressure_controller.min_read_window_size,
            "should have scaled down to min read window size"
        );
    }

    // Increments queued while the limiter wasn't waiting must all be coalesced by a
    // single `wait_for_read_window_increment` call: 1 MiB initial + 7 + 8 + 8 = 24 MiB.
    #[test]
    fn wait_for_read_window_increment_drains_all_events() {
        const KIB: usize = 1024;
        const MIB: usize = 1024 * KIB;
        const GIB: usize = 1024 * MIB;
        #[allow(clippy::identity_op)]
        let backpressure_config = BackpressureConfig {
            initial_read_window_size: 1 * MIB,
            min_read_window_size: 8 * MIB,
            max_read_window_size: 2 * GIB,
            read_window_size_multiplier: 2,
            request_range: 0..(5 * GIB as u64),
            read_window_alignment_config: ReadWindowAlignmentConfig::Disable,
        };
        let (mut backpressure_controller, mut backpressure_limiter) =
            new_backpressure_controller_for_test(backpressure_config);
        block_on(async {
            #[allow(clippy::identity_op)]
            let expected_offset = 1 * MIB as u64;
            assert_eq!(
                backpressure_limiter.read_window_end_offset(),
                expected_offset,
                "read window end offset should already be {expected_offset} due to initial read window size config",
            );
            // Queue three increments before the limiter looks at the channel.
            backpressure_controller.increment_read_window(7 * MIB).await;
            backpressure_controller.increment_read_window(8 * MIB).await;
            backpressure_controller.increment_read_window(8 * MIB).await;
            let curr_offset = backpressure_limiter
                .wait_for_read_window_increment::<MockClientError>(0)
                .await
                .expect("should return OK as we have new values to increment before channels are closed")
                .expect("value should change as we sent increments");
            assert_eq!(
                24 * MIB as u64,
                curr_offset,
                "expected offset did not match offset reported by limiter",
            );
        });
    }

    // Exhaustive edge cases for `ReadWindowAlignmentConfig::align`: before / at /
    // past the alignment base, and exactly on / just past a part boundary.
    #[test_case(500, 1000, 100, 500; "offset before second request start")]
    #[test_case(1000, 1000, 512, 1000; "offset at second request start")]
    #[test_case(1500, 1000, 512, 1512; "offset after second request start, needs alignment")]
    #[test_case(2024, 1000, 512, 2024; "offset after second request start, already aligned")]
    #[test_case(1001, 1000, 512, 1512; "offset just after second request start, needs alignment")]
    #[test_case(1512, 1000, 512, 1512; "offset exactly at part boundary")]
    #[test_case(1513, 1000, 512, 2024; "offset just past part boundary")]
    fn test_read_window_alignment(offset: u64, from_offset: u64, part_size: u64, expected: u64) {
        let result = ReadWindowAlignmentConfig::AlignToPartSize { from_offset, part_size }.align(offset);
        assert_eq!(result, expected);
    }

    // Builds a controller/limiter pair backed by a memory limiter sized to the
    // configured maximum read window.
    fn new_backpressure_controller_for_test(
        backpressure_config: BackpressureConfig,
    ) -> (BackpressureController, BackpressureLimiter) {
        let pool = PagedPool::new_with_candidate_sizes([8 * 1024 * 1024]);
        let mem_limiter = Arc::new(MemoryLimiter::new(
            pool,
            backpressure_config.max_read_window_size as u64,
        ));
        new_backpressure_controller(backpressure_config, mem_limiter.clone())
    }
}