use std::fmt::Debug;
use metrics::{counter, histogram};
use mountpoint_s3_client::error::{GetObjectError, ObjectClientError};
use mountpoint_s3_client::{ObjectClient, error_metadata::ProvideErrorMetadata};
use thiserror::Error;
use tracing::trace;
use crate::checksums::{ChecksummedBytes, IntegrityError};
use crate::data_cache::DataCache;
use crate::fs::error_metadata::{ErrorMetadata, MOUNTPOINT_ERROR_CLIENT};
use crate::mem_limiter::{BufferArea, MemoryLimiter};
use crate::metrics::defs::{FUSE_CACHE_HIT, PREFETCH_RESET_STATE};
use crate::object::ObjectId;
use crate::sync::Arc;
mod backpressure_controller;
mod builder;
mod caching_stream;
mod part;
mod part_queue;
mod part_stream;
mod seek_window;
mod task;
pub use builder::PrefetcherBuilder;
use part::PartOperationError;
use part_stream::{PartStream, RequestRange, RequestTaskConfig};
use seek_window::SeekWindow;
use task::RequestTask;
/// Opaque identifier for a single file handle using the prefetcher.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct HandleId(u64);

impl HandleId {
    /// Wrap a raw `u64` handle number.
    pub fn new(id: u64) -> Self {
        HandleId(id)
    }

    /// Return the raw `u64` this handle wraps.
    pub fn as_raw(&self) -> u64 {
        self.0
    }
}
/// Default size of the first GetObject request for a new stream (1 MiB + 128 KiB).
pub const INITIAL_REQUEST_SIZE: usize = 1024 * 1024 + 128 * 1024;
/// Errors that can be returned by a prefetching read.
#[derive(Debug, Error)]
pub enum PrefetchReadError<E> {
    /// The underlying GetObject request failed. `metadata` carries the bucket/key and
    /// client error metadata attached by [PrefetchReadError::get_request_failed].
    #[error("get object request failed")]
    GetRequestFailed {
        source: ObjectClientError<GetObjectError, E>,
        metadata: Box<ErrorMetadata>,
    },
    /// A part arrived at a different offset than the one expected.
    #[error("get object request returned wrong offset")]
    GetRequestReturnedWrongOffset { offset: u64, expected_offset: u64 },
    /// The request stream ended before producing all expected data.
    #[error("get request terminated unexpectedly")]
    GetRequestTerminatedUnexpectedly,
    /// Checksum validation of received data failed.
    #[error("integrity check failed")]
    Integrity(#[from] IntegrityError),
    /// An operation on a received part failed.
    #[error("part read failed")]
    PartReadFailed(#[from] PartOperationError),
    /// The client does not meet the prefetcher's flow-control requirements
    /// (backpressure enabled with a non-zero initial read window).
    #[error("backpressure must be enabled with non-zero initial read window")]
    BackpressurePreconditionFailed,
    /// Growing the backpressure read window failed.
    #[error("read window increment failed")]
    ReadWindowIncrement,
}
impl<E: ProvideErrorMetadata + std::error::Error + Send + Sync + 'static> PrefetchReadError<E> {
fn get_request_failed(err: ObjectClientError<GetObjectError, E>, bucket: &str, key: &str) -> Self {
let metadata = ErrorMetadata {
client_error_meta: err.meta(),
error_code: Some(MOUNTPOINT_ERROR_CLIENT.to_string()),
s3_bucket_name: Some(bucket.to_string()),
s3_object_key: Some(key.to_string()),
};
let metadata = Box::new(metadata);
Self::GetRequestFailed { source: err, metadata }
}
}
/// Configuration for a [Prefetcher].
#[derive(Debug, Clone, Copy)]
pub struct PrefetcherConfig {
    /// Upper bound on the backpressure read window for a single request.
    pub max_read_window_size: usize,
    /// Multiplier applied to the read window size as reads remain sequential
    /// (passed through to the request task as `read_window_size_multiplier`).
    pub sequential_prefetch_multiplier: usize,
    /// How far beyond currently-available data a forward seek may wait for data to
    /// arrive instead of restarting the request.
    pub max_forward_seek_wait_distance: u64,
    /// Maximum distance of recently-read data retained to serve backward seeks
    /// without restarting the request.
    pub max_backward_seek_distance: u64,
    /// Size of the first request in a prefetch stream.
    pub initial_request_size: usize,
}
impl Default for PrefetcherConfig {
    fn default() -> Self {
        Self {
            // 2 GiB unless overridden by the unstable environment variable.
            max_read_window_size: determine_max_read_size(),
            sequential_prefetch_multiplier: 2,
            max_forward_seek_wait_distance: 16 * 1024 * 1024,
            max_backward_seek_distance: 1024 * 1024,
            initial_request_size: INITIAL_REQUEST_SIZE,
        }
    }
}
/// Determine the maximum bytes the prefetcher may request ahead of the reader.
///
/// Defaults to 2 GiB, but can be overridden via the unstable environment variable
/// `UNSTABLE_MOUNTPOINT_MAX_PREFETCH_WINDOW_SIZE`. The override must be a positive
/// integer: zero and unparsable values are rejected and the default is used, matching
/// the "valid positive integer" contract in the warning message (previously `0` was
/// accepted, which would configure a zero-byte read window).
fn determine_max_read_size() -> usize {
    const ENV_VAR_KEY: &str = "UNSTABLE_MOUNTPOINT_MAX_PREFETCH_WINDOW_SIZE";
    const DEFAULT_READ_WINDOW_SIZE: usize = 2 * 1024 * 1024 * 1024;
    let Some(val) = std::env::var_os(ENV_VAR_KEY) else {
        return DEFAULT_READ_WINDOW_SIZE;
    };
    match val.to_string_lossy().parse::<usize>() {
        // A zero-byte window would stall the backpressure-driven prefetcher (a zero
        // initial read window is a hard precondition failure elsewhere), so treat it
        // the same as an unparsable value.
        Ok(val) if val > 0 => {
            tracing::warn!(
                "successfully overridden prefetch read window size \
                with new value {val} bytes from unstable environment config",
            );
            val
        }
        _ => {
            tracing::warn!(
                "{ENV_VAR_KEY} did not contain a valid positive integer \
                for prefetch bytes, using {DEFAULT_READ_WINDOW_SIZE} bytes instead",
            );
            DEFAULT_READ_WINDOW_SIZE
        }
    }
}
/// Fast sequential read access to objects, by prefetching data ahead of the reader.
#[derive(Debug)]
pub struct Prefetcher<Client> {
    /// Source of object parts (constructed via the default or caching builder).
    part_stream: PartStream<Client>,
    config: PrefetcherConfig,
    /// Shared memory limiter; each stream reserves its seek-window memory from it.
    mem_limiter: Arc<MemoryLimiter>,
}
impl<Client> Prefetcher<Client>
where
Client: ObjectClient + Clone + Send + Sync + 'static,
{
pub fn default_builder(client: Client) -> PrefetcherBuilder<Client> {
PrefetcherBuilder::default_builder(client)
}
pub fn caching_builder<Cache>(cache: Cache, client: Client) -> PrefetcherBuilder<Client>
where
Cache: DataCache + Send + Sync + 'static,
{
PrefetcherBuilder::caching_builder(cache, client)
}
pub fn new(part_stream: PartStream<Client>, config: PrefetcherConfig, mem_limiter: Arc<MemoryLimiter>) -> Self {
Self {
part_stream,
config,
mem_limiter,
}
}
pub fn prefetch(
&self,
bucket: String,
object_id: ObjectId,
handle_id: HandleId,
size: u64,
) -> PrefetchGetObject<Client>
where
Client: ObjectClient + Clone + Send + Sync + 'static,
{
PrefetchGetObject::new(
self.part_stream.clone(),
self.config,
bucket,
object_id,
handle_id,
size,
self.mem_limiter.clone(),
)
}
}
/// A streaming GetObject request which prefetches data sequentially, with limited
/// support for seeking within the stream without restarting the request.
#[derive(Debug)]
pub struct PrefetchGetObject<Client>
where
    Client: ObjectClient + Clone + Send + Sync + 'static,
{
    part_stream: PartStream<Client>,
    config: PrefetcherConfig,
    /// The in-flight backpressure-controlled request task, spawned lazily on first read.
    backpressure_task: Option<RequestTask<Client>>,
    /// Recently-read parts retained to serve small backward seeks in place.
    backward_seek_window: SeekWindow,
    bucket: String,
    object_id: ObjectId,
    /// Preferred size for parts handed to the reader; grows with observed read sizes,
    /// capped at 1 MiB in `try_read`.
    preferred_part_size: usize,
    /// Offset at which the current contiguous run of sequential reads began.
    sequential_read_start_offset: u64,
    /// The offset the next read is expected at, if it is sequential.
    next_sequential_read_offset: u64,
    /// Offset the next request would start at; reset alongside the sequential offsets.
    next_request_offset: u64,
    /// Total object size in bytes.
    size: u64,
    mem_limiter: Arc<MemoryLimiter>,
    handle_id: HandleId,
}
impl<Client> PrefetchGetObject<Client>
where
    Client: ObjectClient + Clone + Send + Sync + 'static,
{
    /// Create and initialize a [PrefetchGetObject] for the given object.
    ///
    /// Reserves memory for the backward seek window from the memory limiter up front;
    /// the matching release happens in this type's [Drop] impl.
    fn new(
        part_stream: PartStream<Client>,
        config: PrefetcherConfig,
        bucket: String,
        object_id: ObjectId,
        handle_id: HandleId,
        size: u64,
        mem_limiter: Arc<MemoryLimiter>,
    ) -> Self {
        let max_backward_seek_distance = config.max_backward_seek_distance as usize;
        let seek_window_reservation =
            Self::seek_window_reservation(part_stream.client().read_part_size(), max_backward_seek_distance);
        mem_limiter.reserve(BufferArea::Prefetch, seek_window_reservation);
        PrefetchGetObject {
            part_stream,
            config,
            backpressure_task: None,
            backward_seek_window: SeekWindow::new(max_backward_seek_distance),
            preferred_part_size: 128 * 1024,
            sequential_read_start_offset: 0,
            next_sequential_read_offset: 0,
            next_request_offset: 0,
            bucket,
            object_id,
            size,
            mem_limiter,
            handle_id,
        }
    }

    /// Read some bytes from the object at `offset`.
    ///
    /// Returns at most `length` bytes; fewer (or none) are returned at end of object.
    /// On error, resets prefetch state so a retry at the same offset starts a fresh
    /// request.
    pub async fn read(
        &mut self,
        offset: u64,
        length: usize,
    ) -> Result<ChecksummedBytes, PrefetchReadError<Client::ClientError>> {
        trace!(
            offset,
            length,
            next_seq_offset = self.next_sequential_read_offset,
            "read"
        );
        match self.try_read(offset, length).await {
            Ok((data, cache_hit)) => {
                // Only count non-empty reads served entirely from cache as cache hits.
                if !data.is_empty() && cache_hit {
                    metrics::counter!(FUSE_CACHE_HIT).increment(1);
                }
                Ok(data)
            }
            Err(err) => {
                // Drop the failed task so the next read spawns a new request.
                self.reset_prefetch_to_offset(offset);
                Err(err)
            }
        }
    }

    /// Attempt a read, also reporting whether every returned part came from the cache.
    async fn try_read(
        &mut self,
        offset: u64,
        length: usize,
    ) -> Result<(ChecksummedBytes, bool), PrefetchReadError<Client::ClientError>> {
        // Grow the preferred part size towards the largest read seen, capped at 1 MiB.
        let max_preferred_part_size = 1024 * 1024;
        self.preferred_part_size = self.preferred_part_size.max(length).min(max_preferred_part_size);
        let remaining = self.size.saturating_sub(offset);
        if remaining == 0 {
            // Reading at or past end of object: nothing to return, not a cache hit.
            return Ok((ChecksummedBytes::default(), false));
        }
        let mut to_read = (length as u64).min(remaining);
        // Out-of-order read: try to seek within the existing stream; if the seek
        // can't be served, reset and start a new request at `offset`.
        if self.next_sequential_read_offset != offset {
            if self.try_seek(offset).await? {
                trace!("seek succeeded");
            } else {
                trace!(
                    expected = self.next_sequential_read_offset,
                    actual = offset,
                    "out-of-order read, resetting prefetch"
                );
                counter!(PREFETCH_RESET_STATE).increment(1);
                self.record_contiguous_read_metric();
                self.reset_prefetch_to_offset(offset);
            }
        }
        assert_eq!(self.next_sequential_read_offset, offset);
        // Spawn the backpressure-controlled GetObject request lazily, on first need.
        if self.backpressure_task.is_none() {
            self.backpressure_task = Some(self.spawn_read_backpressure_request()?);
        }
        let mut all_parts_from_cache = true;
        let mut response = ChecksummedBytes::default();
        while to_read > 0 {
            let Some(current_task) = self.backpressure_task.as_mut() else {
                trace!(offset, length, "read beyond object size");
                break;
            };
            debug_assert!(current_task.remaining() > 0);
            let part = current_task.read(to_read as usize).await?;
            all_parts_from_cache &= part.is_from_cache();
            // Retain the part so small backward seeks can be served in place.
            self.backward_seek_window.push(part.clone());
            let part_bytes = part.into_bytes(&self.object_id, self.next_sequential_read_offset)?;
            self.next_sequential_read_offset += part_bytes.len() as u64;
            // Fast path: first part satisfies the whole read — return it without
            // copying into the accumulator.
            if response.is_empty() && part_bytes.len() == to_read as usize {
                return Ok((part_bytes, all_parts_from_cache));
            }
            let part_len = part_bytes.len() as u64;
            response.extend(part_bytes)?;
            to_read -= part_len;
        }
        Ok((response, all_parts_from_cache))
    }

    /// Spawn a backpressure-enabled GetObject request from the current sequential
    /// offset to the end of the object.
    ///
    /// Fails with [PrefetchReadError::BackpressurePreconditionFailed] unless the
    /// client has backpressure enabled with a non-zero initial read window.
    fn spawn_read_backpressure_request(
        &mut self,
    ) -> Result<RequestTask<Client>, PrefetchReadError<Client::ClientError>> {
        let start = self.next_sequential_read_offset;
        let object_size = self.size as usize;
        let read_part_size = self.part_stream.client().read_part_size();
        let range = RequestRange::new(object_size, start, object_size);
        // The prefetcher relies on the client's backpressure flow control; a missing
        // or zero-sized initial read window can never make progress.
        match self.part_stream.client().initial_read_window_size() {
            Some(value) => {
                if value == 0 {
                    return Err(PrefetchReadError::BackpressurePreconditionFailed);
                }
            }
            None => return Err(PrefetchReadError::BackpressurePreconditionFailed),
        };
        let config = RequestTaskConfig {
            bucket: self.bucket.clone(),
            object_id: self.object_id.clone(),
            handle_id: self.handle_id,
            range,
            read_part_size,
            preferred_part_size: self.preferred_part_size,
            initial_request_size: self.config.initial_request_size,
            max_read_window_size: self.config.max_read_window_size,
            read_window_size_multiplier: self.config.sequential_prefetch_multiplier,
        };
        Ok(self.part_stream.spawn_get_object_request(config))
    }

    /// Reset prefetch state to the given offset, dropping any in-flight request task
    /// and clearing the backward seek window.
    fn reset_prefetch_to_offset(&mut self, offset: u64) {
        self.backpressure_task = None;
        self.backward_seek_window.clear();
        self.sequential_read_start_offset = offset;
        self.next_sequential_read_offset = offset;
        self.next_request_offset = offset;
    }

    /// Try to avoid restarting the request by seeking within the current stream.
    /// Returns `true` on success, in which case `next_sequential_read_offset` now
    /// equals `offset`.
    async fn try_seek(&mut self, offset: u64) -> Result<bool, PrefetchReadError<Client::ClientError>> {
        assert_ne!(offset, self.next_sequential_read_offset);
        trace!(from = self.next_sequential_read_offset, to = offset, "trying to seek");
        if offset > self.next_sequential_read_offset {
            self.try_seek_forward(offset).await
        } else {
            self.try_seek_backward(offset).await
        }
    }

    /// Seek forward by reading and discarding data from the current task, provided
    /// the target is inside the read window and close enough to available data.
    async fn try_seek_forward(&mut self, offset: u64) -> Result<bool, PrefetchReadError<Client::ClientError>> {
        assert!(offset > self.next_sequential_read_offset);
        let total_seek_distance = offset - self.next_sequential_read_offset;
        histogram!("prefetch.seek_distance", "dir" => "forward").record(total_seek_distance as f64);
        let Some(task) = self.backpressure_task.as_mut() else {
            return Ok(false);
        };
        // Beyond the read window the data hasn't even been requested yet.
        if offset >= task.read_window_end_offset() {
            return Ok(false);
        }
        // Only wait for in-flight data when the target is within the configured wait
        // distance of what is already available.
        let available_offset = task.available_offset();
        let available_soon_offset = available_offset.saturating_add(self.config.max_forward_seek_wait_distance);
        if offset >= available_soon_offset {
            trace!(
                requested_offset = offset,
                available_offset = available_offset,
                "seek failed: not enough data available"
            );
            return Ok(false);
        }
        // Consume up to the target offset, keeping the skipped parts in the backward
        // seek window so a subsequent backward seek can still be served.
        let mut seek_distance = offset - self.next_sequential_read_offset;
        while seek_distance > 0 {
            let part = task.read(seek_distance as usize).await?;
            seek_distance -= part.len() as u64;
            self.next_sequential_read_offset += part.len() as u64;
            self.backward_seek_window.push(part);
        }
        Ok(true)
    }

    /// Seek backward by pushing recently-read parts from the backward seek window
    /// back onto the front of the current task.
    async fn try_seek_backward(&mut self, offset: u64) -> Result<bool, PrefetchReadError<Client::ClientError>> {
        assert!(offset < self.next_sequential_read_offset);
        let Some(task) = self.backpressure_task.as_mut() else {
            return Ok(false);
        };
        let backwards_length_needed = self.next_sequential_read_offset - offset;
        histogram!("prefetch.seek_distance", "dir" => "backward").record(backwards_length_needed as f64);
        let Some(parts) = self.backward_seek_window.read_back(backwards_length_needed as usize) else {
            trace!("seek failed: not enough data in backwards seek window");
            return Ok(false);
        };
        task.push_front(parts).await?;
        self.next_sequential_read_offset = offset;
        Ok(true)
    }

    /// Record the length of the just-finished contiguous run of sequential reads.
    fn record_contiguous_read_metric(&self) {
        histogram!("prefetch.contiguous_read_len")
            .record((self.next_sequential_read_offset - self.sequential_read_start_offset) as f64);
    }

    /// Memory to reserve for the backward seek window: the window size rounded up to
    /// a whole number of client parts.
    fn seek_window_reservation(part_size: usize, seek_window_size: usize) -> u64 {
        (seek_window_size.div_ceil(part_size) * part_size) as u64
    }
}
impl<Client> Drop for PrefetchGetObject<Client>
where
    Client: ObjectClient + Clone + Send + Sync + 'static,
{
    fn drop(&mut self) {
        // Return the memory reserved for the backward seek window in `new`.
        let part_size = self.part_stream.client().read_part_size();
        let reservation = Self::seek_window_reservation(part_size, self.backward_seek_window.max_size());
        self.mem_limiter.release(BufferArea::Prefetch, reservation);
        // Flush the metric for the final contiguous run of reads.
        self.record_contiguous_read_metric();
    }
}
#[cfg(test)]
mod tests {
#![allow(clippy::identity_op)]
use crate::Runtime;
use crate::data_cache::InMemoryDataCache;
use crate::mem_limiter::{MINIMUM_MEM_LIMIT, MemoryLimiter};
use crate::memory::PagedPool;
use crate::sync::Arc;
use super::*;
use futures::executor::{ThreadPool, block_on};
use mountpoint_s3_client::failure_client::{
CountdownFailureConfig, GetObjectFailureMode, countdown_failure_client,
};
use mountpoint_s3_client::mock_client::{MockClient, MockClientConfig, MockClientError, MockObject, ramp_bytes};
use mountpoint_s3_client::types::ETag;
use proptest::proptest;
use proptest::strategy::{Just, Strategy};
use proptest_derive::Arbitrary;
use std::collections::HashMap;
use test_case::test_case;
const KB: usize = 1024;
const MB: usize = 1024 * 1024;
#[derive(Debug, Arbitrary)]
struct TestConfig {
#[proptest(strategy = "16usize..1*1024*1024")]
initial_request_size: usize,
#[proptest(strategy = "16usize..1*1024*1024")]
max_read_window_size: usize,
#[proptest(strategy = "1usize..8usize")]
sequential_prefetch_multiplier: usize,
#[proptest(strategy = "16usize..2*1024*1024")]
client_part_size: usize,
#[proptest(strategy = "1u64..4*1024*1024")]
max_forward_seek_wait_distance: u64,
#[proptest(strategy = "1u64..4*1024*1024")]
max_backward_seek_distance: u64,
#[proptest(strategy = "16usize..1*1024*1024")]
cache_block_size: usize,
}
enum PrefetcherType {
Default,
InMemoryCache(usize),
}
fn build_prefetcher<Client>(
client: Client,
prefetcher_type: PrefetcherType,
prefetcher_config: PrefetcherConfig,
) -> Prefetcher<Client>
where
Client: ObjectClient + Clone + Send + Sync + 'static,
{
let pool = PagedPool::new_with_candidate_sizes([client.read_part_size(), client.write_part_size()]);
let mem_limiter = Arc::new(MemoryLimiter::new(pool, MINIMUM_MEM_LIMIT));
let runtime = Runtime::new(ThreadPool::builder().pool_size(1).create().unwrap());
let builder = match prefetcher_type {
PrefetcherType::Default => Prefetcher::default_builder(client),
PrefetcherType::InMemoryCache(block_size) => {
let cache = InMemoryDataCache::new(block_size as u64);
Prefetcher::caching_builder(cache, client)
}
};
builder.build(runtime, mem_limiter, prefetcher_config)
}
fn run_sequential_read_test(prefetcher_type: PrefetcherType, size: u64, read_size: usize, test_config: TestConfig) {
let client = Arc::new(
MockClient::config()
.bucket("test-bucket")
.part_size(test_config.client_part_size)
.enable_backpressure(true)
.initial_read_window_size(test_config.client_part_size)
.build(),
);
let object = MockObject::ramp(0xaa, size as usize, ETag::for_tests());
let etag = object.etag();
client.add_object("hello", object);
let prefetcher_config = PrefetcherConfig {
max_read_window_size: test_config.max_read_window_size,
sequential_prefetch_multiplier: test_config.sequential_prefetch_multiplier,
max_forward_seek_wait_distance: test_config.max_forward_seek_wait_distance,
max_backward_seek_distance: test_config.max_backward_seek_distance,
initial_request_size: test_config.initial_request_size,
};
let prefetcher = build_prefetcher(client.clone(), prefetcher_type, prefetcher_config);
let object_id = ObjectId::new("hello".to_owned(), etag);
let fh = HandleId::new(1);
let mut request = prefetcher.prefetch("test-bucket".to_owned(), object_id, fh, size);
let mut next_offset = 0;
loop {
let buf = block_on(request.read(next_offset, read_size)).unwrap();
if buf.is_empty() {
break;
}
let buf = buf.into_bytes().unwrap();
let expected = ramp_bytes((0xaa + next_offset) as usize, buf.len());
assert_eq!(&buf[..], &expected[..buf.len()]);
next_offset += buf.len() as u64;
}
assert_eq!(next_offset, size);
}
#[test_case(PrefetcherType::Default)]
#[test_case(PrefetcherType::InMemoryCache(1 * MB))]
fn sequential_read_small(prefetcher_type: PrefetcherType) {
let config = TestConfig {
initial_request_size: 256 * 1024,
max_read_window_size: 1024 * 1024 * 1024,
sequential_prefetch_multiplier: 8,
client_part_size: 8 * 1024 * 1024,
max_forward_seek_wait_distance: 16 * 1024 * 1024,
max_backward_seek_distance: 2 * 1024 * 1024,
cache_block_size: 1 * MB,
};
run_sequential_read_test(prefetcher_type, 1024 * 1024 + 111, 1024 * 1024, config);
}
#[test_case(PrefetcherType::Default)]
#[test_case(PrefetcherType::InMemoryCache(1 * MB))]
fn sequential_read_medium(prefetcher_type: PrefetcherType) {
let config = TestConfig {
initial_request_size: 256 * 1024,
max_read_window_size: 64 * 1024 * 1024,
sequential_prefetch_multiplier: 8,
client_part_size: 8 * 1024 * 1024,
max_forward_seek_wait_distance: 16 * 1024 * 1024,
max_backward_seek_distance: 2 * 1024 * 1024,
cache_block_size: 1 * MB,
};
run_sequential_read_test(prefetcher_type, 16 * 1024 * 1024 + 111, 1024 * 1024, config);
}
#[test_case(PrefetcherType::Default)]
#[test_case(PrefetcherType::InMemoryCache(1 * MB))]
fn sequential_read_large(prefetcher_type: PrefetcherType) {
let config = TestConfig {
initial_request_size: 256 * 1024,
max_read_window_size: 64 * 1024 * 1024,
sequential_prefetch_multiplier: 8,
client_part_size: 8 * 1024 * 1024,
max_forward_seek_wait_distance: 16 * 1024 * 1024,
max_backward_seek_distance: 2 * 1024 * 1024,
cache_block_size: 1 * MB,
};
run_sequential_read_test(prefetcher_type, 256 * 1024 * 1024 + 111, 1024 * 1024, config);
}
fn fail_with_backpressure_precondition_test(
prefetcher_type: PrefetcherType,
test_config: TestConfig,
client_config: MockClientConfig,
) {
let client = Arc::new(MockClient::new(client_config));
let read_size = 1 * MB;
let object_size = 8 * MB;
let object = MockObject::ramp(0xaa, object_size, ETag::for_tests());
let etag = object.etag();
let prefetcher_config = PrefetcherConfig {
max_read_window_size: test_config.max_read_window_size,
sequential_prefetch_multiplier: test_config.sequential_prefetch_multiplier,
..Default::default()
};
let prefetcher = build_prefetcher(client, prefetcher_type, prefetcher_config);
let object_id = ObjectId::new("hello".to_owned(), etag);
let fh = HandleId::new(1);
let mut request = prefetcher.prefetch("test-bucket".to_owned(), object_id, fh, object_size as u64);
let result = block_on(request.read(0, read_size));
assert!(matches!(result, Err(PrefetchReadError::BackpressurePreconditionFailed)));
}
#[test_case(PrefetcherType::Default)]
#[test_case(PrefetcherType::InMemoryCache(1 * MB))]
fn fail_with_backpressure_not_enabled(prefetcher_type: PrefetcherType) {
let test_config = TestConfig {
initial_request_size: 256 * 1024,
max_read_window_size: 1024 * 1024 * 1024,
sequential_prefetch_multiplier: 8,
client_part_size: 8 * 1024 * 1024,
max_forward_seek_wait_distance: 16 * 1024 * 1024,
max_backward_seek_distance: 2 * 1024 * 1024,
cache_block_size: 1 * MB,
};
let config = MockClient::config()
.bucket("test-bucket")
.part_size(test_config.client_part_size)
.enable_backpressure(false);
fail_with_backpressure_precondition_test(prefetcher_type, test_config, config);
}
#[test_case(PrefetcherType::Default)]
#[test_case(PrefetcherType::InMemoryCache(1 * MB))]
fn fail_with_backpressure_zero_read_window(prefetcher_type: PrefetcherType) {
let test_config = TestConfig {
initial_request_size: 256 * 1024,
max_read_window_size: 1024 * 1024 * 1024,
sequential_prefetch_multiplier: 8,
client_part_size: 8 * 1024 * 1024,
max_forward_seek_wait_distance: 16 * 1024 * 1024,
max_backward_seek_distance: 2 * 1024 * 1024,
cache_block_size: 1 * MB,
};
let config = MockClient::config()
.bucket("test-bucket")
.part_size(test_config.client_part_size)
.enable_backpressure(true)
.initial_read_window_size(0);
fail_with_backpressure_precondition_test(prefetcher_type, test_config, config);
}
fn fail_sequential_read_test(
prefetcher_type: PrefetcherType,
size: u64,
read_size: usize,
test_config: TestConfig,
get_failures: HashMap<usize, GetObjectFailureMode<MockClientError>>,
) {
let client = MockClient::config()
.bucket("test-bucket")
.part_size(test_config.client_part_size)
.enable_backpressure(true)
.initial_read_window_size(test_config.client_part_size)
.build();
let object = MockObject::ramp(0xaa, size as usize, ETag::for_tests());
let etag = object.etag();
client.add_object("hello", object);
let client = Arc::new(countdown_failure_client(
client,
CountdownFailureConfig {
get_failures,
..Default::default()
},
));
let prefetcher_config = PrefetcherConfig {
max_read_window_size: test_config.max_read_window_size,
sequential_prefetch_multiplier: test_config.sequential_prefetch_multiplier,
initial_request_size: test_config.initial_request_size,
..Default::default()
};
let prefetcher = build_prefetcher(client, prefetcher_type, prefetcher_config);
let object_id = ObjectId::new("hello".to_owned(), etag);
let fh = HandleId::new(1);
let mut request = prefetcher.prefetch("test-bucket".to_owned(), object_id, fh, size);
let mut next_offset = 0;
loop {
let buf = match block_on(request.read(next_offset, read_size)) {
Ok(buf) => buf,
Err(_) => break,
};
let buf = buf.into_bytes().unwrap();
if buf.is_empty() {
break;
}
let expected = ramp_bytes((0xaa + next_offset) as usize, buf.len());
assert_eq!(&buf[..], &expected[..buf.len()]);
next_offset += buf.len() as u64;
}
assert!(next_offset < size); }
#[test_case("invalid range; length=42", PrefetcherType::Default)]
#[test_case("invalid range; length=42", PrefetcherType::InMemoryCache(1 * MB))]
#[test_case(
"At least one of the pre-conditions you specified did not hold",
PrefetcherType::Default
)]
#[test_case("At least one of the pre-conditions you specified did not hold", PrefetcherType::InMemoryCache(1 * MB))]
fn fail_request_sequential_small(err_value: &str, prefetcher_type: PrefetcherType) {
let config = TestConfig {
initial_request_size: 256 * 1024,
max_read_window_size: 1024 * 1024 * 1024,
sequential_prefetch_multiplier: 8,
client_part_size: 8 * 1024 * 1024,
max_forward_seek_wait_distance: 16 * 1024 * 1024,
max_backward_seek_distance: 2 * 1024 * 1024,
cache_block_size: 1 * MB,
};
let mut get_failures = HashMap::new();
get_failures.insert(
2,
GetObjectFailureMode::OperationError(ObjectClientError::ClientError(MockClientError(
err_value.to_owned().into(),
))),
);
fail_sequential_read_test(prefetcher_type, 1024 * 1024 + 111, 1024 * 1024, config, get_failures);
}
proptest! {
#[test]
fn proptest_sequential_read(
size in 1u64..1 * 1024 * 1024,
read_size in 1usize..1 * 1024 * 1024,
config: TestConfig,
) {
run_sequential_read_test(PrefetcherType::Default, size, read_size, config);
}
#[test]
fn proptest_sequential_read_small_read_size(size in 1u64..1 * 1024 * 1024, read_factor in 1usize..10, config: TestConfig) {
let read_size = (size as usize / read_factor).max(1);
run_sequential_read_test(PrefetcherType::Default, size, read_size, config);
}
#[test]
fn proptest_sequential_read_with_cache(
size in 1u64..1 * 1024 * 1024,
read_size in 1usize..1 * 1024 * 1024,
config: TestConfig,
) {
run_sequential_read_test(PrefetcherType::InMemoryCache(config.cache_block_size), size, read_size, config);
}
#[test]
fn proptest_sequential_read_small_read_size_with_cache(size in 1u64..1 * 1024 * 1024, read_factor in 1usize..10,
config: TestConfig) {
let read_size = (size as usize / read_factor).max(1);
run_sequential_read_test(PrefetcherType::InMemoryCache(config.cache_block_size), size, read_size, config);
}
}
#[test]
fn test_sequential_read_regression() {
let object_size = 854966;
let read_size = 161647;
let config = TestConfig {
initial_request_size: 484941,
max_read_window_size: 81509,
sequential_prefetch_multiplier: 1,
client_part_size: 181682,
max_forward_seek_wait_distance: 1,
max_backward_seek_distance: 18668,
cache_block_size: 1 * MB,
};
run_sequential_read_test(PrefetcherType::Default, object_size, read_size, config);
}
fn run_random_read_test(
prefetcher_type: PrefetcherType,
object_size: u64,
reads: Vec<(u64, usize)>,
test_config: TestConfig,
) {
let client = Arc::new(
MockClient::config()
.bucket("test-bucket")
.part_size(test_config.client_part_size)
.enable_backpressure(true)
.initial_read_window_size(test_config.client_part_size)
.build(),
);
let object = MockObject::ramp(0xaa, object_size as usize, ETag::for_tests());
let etag = object.etag();
client.add_object("hello", object);
let prefetcher_config = PrefetcherConfig {
max_read_window_size: test_config.max_read_window_size,
sequential_prefetch_multiplier: test_config.sequential_prefetch_multiplier,
max_forward_seek_wait_distance: test_config.max_forward_seek_wait_distance,
max_backward_seek_distance: test_config.max_backward_seek_distance,
initial_request_size: test_config.initial_request_size,
};
let prefetcher = build_prefetcher(client, prefetcher_type, prefetcher_config);
let object_id = ObjectId::new("hello".to_owned(), etag);
let fh = HandleId::new(1);
let mut request = prefetcher.prefetch("test-bucket".to_owned(), object_id, fh, object_size);
for (offset, length) in reads {
assert!(offset < object_size);
assert!(offset + length as u64 <= object_size);
let expected = ramp_bytes((0xaa + offset) as usize, length);
let buf = block_on(request.read(offset, length)).unwrap();
let buf = buf.into_bytes().unwrap();
assert_eq!(buf.len(), expected.len());
if buf[..] != expected[..] {
for i in 0..buf.len() {
if buf[i] != expected[i] {
panic!(
"buffer mismatch at offset {}, saw {} expected {}",
i, buf[i], expected[i]
);
}
}
}
}
}
fn random_read_strategy(max_object_size: u64) -> impl Strategy<Value = (u64, Vec<(u64, usize)>)> {
(1..=max_object_size).prop_flat_map(|object_size| {
(
Just(object_size),
proptest::collection::vec(
(0..object_size).prop_flat_map(move |offset| {
(1..=object_size - offset).prop_map(move |length| (offset, length as usize))
}),
0..10,
),
)
})
}
proptest! {
#[test]
fn proptest_random_read(
reads in random_read_strategy(1 * 1024 * 1024),
config: TestConfig,
) {
let (object_size, reads) = reads;
run_random_read_test(PrefetcherType::Default, object_size, reads, config);
}
#[test]
fn proptest_random_read_with_cache(
reads in random_read_strategy(1 * 1024 * 1024),
config: TestConfig,
) {
let (object_size, reads) = reads;
run_random_read_test(PrefetcherType::InMemoryCache(config.cache_block_size), object_size, reads, config);
}
}
#[test]
fn test_random_read_regression() {
let object_size = 724314;
let reads = vec![(0, 516883)];
let config = TestConfig {
initial_request_size: 3684779,
max_read_window_size: 2147621,
sequential_prefetch_multiplier: 4,
client_part_size: 516882,
max_forward_seek_wait_distance: 16 * 1024 * 1024,
max_backward_seek_distance: 2 * 1024 * 1024,
cache_block_size: 1 * MB,
};
run_random_read_test(PrefetcherType::Default, object_size, reads, config);
}
#[test]
fn test_random_read_regression2() {
let object_size = 755678;
let reads = vec![(0, 278499), (311250, 1)];
let config = TestConfig {
initial_request_size: 556997,
max_read_window_size: 105938,
sequential_prefetch_multiplier: 7,
client_part_size: 1219731,
max_forward_seek_wait_distance: 16 * 1024 * 1024,
max_backward_seek_distance: 2 * 1024 * 1024,
cache_block_size: 1 * MB,
};
run_random_read_test(PrefetcherType::Default, object_size, reads, config);
}
#[test]
fn test_random_read_regression3() {
let object_size = 755678;
let reads = vec![(0, 236766), (291204, 1), (280930, 36002)];
let config = TestConfig {
initial_request_size: 556997,
max_read_window_size: 105938,
sequential_prefetch_multiplier: 7,
client_part_size: 1219731,
max_forward_seek_wait_distance: 2260662,
max_backward_seek_distance: 2369799,
cache_block_size: 1 * MB,
};
run_random_read_test(PrefetcherType::Default, object_size, reads, config);
}
#[test]
fn test_random_read_regression4() {
let object_size = 14201;
let reads = vec![(3584, 1), (9424, 1460), (3582, 3340), (248, 9218)];
let config = TestConfig {
initial_request_size: 457999,
max_read_window_size: 863511,
sequential_prefetch_multiplier: 5,
client_part_size: 1972409,
max_forward_seek_wait_distance: 2810651,
max_backward_seek_distance: 3531090,
cache_block_size: 1 * MB,
};
run_random_read_test(PrefetcherType::Default, object_size, reads, config);
}
#[test]
fn test_forward_seek_failure() {
const PART_SIZE: usize = 8192;
const OBJECT_SIZE: usize = 2 * PART_SIZE;
let client = MockClient::config()
.bucket("test-bucket")
.part_size(PART_SIZE)
.enable_backpressure(true)
.initial_read_window_size(OBJECT_SIZE)
.build();
let object = MockObject::ramp(0xaa, OBJECT_SIZE, ETag::for_tests());
let etag = object.etag();
client.add_object("hello", object);
let mut get_failures = HashMap::new();
get_failures.insert(
1,
GetObjectFailureMode::StreamPositionError(
2,
ObjectClientError::ClientError(MockClientError(
"error in the second chunk of the first request".into(),
)),
),
);
get_failures.insert(
2,
GetObjectFailureMode::OperationError(ObjectClientError::ClientError(MockClientError(
"error in second request".into(),
))),
);
let client = Arc::new(countdown_failure_client(
client,
CountdownFailureConfig {
get_failures,
..Default::default()
},
));
let prefetcher = build_prefetcher(client, PrefetcherType::Default, Default::default());
block_on(async {
let object_id = ObjectId::new("hello".to_owned(), etag.clone());
let fh = HandleId::new(1);
let mut request = prefetcher.prefetch("test-bucket".to_owned(), object_id, fh, OBJECT_SIZE as u64);
_ = request.read(0, 1).await.expect("first read should succeed");
let offset = PART_SIZE + 1;
_ = request.read(offset as u64, 1).await.expect_err("seek should fail");
_ = request
.read(offset as u64, 1)
.await
.expect_err("first retry after failure should fail");
let byte = request
.read(offset as u64, 1)
.await
.expect("second retry should succeed");
let expected = ramp_bytes(0xaa + offset, 1);
assert_eq!(byte.into_bytes().unwrap()[..], expected[..]);
});
}
#[test_case(PrefetcherType::Default)]
#[test_case(PrefetcherType::InMemoryCache(8192))]
fn test_short_read_failure(prefetcher_type: PrefetcherType) {
const PART_SIZE: usize = 8192;
const OBJECT_SIZE: usize = 2 * PART_SIZE;
let client = MockClient::config()
.bucket("test-bucket")
.part_size(PART_SIZE)
.enable_backpressure(true)
.initial_read_window_size(PART_SIZE)
.build();
let object = MockObject::ramp(0xaa, OBJECT_SIZE, ETag::for_tests());
let etag = object.etag();
client.add_object("hello", object);
let mut get_failures = HashMap::new();
get_failures.insert(1, GetObjectFailureMode::StreamShortCircuit(1));
get_failures.insert(3, GetObjectFailureMode::StreamShortCircuit(1));
let client = Arc::new(countdown_failure_client(
client,
CountdownFailureConfig {
get_failures,
..Default::default()
},
));
let prefetcher_config = PrefetcherConfig {
initial_request_size: PART_SIZE,
..Default::default()
};
let prefetcher = build_prefetcher(client, prefetcher_type, prefetcher_config);
block_on(async {
let object_id = ObjectId::new("hello".to_owned(), etag.clone());
let fh = HandleId::new(1);
let mut request = prefetcher.prefetch("test-bucket".to_owned(), object_id, fh, OBJECT_SIZE as u64);
assert!(matches!(
request.read(0, 10).await.expect_err("read should fail"),
PrefetchReadError::GetRequestTerminatedUnexpectedly,
));
let bytes = request.read(0, PART_SIZE).await.unwrap();
let expected = ramp_bytes(0xaa, PART_SIZE);
assert_eq!(bytes.into_bytes().unwrap()[..], expected[..]);
_ = request
.read(PART_SIZE as u64, PART_SIZE)
.await
.expect_err("read should fail");
let bytes = request.read(0, OBJECT_SIZE).await.unwrap();
let expected = ramp_bytes(0xaa, OBJECT_SIZE);
assert_eq!(bytes.into_bytes().unwrap()[..], expected[..]);
let bytes = request.read(PART_SIZE as u64, OBJECT_SIZE).await.unwrap();
let expected = ramp_bytes(0xaa + PART_SIZE, PART_SIZE);
assert_eq!(bytes.into_bytes().unwrap()[..], expected[..]);
});
}
#[test_case(0, 25; "no first read")]
#[test_case(60, 25; "read beyond first part")]
#[test_case(20, 25; "read in first part")]
#[test_case(125, 110; "read in second request")]
fn test_forward_seek(first_read_size: usize, part_size: usize) {
    // After an optional initial read of `first_read_size` bytes, a forward
    // seek to any later offset must still return the correct ramp byte.
    const OBJECT_SIZE: usize = 200;
    const FIRST_REQUEST_SIZE: usize = 100;

    let client = Arc::new(
        MockClient::config()
            .bucket("test-bucket")
            .part_size(part_size)
            .enable_backpressure(true)
            .initial_read_window_size(FIRST_REQUEST_SIZE)
            .build(),
    );
    let object = MockObject::ramp(0xaa, OBJECT_SIZE, ETag::for_tests());
    let etag = object.etag();
    client.add_object("hello", object);

    let prefetcher = build_prefetcher(client, PrefetcherType::Default, Default::default());

    // Exercise every possible seek target strictly past the first read.
    for target in (first_read_size + 1)..OBJECT_SIZE {
        let id = ObjectId::new("hello".to_owned(), etag.clone());
        let handle = HandleId::new(1);
        let mut request = prefetcher.prefetch("test-bucket".to_owned(), id, handle, OBJECT_SIZE as u64);
        if first_read_size > 0 {
            block_on(request.read(0, first_read_size)).unwrap();
        }
        let actual = block_on(request.read(target as u64, 1)).unwrap();
        let expected = ramp_bytes(0xaa + target, 1);
        assert_eq!(actual.into_bytes().unwrap()[..], expected[..]);
    }
}
#[test_case(60, 25; "read beyond first part")]
#[test_case(20, 25; "read in first part")]
#[test_case(125, 110; "read in second request")]
fn test_backward_seek(first_read_size: usize, part_size: usize) {
    // After an initial read of `first_read_size` bytes, seeking backwards to
    // any earlier offset must still return the correct ramp byte.
    const OBJECT_SIZE: usize = 200;

    let client = Arc::new(
        MockClient::config()
            .bucket("test-bucket")
            .part_size(part_size)
            .enable_backpressure(true)
            .initial_read_window_size(part_size)
            .build(),
    );
    let object = MockObject::ramp(0xaa, OBJECT_SIZE, ETag::for_tests());
    let etag = object.etag();
    client.add_object("hello", object);

    let prefetcher = build_prefetcher(client, PrefetcherType::Default, Default::default());

    // Exercise every backward seek target inside the range already read.
    for target in 0..first_read_size {
        let id = ObjectId::new("hello".to_owned(), etag.clone());
        let handle = HandleId::new(1);
        let mut request = prefetcher.prefetch("test-bucket".to_owned(), id, handle, OBJECT_SIZE as u64);
        if first_read_size > 0 {
            block_on(request.read(0, first_read_size)).unwrap();
        }
        let actual = block_on(request.read(target as u64, 1)).unwrap();
        let expected = ramp_bytes(0xaa + target, 1);
        assert_eq!(actual.into_bytes().unwrap()[..], expected[..]);
    }
}
#[test_case(8 * 1024 * 1024, 1 * 1024 * 1024, 8 * 1024 * 1024; "8MiB part_size, 1MiB window")]
#[test_case(1 * 1024 * 1024, 1 * 1024 * 1024, 1 * 1024 * 1024; "equal part_size and window")]
#[test_case(250 * 1024, 1 * 1024 * 1024, 1250 * 1024; "window larger than part_size")]
fn test_seek_window_reservation(part_size: usize, seek_window_size: usize, expected: u64) {
    // The memory reserved for the seek window must match the expected value
    // for each (part size, configured window size) combination.
    let actual = PrefetchGetObject::<MockClient>::seek_window_reservation(part_size, seek_window_size);
    assert_eq!(actual, expected);
}
/// Sequentially read a 16 MiB object in 256 KiB chunks and verify every byte
/// against the ramp pattern, across combinations of the client's initial read
/// window, the client part size, and the prefetcher's `initial_request_size`
/// (including zero and sizes larger than the part size).
///
/// Fixes typos in the original test names: `reqeust` -> `request`,
/// `intial` -> `initial` (harness-visible names only; no behavior change).
#[test_case(8 * MB, 8 * MB, 1 * MB + 128 * KB; "default")]
#[test_case(8 * MB, 8 * MB, 0; "no initial request")]
#[test_case(1 * KB, 1 * MB, 10 * MB; "initial request larger than part size")]
#[test_case(16 * MB, 8 * MB, 1 * MB + 128 * KB; "larger initial read window")]
#[test_case(16 * MB, 8 * MB, 0; "larger initial read window w/o initial request")]
#[test_case(1 * KB, 8 * MB, 1 * MB + 128 * KB; "smaller initial read window")]
#[test_case(1 * KB, 8 * MB, 0; "smaller initial read window w/o initial request")]
fn test_initial_request_size(initial_read_window_size: usize, part_size: usize, initial_request_size: usize) {
    let object_size = (16 * MB) as u64;
    let client = Arc::new(
        MockClient::config()
            .bucket("test-bucket")
            .part_size(part_size)
            .enable_backpressure(true)
            .initial_read_window_size(initial_read_window_size)
            .build(),
    );
    let object = MockObject::ramp(0xaa, object_size as usize, ETag::for_tests());
    let etag = object.etag();
    client.add_object("test-object", object);

    // Only override the knob under test; keep all other prefetcher defaults.
    let prefetcher_config = PrefetcherConfig {
        initial_request_size,
        ..Default::default()
    };
    let prefetcher = build_prefetcher(client.clone(), PrefetcherType::Default, prefetcher_config);

    let object_id = ObjectId::new("test-object".to_owned(), etag);
    let fh = HandleId::new(1);
    let mut request = prefetcher.prefetch("test-bucket".to_owned(), object_id, fh, object_size);

    // Read sequentially in 256 KiB chunks; each returned buffer must match
    // the expected ramp slice at its offset.
    let mut next_offset = 0;
    while next_offset < object_size {
        let buf = block_on(request.read(next_offset, 256 * KB)).unwrap();
        if buf.is_empty() {
            break;
        }
        let buf = buf.into_bytes().unwrap();
        let expected = ramp_bytes((0xaa + next_offset) as usize, buf.len());
        assert_eq!(&buf[..], &expected[..]);
        next_offset += buf.len() as u64;
    }
    // The loop must have consumed the entire object, not stopped early.
    assert_eq!(next_offset, object_size);
}
#[cfg(feature = "shuttle")]
mod shuttle_tests {
// Concurrency stress tests run under the shuttle scheduler, which controls
// task interleavings and randomness deterministically. The exact order of
// RNG draws and awaits matters for shuttle's replay, so these helpers
// should not be restructured casually.
use super::*;
use futures::task::{FutureObj, Spawn, SpawnError};
use shuttle::future::block_on;
use shuttle::rand::Rng;
use shuttle::{check_pct, check_random};
// Adapter that spawns the prefetcher's background futures onto shuttle's
// controlled executor instead of a real async runtime.
struct ShuttleRuntime;
impl Spawn for ShuttleRuntime {
fn spawn_obj(&self, future: FutureObj<'static, ()>) -> Result<(), SpawnError> {
shuttle::future::spawn(future);
Ok(())
}
}
// Sequentially read an object of random size with random read sizes and a
// randomized prefetcher configuration, verifying every returned byte against
// the ramp pattern.
fn sequential_read_stress_helper() {
let mut rng = shuttle::rand::thread_rng();
// Randomize the object size and every prefetcher tuning knob so each
// shuttle iteration exercises a different configuration.
let object_size = rng.gen_range(1u64..1 * 1024 * 1024);
let max_read_window_size = rng.gen_range(16usize..1 * 1024 * 1024);
let sequential_prefetch_multiplier = rng.gen_range(2usize..16);
let part_size = rng.gen_range(16usize..INITIAL_REQUEST_SIZE);
let initial_request_size = rng.gen_range(0..INITIAL_REQUEST_SIZE);
let max_forward_seek_wait_distance = rng.gen_range(16u64..1 * 1024 * 1024 + 256 * 1024);
let max_backward_seek_distance = rng.gen_range(16u64..1 * 1024 * 1024 + 256 * 1024);
let client = Arc::new(
MockClient::config()
.bucket("test-bucket")
.part_size(part_size)
.enable_backpressure(true)
.initial_read_window_size(part_size)
.build(),
);
let pool = PagedPool::new_with_candidate_sizes([part_size]);
let mem_limiter = Arc::new(MemoryLimiter::new(pool, MINIMUM_MEM_LIMIT));
let object = MockObject::ramp(0xaa, object_size as usize, ETag::for_tests());
let file_etag = object.etag();
client.add_object("hello", object);
let prefetcher_config = PrefetcherConfig {
max_read_window_size,
sequential_prefetch_multiplier,
max_forward_seek_wait_distance,
max_backward_seek_distance,
initial_request_size,
};
let prefetcher =
Prefetcher::default_builder(client).build(Runtime::new(ShuttleRuntime), mem_limiter, prefetcher_config);
let object_id = ObjectId::new("hello".to_owned(), file_etag);
let fh = HandleId::new(1);
let mut request = prefetcher.prefetch("test-bucket".to_owned(), object_id, fh, object_size);
// Read forward in randomly sized chunks until the prefetcher reports EOF
// (an empty buffer), checking each chunk against the expected ramp bytes.
let mut next_offset = 0;
loop {
let read_size = rng.gen_range(1usize..1 * 1024 * 1024);
let buf = block_on(request.read(next_offset, read_size)).unwrap();
if buf.is_empty() {
break;
}
let buf = buf.into_bytes().unwrap();
let expected = ramp_bytes((0xaa + next_offset) as usize, buf.len());
assert_eq!(&buf[..], &expected[..buf.len()]);
next_offset += buf.len() as u64;
}
// EOF must coincide exactly with the object size — no short reads.
assert_eq!(next_offset, object_size);
}
#[test]
fn sequential_read_stress() {
// Explore interleavings both with purely random scheduling and with
// shuttle's PCT (probabilistic concurrency testing) scheduler.
check_random(sequential_read_stress_helper, 1000);
check_pct(sequential_read_stress_helper, 1000, 3);
}
// Issue a random sequence of reads at random offsets/lengths against a
// randomized prefetcher configuration, verifying each buffer byte-for-byte.
fn random_read_stress_helper() {
let mut rng = shuttle::rand::thread_rng();
let max_read_window_size = rng.gen_range(16usize..32 * 1024);
let sequential_prefetch_multiplier = rng.gen_range(2usize..16);
let part_size = rng.gen_range(16usize..128 * 1024);
let initial_request_size = rng.gen_range(16usize..128 * 1024);
let max_forward_seek_wait_distance = rng.gen_range(16u64..192 * 1024);
let max_backward_seek_distance = rng.gen_range(16u64..192 * 1024);
// Bound the object size relative to the smallest request/window so reads
// regularly cross request boundaries.
let max_object_size = initial_request_size.min(max_read_window_size) * 20;
let object_size = rng.gen_range(1u64..(64 * 1024).min(max_object_size) as u64);
let client = Arc::new(
MockClient::config()
.bucket("test-bucket")
.part_size(part_size)
.enable_backpressure(true)
.initial_read_window_size(part_size)
.build(),
);
let pool = PagedPool::new_with_candidate_sizes([part_size]);
let mem_limiter = Arc::new(MemoryLimiter::new(pool, MINIMUM_MEM_LIMIT));
let object = MockObject::ramp(0xaa, object_size as usize, ETag::for_tests());
let file_etag = object.etag();
client.add_object("hello", object);
let prefetcher_config = PrefetcherConfig {
max_read_window_size,
sequential_prefetch_multiplier,
max_forward_seek_wait_distance,
max_backward_seek_distance,
initial_request_size,
};
let prefetcher =
Prefetcher::default_builder(client).build(Runtime::new(ShuttleRuntime), mem_limiter, prefetcher_config);
let object_id = ObjectId::new("hello".to_owned(), file_etag);
let fh = HandleId::new(1);
let mut request = prefetcher.prefetch("test-bucket".to_owned(), object_id, fh, object_size);
let num_reads = rng.gen_range(10usize..50);
for _ in 0..num_reads {
// Pick a random in-bounds (offset, length) pair for each read.
let offset = rng.gen_range(0u64..object_size);
let length = rng.gen_range(1usize..(object_size - offset + 1) as usize);
let expected = ramp_bytes((0xaa + offset) as usize, length);
let buf = block_on(request.read(offset, length)).unwrap();
let buf = buf.into_bytes().unwrap();
assert_eq!(buf.len(), expected.len());
// On mismatch, scan for the first differing byte so the panic message
// pinpoints the exact offset instead of dumping both buffers.
if buf[..] != expected[..] {
for i in 0..buf.len() {
if buf[i] != expected[i] {
panic!(
"buffer mismatch at offset {}, saw {} expected {}",
i, buf[i], expected[i]
);
}
}
}
}
}
#[test]
fn random_read_stress() {
// Same dual-scheduler exploration as the sequential stress test.
check_random(random_read_stress_helper, 1000);
check_pct(random_read_stress_helper, 1000, 3);
}
}
}