mountpoint_s3_fs/
prefetch.rs

1//! This module implements a prefetcher for GetObject requests.
2//!
3//! It works by relying on the CRT's flow-control window feature. The prefetcher creates a single
4//! GetObject request with entire length of the object (starting from the first read offset) and
//! makes an increasingly larger read window. We want the chunks to be large enough that they can make
6//! effective use of the CRT's fan-out parallelism across the S3 frontend, but small enough that we
7//! don't accumulate a lot of unread object data in memory or wastefully download data we'll never
8//! read. As the reader continues to make sequential reads, we increase the size of the read window
9//! up to some maximum. If the reader ever makes a non-sequential read, we abandon the prefetching
10//! and start again with a new GetObject request with minimum read window size.
11//!
12//! In more technical details, the prefetcher creates a RequestTask when receiving the first read
13//! request from the file system or after it has just been reset. The RequestTask consists of two main
14//! components.
15//! 1.  An ObjectPartStream that has a role to continuously fetch data from the sources which can be
16//!     either S3 or the cache on disk. The ObjectPartStream is spawned and run in a separate thread
17//!     from the prefetcher.
18//! 2.  A PartQueue, where we store data received from the ObjectPartStream, waiting to be read from
19//!     the prefetcher via a RequestTask function.
20//!
21//! A backpressure mechanism is needed to control how much data we want to store in the part queue at
22//! a time as we don't want to download the entire object into memory. For the client part stream, we
23//! may be able to rely on the CRT flow-control flow window to block when we don't increase the read
24//! window size, but for the caching part stream we don't have the machinery to do that yet. That's why
//! we introduce the BackpressureController and BackpressureLimiter to help solve this.
26//!
27//! Essentially, the BackpressureController and BackpressureLimiter is a pair of sender/receiver of a
28//! channel, created at RequestTask initialization. The sender is handed to the RequestTask. Its role
29//! is to communicate with its receiver to tell "when" it is ready to receive more data. The receiver
30//! is handed to the ObjectPartStream where the stream should call a provided function "before" fetching
31//! more data from the sources and put them into the part queue. The BackpressureLimiter should be used
//! as a means of blocking the ObjectPartStream thread from fetching more data.
33
34use std::fmt::Debug;
35
36use metrics::{counter, histogram};
37use mountpoint_s3_client::error::{GetObjectError, ObjectClientError};
38use mountpoint_s3_client::{ObjectClient, error_metadata::ProvideErrorMetadata};
39use thiserror::Error;
40use tracing::trace;
41
42use crate::checksums::{ChecksummedBytes, IntegrityError};
43use crate::data_cache::DataCache;
44use crate::fs::error_metadata::{ErrorMetadata, MOUNTPOINT_ERROR_CLIENT};
45use crate::mem_limiter::{BufferArea, MemoryLimiter};
46use crate::metrics::defs::{FUSE_CACHE_HIT, PREFETCH_RESET_STATE};
47use crate::object::ObjectId;
48use crate::sync::Arc;
49
50mod backpressure_controller;
51mod builder;
52mod caching_stream;
53mod part;
54mod part_queue;
55mod part_stream;
56mod seek_window;
57mod task;
58
59pub use builder::PrefetcherBuilder;
60use part::PartOperationError;
61use part_stream::{PartStream, RequestRange, RequestTaskConfig};
62use seek_window::SeekWindow;
63use task::RequestTask;
64
// This is a weird looking number! We really want our first request size to be 1MiB,
// which is a common IO size. But Linux's readahead will try to read an extra 128k on
// top of a 1MiB read, which we'd have to wait for a second request to service. Because
// FUSE doesn't know the difference between regular reads and readahead reads, it will
// send us a READ request for that 128k, so we'll have to block waiting for it even if
// the application doesn't want it. This is all in the noise for sequential IO, but
// waiting for the readahead hurts random IO. So we add 128k to the first request size
// to avoid the latency hit of the second request.
pub const INITIAL_REQUEST_SIZE: usize = 1024 * 1024 + 128 * 1024;
74
/// Errors that can be returned to callers reading from a [PrefetchGetObject].
#[derive(Debug, Error)]
pub enum PrefetchReadError<E> {
    /// The underlying GetObject request to the client failed. `metadata` carries the
    /// bucket/key/error-code details for error reporting (see [Self::get_request_failed]).
    #[error("get object request failed")]
    GetRequestFailed {
        source: ObjectClientError<GetObjectError, E>,
        metadata: Box<ErrorMetadata>,
    },

    /// A part arrived at a different offset than the one we expected to read next.
    #[error("get object request returned wrong offset")]
    GetRequestReturnedWrongOffset { offset: u64, expected_offset: u64 },

    /// The request ended before delivering all of the data we expected from it.
    #[error("get request terminated unexpectedly")]
    GetRequestTerminatedUnexpectedly,

    /// Checksum validation of received data failed.
    #[error("integrity check failed")]
    Integrity(#[from] IntegrityError),

    /// Reading a part out of the part queue failed.
    #[error("part read failed")]
    PartReadFailed(#[from] PartOperationError),

    /// The client does not have backpressure enabled with a non-zero initial read window,
    /// which the prefetcher requires (checked in spawn_read_backpressure_request).
    #[error("backpressure must be enabled with non-zero initial read window")]
    BackpressurePreconditionFailed,

    /// Failed to grow the backpressure read window.
    #[error("read window increment failed")]
    ReadWindowIncrement,
}
101
102impl<E: ProvideErrorMetadata + std::error::Error + Send + Sync + 'static> PrefetchReadError<E> {
103    fn get_request_failed(err: ObjectClientError<GetObjectError, E>, bucket: &str, key: &str) -> Self {
104        let metadata = ErrorMetadata {
105            client_error_meta: err.meta(),
106            error_code: Some(MOUNTPOINT_ERROR_CLIENT.to_string()),
107            s3_bucket_name: Some(bucket.to_string()),
108            s3_object_key: Some(key.to_string()),
109        };
110        let metadata = Box::new(metadata);
111        Self::GetRequestFailed { source: err, metadata }
112    }
113}
114
/// Tuning knobs for a [Prefetcher] and the [PrefetchGetObject] requests it creates.
#[derive(Debug, Clone, Copy)]
pub struct PrefetcherConfig {
    /// Maximum size of the read window
    pub max_read_window_size: usize,
    /// Factor to increase the request size by whenever the reader continues making sequential reads
    pub sequential_prefetch_multiplier: usize,
    /// The maximum amount of unavailable data the prefetcher will tolerate during a seek operation
    /// before resetting and starting a new S3 request.
    pub max_forward_seek_wait_distance: u64,
    /// The maximum distance the prefetcher will seek backwards before resetting and starting a new
    /// S3 request. We keep this much data in memory in addition to any inflight requests.
    pub max_backward_seek_distance: u64,
    /// Size of the initial request. This request, of size possibly smaller than part_size,
    /// is made to lower the latency in small-random-reads usage pattern. If set to 0, initial request
    /// is skipped.
    pub initial_request_size: usize,
}
132
133impl Default for PrefetcherConfig {
134    #[allow(clippy::identity_op)]
135    fn default() -> Self {
136        Self {
137            max_read_window_size: determine_max_read_size(),
138            sequential_prefetch_multiplier: 2,
139            // We want these large enough to tolerate a single out-of-order Linux readahead, which
140            // is at most 256KiB backwards and then 512KiB forwards. For forwards seeks, we're also
141            // making a guess about where the optimal cut-off point is before it would be faster to
142            // just start a new request instead.
143            max_forward_seek_wait_distance: 16 * 1024 * 1024,
144            max_backward_seek_distance: 1 * 1024 * 1024,
145            initial_request_size: INITIAL_REQUEST_SIZE,
146        }
147    }
148}
149
150/// Provide the maximum read size for the prefetcher, for which there is one prefetcher per file handle.
151///
152/// This allows a way to override the prefetch window rather than using the hardcoded default within Mountpoint.
153/// We do not recommend using the override, and it may be removed at any time.
154///
155/// This parameter may not be accurately adopted when using small values.
156/// When prefetching starts, it will fetch 1MiB + 128KiB at time of writing.
157/// This parameter will only be used when scaling up the prefetch window.
158///
159/// This unstable override is expected to be removed once adaptive prefetching based on available memory is available:
160/// https://github.com/awslabs/mountpoint-s3/issues/987
161fn determine_max_read_size() -> usize {
162    const ENV_VAR_KEY: &str = "UNSTABLE_MOUNTPOINT_MAX_PREFETCH_WINDOW_SIZE";
163    const DEFAULT_READ_WINDOW_SIZE: usize = 2 * 1024 * 1024 * 1024;
164
165    match std::env::var_os(ENV_VAR_KEY) {
166        Some(val) => match val.to_string_lossy().parse() {
167            Ok(val) => {
168                tracing::warn!(
169                    "successfully overridden prefetch read window size \
170                        with new value {val} bytes from unstable environment config",
171                );
172                val
173            }
174            Err(_) => {
175                tracing::warn!(
176                    "{ENV_VAR_KEY} did not contain a valid positive integer \
177                        for prefetch bytes, using {DEFAULT_READ_WINDOW_SIZE} bytes instead",
178                );
179                DEFAULT_READ_WINDOW_SIZE
180            }
181        },
182        None => DEFAULT_READ_WINDOW_SIZE,
183    }
184}
185
/// A [Prefetcher] creates and manages prefetching GetObject requests to objects.
#[derive(Debug)]
pub struct Prefetcher<Client> {
    /// Source of object parts, backed either by the client directly or by a cache.
    part_stream: PartStream<Client>,
    /// Configuration shared by every [PrefetchGetObject] created by this prefetcher.
    config: PrefetcherConfig,
    /// Accounts for memory reserved by prefetch buffers and seek windows.
    mem_limiter: Arc<MemoryLimiter>,
}
193
194impl<Client> Prefetcher<Client>
195where
196    Client: ObjectClient + Clone + Send + Sync + 'static,
197{
198    /// Creates an instance of the default [Prefetcher] builder.
199    pub fn default_builder(client: Client) -> PrefetcherBuilder<Client> {
200        PrefetcherBuilder::default_builder(client)
201    }
202
203    /// Creates an instance of a caching [Prefetcher] builder.
204    pub fn caching_builder<Cache>(cache: Cache, client: Client) -> PrefetcherBuilder<Client>
205    where
206        Cache: DataCache + Send + Sync + 'static,
207    {
208        PrefetcherBuilder::caching_builder(cache, client)
209    }
210
211    /// Create a new [Prefetcher] from the given [ObjectPartStream] instance.
212    pub fn new(part_stream: PartStream<Client>, config: PrefetcherConfig, mem_limiter: Arc<MemoryLimiter>) -> Self {
213        Self {
214            part_stream,
215            config,
216            mem_limiter,
217        }
218    }
219
220    /// Start a new prefetch request to the specified object.
221    pub fn prefetch(&self, bucket: String, object_id: ObjectId, size: u64) -> PrefetchGetObject<Client>
222    where
223        Client: ObjectClient + Clone + Send + Sync + 'static,
224    {
225        PrefetchGetObject::new(
226            self.part_stream.clone(),
227            self.config,
228            bucket,
229            object_id,
230            size,
231            self.mem_limiter.clone(),
232        )
233    }
234}
235
236/// Result of a prefetch request. Allows callers to read object data.
/// Result of a prefetch request. Allows callers to read object data.
#[derive(Debug)]
pub struct PrefetchGetObject<Client>
where
    Client: ObjectClient + Clone + Send + Sync + 'static,
{
    /// Source of object parts for this request.
    part_stream: PartStream<Client>,
    config: PrefetcherConfig,
    /// The in-flight backpressure request, if any. `None` until the first read and after
    /// every reset; re-created lazily by the next read.
    backpressure_task: Option<RequestTask<Client>>,
    // Invariant: the offset of the last byte in this window is always
    // self.next_sequential_read_offset - 1.
    backward_seek_window: SeekWindow,
    bucket: String,
    object_id: ObjectId,
    // preferred part size in the prefetcher's part queue, not the object part
    preferred_part_size: usize,
    /// Start offset for sequential read, used for calculating contiguous read metric
    sequential_read_start_offset: u64,
    /// The offset at which the next read is expected to start for it to count as sequential.
    next_sequential_read_offset: u64,
    next_request_offset: u64,
    /// Total object size in bytes.
    size: u64,
    mem_limiter: Arc<MemoryLimiter>,
}
259
260impl<Client> PrefetchGetObject<Client>
261where
262    Client: ObjectClient + Clone + Send + Sync + 'static,
263{
264    /// Create and spawn a new prefetching request for an object
265    fn new(
266        part_stream: PartStream<Client>,
267        config: PrefetcherConfig,
268        bucket: String,
269        object_id: ObjectId,
270        size: u64,
271        mem_limiter: Arc<MemoryLimiter>,
272    ) -> Self {
273        let max_backward_seek_distance = config.max_backward_seek_distance as usize;
274        // a conservative memory reservation to avoid violating the memory limit with the large number of file handles
275        // note that this reservation is done in addition to the one in [PartQueue::push_front]
276        let seek_window_reservation =
277            Self::seek_window_reservation(part_stream.client().read_part_size(), max_backward_seek_distance);
278        mem_limiter.reserve(BufferArea::Prefetch, seek_window_reservation);
279        PrefetchGetObject {
280            part_stream,
281            config,
282            backpressure_task: None,
283            backward_seek_window: SeekWindow::new(max_backward_seek_distance),
284            preferred_part_size: 128 * 1024,
285            sequential_read_start_offset: 0,
286            next_sequential_read_offset: 0,
287            next_request_offset: 0,
288            bucket,
289            object_id,
290            size,
291            mem_limiter,
292        }
293    }
294
295    /// Read some bytes from the object. This function will always return exactly `size` bytes,
296    /// except at the end of the object where it will return however many bytes are left (including
297    /// possibly 0 bytes).
298    pub async fn read(
299        &mut self,
300        offset: u64,
301        length: usize,
302    ) -> Result<ChecksummedBytes, PrefetchReadError<Client::ClientError>> {
303        trace!(
304            offset,
305            length,
306            next_seq_offset = self.next_sequential_read_offset,
307            "read"
308        );
309
310        match self.try_read(offset, length).await {
311            Ok((data, cache_hit)) => {
312                // Record cache hit metric for FUSE layer. We only record a cache hit when ALL parts
313                // for this read request were served from cache storage (disk/express), not from S3.
314                // Partial cache hits (some parts from cache, some from S3) are counted as cache misses
315                // to provide a clear binary metric. Parts served from in-memory prefetch buffers
316                // (from previous S3 requests) don't count as cache hits.
317
318                if !data.is_empty() && cache_hit {
319                    // We only increment the counter on cache_hit=true because OTLP counters don't preserve
320                    // data point counts needed for meaningful averages.
321                    // FIXME: Consider using histogram to capture partial hit ratios.
322                    metrics::counter!(FUSE_CACHE_HIT).increment(1);
323                }
324                Ok(data)
325            }
326            Err(err) => {
327                self.reset_prefetch_to_offset(offset);
328                Err(err)
329            }
330        }
331    }
332
    /// Read up to `length` bytes at `offset`, also returning whether *every* part served for
    /// this read came from the cache (used by [Self::read] for the FUSE cache-hit metric).
    async fn try_read(
        &mut self,
        offset: u64,
        length: usize,
    ) -> Result<(ChecksummedBytes, bool), PrefetchReadError<Client::ClientError>> {
        // Currently, we set preferred part size to the current read size.
        // Our assumption is that the read size will be the same for most sequential
        // read and it can be aligned to the size of prefetched chunks.
        //
        // We initialize this value to 128k as it is the Linux's readahead size
        // and it can also be used as a lower bound in case the read size is too small.
        // The upper bound is 1MiB since it should be a common IO size.
        let max_preferred_part_size = 1024 * 1024;
        self.preferred_part_size = self.preferred_part_size.max(length).min(max_preferred_part_size);

        // Reads entirely past the end of the object return empty bytes.
        let remaining = self.size.saturating_sub(offset);
        if remaining == 0 {
            return Ok((ChecksummedBytes::default(), false));
        }
        let mut to_read = (length as u64).min(remaining);

        // Try to seek if this read is not sequential, and if seeking fails, cancel and reset the
        // prefetcher.
        if self.next_sequential_read_offset != offset {
            if self.try_seek(offset).await? {
                trace!("seek succeeded");
            } else {
                trace!(
                    expected = self.next_sequential_read_offset,
                    actual = offset,
                    "out-of-order read, resetting prefetch"
                );
                counter!(PREFETCH_RESET_STATE).increment(1);

                // This is an approximation, tolerating some seeking caused by concurrent readahead.
                self.record_contiguous_read_metric();

                self.reset_prefetch_to_offset(offset);
            }
        }
        assert_eq!(self.next_sequential_read_offset, offset);

        // Lazily start a new backpressure request: either this is the first read, or the
        // prefetcher was reset just above.
        if self.backpressure_task.is_none() {
            self.backpressure_task = Some(self.spawn_read_backpressure_request()?);
        }

        let mut all_parts_from_cache = true;
        let mut response = ChecksummedBytes::default();
        while to_read > 0 {
            let Some(current_task) = self.backpressure_task.as_mut() else {
                trace!(offset, length, "read beyond object size");
                break;
            };
            debug_assert!(current_task.remaining() > 0);

            let part = current_task.read(to_read as usize).await?;
            all_parts_from_cache &= part.is_from_cache();
            // Remember the part so a short backwards seek can be served without a new request.
            self.backward_seek_window.push(part.clone());
            let part_bytes = part.into_bytes(&self.object_id, self.next_sequential_read_offset)?;

            self.next_sequential_read_offset += part_bytes.len() as u64;
            // If we can complete the read with just a single buffer, early return to avoid copying
            // into a new buffer. This should be the common case as long as part size is larger than
            // read size, which it almost always is for real S3 clients and FUSE.
            if response.is_empty() && part_bytes.len() == to_read as usize {
                return Ok((part_bytes, all_parts_from_cache));
            }

            let part_len = part_bytes.len() as u64;
            response.extend(part_bytes)?;
            to_read -= part_len;
        }

        Ok((response, all_parts_from_cache))
    }
408
409    /// Spawn a backpressure GetObject request which has a range from current offset to the end of the file.
410    /// We will be using flow-control window to control how much data we want to download into the prefetcher.
411    fn spawn_read_backpressure_request(
412        &mut self,
413    ) -> Result<RequestTask<Client>, PrefetchReadError<Client::ClientError>> {
414        let start = self.next_sequential_read_offset;
415        let object_size = self.size as usize;
416        let read_part_size = self.part_stream.client().read_part_size();
417        let range = RequestRange::new(object_size, start, object_size);
418
419        // The prefetcher now relies on backpressure mechanism so it must be enabled
420        match self.part_stream.client().initial_read_window_size() {
421            Some(value) => {
422                // Also, make sure that we don't get blocked from the beginning
423                if value == 0 {
424                    return Err(PrefetchReadError::BackpressurePreconditionFailed);
425                }
426            }
427            None => return Err(PrefetchReadError::BackpressurePreconditionFailed),
428        };
429
430        let config = RequestTaskConfig {
431            bucket: self.bucket.clone(),
432            object_id: self.object_id.clone(),
433            range,
434            read_part_size,
435            preferred_part_size: self.preferred_part_size,
436            initial_request_size: self.config.initial_request_size,
437            max_read_window_size: self.config.max_read_window_size,
438            read_window_size_multiplier: self.config.sequential_prefetch_multiplier,
439        };
440        Ok(self.part_stream.spawn_get_object_request(config))
441    }
442
    /// Reset this prefetch request to a new offset, clearing any existing tasks queued.
    ///
    /// Discards the current task and backwards seek window, then rebases all read/request
    /// offsets at `offset`, so the next read starts a fresh backpressure request from there.
    fn reset_prefetch_to_offset(&mut self, offset: u64) {
        self.backpressure_task = None;
        self.backward_seek_window.clear();
        self.sequential_read_start_offset = offset;
        self.next_sequential_read_offset = offset;
        self.next_request_offset = offset;
    }
451
452    /// Try to seek within the current inflight requests without restarting them. Returns true if
453    /// the seek succeeded, in which case self.next_sequential_read_offset will be updated to the
454    /// new offset. If this returns false, the prefetcher is in an unknown state and must be reset.
455    async fn try_seek(&mut self, offset: u64) -> Result<bool, PrefetchReadError<Client::ClientError>> {
456        assert_ne!(offset, self.next_sequential_read_offset);
457        trace!(from = self.next_sequential_read_offset, to = offset, "trying to seek");
458        if offset > self.next_sequential_read_offset {
459            self.try_seek_forward(offset).await
460        } else {
461            self.try_seek_backward(offset).await
462        }
463    }
464
465    async fn try_seek_forward(&mut self, offset: u64) -> Result<bool, PrefetchReadError<Client::ClientError>> {
466        assert!(offset > self.next_sequential_read_offset);
467        let total_seek_distance = offset - self.next_sequential_read_offset;
468        histogram!("prefetch.seek_distance", "dir" => "forward").record(total_seek_distance as f64);
469
470        let Some(task) = self.backpressure_task.as_mut() else {
471            // Can't seek if there's no requests in flight at all
472            return Ok(false);
473        };
474
475        // Not enough data in the read window to serve the forward seek
476        if offset >= task.read_window_end_offset() {
477            return Ok(false);
478        }
479
480        // If we have enough bytes already downloaded (`available`) to skip straight to this read, then do
481        // it. Otherwise, we're willing to wait for the bytes to download only if they're coming "soon", where
482        // soon is defined as up to `max_forward_seek_wait_distance` bytes ahead of the available offset.
483        let available_offset = task.available_offset();
484        let available_soon_offset = available_offset.saturating_add(self.config.max_forward_seek_wait_distance);
485        if offset >= available_soon_offset {
486            trace!(
487                requested_offset = offset,
488                available_offset = available_offset,
489                "seek failed: not enough data available"
490            );
491            return Ok(false);
492        }
493        let mut seek_distance = offset - self.next_sequential_read_offset;
494        while seek_distance > 0 {
495            let part = task.read(seek_distance as usize).await?;
496            seek_distance -= part.len() as u64;
497            self.next_sequential_read_offset += part.len() as u64;
498            self.backward_seek_window.push(part);
499        }
500        Ok(true)
501    }
502
    /// Seek backwards by replaying parts retained in the backwards seek window, pushing them
    /// back onto the front of the part queue. Returns `Ok(false)` if the window doesn't hold
    /// enough data to reach `offset`.
    async fn try_seek_backward(&mut self, offset: u64) -> Result<bool, PrefetchReadError<Client::ClientError>> {
        assert!(offset < self.next_sequential_read_offset);

        // When the task is None it means either we have just started prefetching or recently reset it,
        // in both cases the backward seek window would be empty so we can bail out early.
        let Some(task) = self.backpressure_task.as_mut() else {
            return Ok(false);
        };
        let backwards_length_needed = self.next_sequential_read_offset - offset;
        histogram!("prefetch.seek_distance", "dir" => "backward").record(backwards_length_needed as f64);

        let Some(parts) = self.backward_seek_window.read_back(backwards_length_needed as usize) else {
            trace!("seek failed: not enough data in backwards seek window");
            return Ok(false);
        };
        // This also increase `prefetcher_mem_reserved` value in memory limiter.
        // At least one subsequent `RequestTask::read` is required for memory tracking to work correctly
        // because `BackpressureController::drop` needs to know the start offset of the part queue to
        // release the right amount of memory.
        task.push_front(parts).await?;
        self.next_sequential_read_offset = offset;
        Ok(true)
    }
526
527    /// Record the end of a contiguous read.
528    ///
529    /// This should be invoked at the end of each set of contiguous reads, including if no further read occurs.
530    fn record_contiguous_read_metric(&self) {
531        histogram!("prefetch.contiguous_read_len")
532            .record((self.next_sequential_read_offset - self.sequential_read_start_offset) as f64);
533    }
534
535    /// The amount of memory reserved for a backwards seek window.
536    ///
537    /// The seek window size is rounded up to the nearest multiple of part_size.
538    fn seek_window_reservation(part_size: usize, seek_window_size: usize) -> u64 {
539        (seek_window_size.div_ceil(part_size) * part_size) as u64
540    }
541}
542
543impl<Client> Drop for PrefetchGetObject<Client>
544where
545    Client: ObjectClient + Clone + Send + Sync + 'static,
546{
547    fn drop(&mut self) {
548        let seek_window_reservation = Self::seek_window_reservation(
549            self.part_stream.client().read_part_size(),
550            self.backward_seek_window.max_size(),
551        );
552        self.mem_limiter.release(BufferArea::Prefetch, seek_window_reservation);
553        self.record_contiguous_read_metric();
554    }
555}
556
557#[cfg(test)]
558mod tests {
559    // It's convenient to write test constants like "1 * 1024 * 1024" for symmetry
560    #![allow(clippy::identity_op)]
561
562    use crate::Runtime;
563    use crate::data_cache::InMemoryDataCache;
564    use crate::mem_limiter::{MINIMUM_MEM_LIMIT, MemoryLimiter};
565    use crate::memory::PagedPool;
566    use crate::sync::Arc;
567
568    use super::*;
569    use futures::executor::{ThreadPool, block_on};
570    use mountpoint_s3_client::failure_client::{
571        CountdownFailureConfig, GetObjectFailureMode, countdown_failure_client,
572    };
573    use mountpoint_s3_client::mock_client::{MockClient, MockClientConfig, MockClientError, MockObject, ramp_bytes};
574    use mountpoint_s3_client::types::ETag;
575    use proptest::proptest;
576    use proptest::strategy::{Just, Strategy};
577    use proptest_derive::Arbitrary;
578    use std::collections::HashMap;
579    use test_case::test_case;
580
    const KB: usize = 1024;
    const MB: usize = 1024 * 1024;

    /// Property-test configuration: each field is drawn from the annotated strategy range.
    #[derive(Debug, Arbitrary)]
    struct TestConfig {
        #[proptest(strategy = "16usize..1*1024*1024")]
        initial_request_size: usize,
        #[proptest(strategy = "16usize..1*1024*1024")]
        max_read_window_size: usize,
        #[proptest(strategy = "1usize..8usize")]
        sequential_prefetch_multiplier: usize,
        #[proptest(strategy = "16usize..2*1024*1024")]
        client_part_size: usize,
        #[proptest(strategy = "1u64..4*1024*1024")]
        max_forward_seek_wait_distance: u64,
        #[proptest(strategy = "1u64..4*1024*1024")]
        max_backward_seek_distance: u64,
        #[proptest(strategy = "16usize..1*1024*1024")]
        cache_block_size: usize,
    }
601
    /// Which flavor of prefetcher to build in a test: the default client-backed one, or one
    /// backed by an in-memory cache with the given block size.
    enum PrefetcherType {
        Default,
        InMemoryCache(usize),
    }
606
607    fn build_prefetcher<Client>(
608        client: Client,
609        prefetcher_type: PrefetcherType,
610        prefetcher_config: PrefetcherConfig,
611    ) -> Prefetcher<Client>
612    where
613        Client: ObjectClient + Clone + Send + Sync + 'static,
614    {
615        let pool = PagedPool::new_with_candidate_sizes([client.read_part_size(), client.write_part_size()]);
616        let mem_limiter = Arc::new(MemoryLimiter::new(pool, MINIMUM_MEM_LIMIT));
617        let runtime = Runtime::new(ThreadPool::builder().pool_size(1).create().unwrap());
618        let builder = match prefetcher_type {
619            PrefetcherType::Default => Prefetcher::default_builder(client),
620            PrefetcherType::InMemoryCache(block_size) => {
621                let cache = InMemoryDataCache::new(block_size as u64);
622                Prefetcher::caching_builder(cache, client)
623            }
624        };
625        builder.build(runtime, mem_limiter, prefetcher_config)
626    }
627
    /// Sequentially read an entire `size`-byte ramp object in `read_size` chunks and verify
    /// every returned byte, using a mock client with backpressure enabled.
    fn run_sequential_read_test(prefetcher_type: PrefetcherType, size: u64, read_size: usize, test_config: TestConfig) {
        let client = Arc::new(
            MockClient::config()
                .bucket("test-bucket")
                .part_size(test_config.client_part_size)
                .enable_backpressure(true)
                .initial_read_window_size(test_config.client_part_size)
                .build(),
        );
        let object = MockObject::ramp(0xaa, size as usize, ETag::for_tests());
        let etag = object.etag();

        client.add_object("hello", object);

        let prefetcher_config = PrefetcherConfig {
            max_read_window_size: test_config.max_read_window_size,
            sequential_prefetch_multiplier: test_config.sequential_prefetch_multiplier,
            max_forward_seek_wait_distance: test_config.max_forward_seek_wait_distance,
            max_backward_seek_distance: test_config.max_backward_seek_distance,
            initial_request_size: test_config.initial_request_size,
        };

        let prefetcher = build_prefetcher(client.clone(), prefetcher_type, prefetcher_config);
        let object_id = ObjectId::new("hello".to_owned(), etag);
        let mut request = prefetcher.prefetch("test-bucket".to_owned(), object_id, size);

        // Read from offset 0 until an empty buffer signals end-of-object, checking the ramp
        // pattern along the way.
        let mut next_offset = 0;
        loop {
            let buf = block_on(request.read(next_offset, read_size)).unwrap();
            if buf.is_empty() {
                break;
            }
            let buf = buf.into_bytes().unwrap();
            let expected = ramp_bytes((0xaa + next_offset) as usize, buf.len());
            assert_eq!(&buf[..], &expected[..buf.len()]);
            next_offset += buf.len() as u64;
        }
        // We must have read exactly the whole object.
        assert_eq!(next_offset, size);
    }
667
668    #[test_case(PrefetcherType::Default)]
669    #[test_case(PrefetcherType::InMemoryCache(1 * MB))]
670    fn sequential_read_small(prefetcher_type: PrefetcherType) {
671        let config = TestConfig {
672            initial_request_size: 256 * 1024,
673            max_read_window_size: 1024 * 1024 * 1024,
674            sequential_prefetch_multiplier: 8,
675            client_part_size: 8 * 1024 * 1024,
676            max_forward_seek_wait_distance: 16 * 1024 * 1024,
677            max_backward_seek_distance: 2 * 1024 * 1024,
678            cache_block_size: 1 * MB,
679        };
680        run_sequential_read_test(prefetcher_type, 1024 * 1024 + 111, 1024 * 1024, config);
681    }
682
683    #[test_case(PrefetcherType::Default)]
684    #[test_case(PrefetcherType::InMemoryCache(1 * MB))]
685    fn sequential_read_medium(prefetcher_type: PrefetcherType) {
686        let config = TestConfig {
687            initial_request_size: 256 * 1024,
688            max_read_window_size: 64 * 1024 * 1024,
689            sequential_prefetch_multiplier: 8,
690            client_part_size: 8 * 1024 * 1024,
691            max_forward_seek_wait_distance: 16 * 1024 * 1024,
692            max_backward_seek_distance: 2 * 1024 * 1024,
693            cache_block_size: 1 * MB,
694        };
695        run_sequential_read_test(prefetcher_type, 16 * 1024 * 1024 + 111, 1024 * 1024, config);
696    }
697
698    #[test_case(PrefetcherType::Default)]
699    #[test_case(PrefetcherType::InMemoryCache(1 * MB))]
700    fn sequential_read_large(prefetcher_type: PrefetcherType) {
701        let config = TestConfig {
702            initial_request_size: 256 * 1024,
703            max_read_window_size: 64 * 1024 * 1024,
704            sequential_prefetch_multiplier: 8,
705            client_part_size: 8 * 1024 * 1024,
706            max_forward_seek_wait_distance: 16 * 1024 * 1024,
707            max_backward_seek_distance: 2 * 1024 * 1024,
708            cache_block_size: 1 * MB,
709        };
710
711        run_sequential_read_test(prefetcher_type, 256 * 1024 * 1024 + 111, 1024 * 1024, config);
712    }
713
714    fn fail_with_backpressure_precondition_test(
715        prefetcher_type: PrefetcherType,
716        test_config: TestConfig,
717        client_config: MockClientConfig,
718    ) {
719        let client = Arc::new(MockClient::new(client_config));
720        let read_size = 1 * MB;
721        let object_size = 8 * MB;
722        let object = MockObject::ramp(0xaa, object_size, ETag::for_tests());
723        let etag = object.etag();
724
725        let prefetcher_config = PrefetcherConfig {
726            max_read_window_size: test_config.max_read_window_size,
727            sequential_prefetch_multiplier: test_config.sequential_prefetch_multiplier,
728            ..Default::default()
729        };
730
731        let prefetcher = build_prefetcher(client, prefetcher_type, prefetcher_config);
732        let object_id = ObjectId::new("hello".to_owned(), etag);
733        let mut request = prefetcher.prefetch("test-bucket".to_owned(), object_id, object_size as u64);
734        let result = block_on(request.read(0, read_size));
735        assert!(matches!(result, Err(PrefetchReadError::BackpressurePreconditionFailed)));
736    }
737
738    #[test_case(PrefetcherType::Default)]
739    #[test_case(PrefetcherType::InMemoryCache(1 * MB))]
740    fn fail_with_backpressure_not_enabled(prefetcher_type: PrefetcherType) {
741        let test_config = TestConfig {
742            initial_request_size: 256 * 1024,
743            max_read_window_size: 1024 * 1024 * 1024,
744            sequential_prefetch_multiplier: 8,
745            client_part_size: 8 * 1024 * 1024,
746            max_forward_seek_wait_distance: 16 * 1024 * 1024,
747            max_backward_seek_distance: 2 * 1024 * 1024,
748            cache_block_size: 1 * MB,
749        };
750
751        // backpressure is not enabled for the client
752        let config = MockClient::config()
753            .bucket("test-bucket")
754            .part_size(test_config.client_part_size)
755            .enable_backpressure(false);
756
757        fail_with_backpressure_precondition_test(prefetcher_type, test_config, config);
758    }
759
760    #[test_case(PrefetcherType::Default)]
761    #[test_case(PrefetcherType::InMemoryCache(1 * MB))]
762    fn fail_with_backpressure_zero_read_window(prefetcher_type: PrefetcherType) {
763        let test_config = TestConfig {
764            initial_request_size: 256 * 1024,
765            max_read_window_size: 1024 * 1024 * 1024,
766            sequential_prefetch_multiplier: 8,
767            client_part_size: 8 * 1024 * 1024,
768            max_forward_seek_wait_distance: 16 * 1024 * 1024,
769            max_backward_seek_distance: 2 * 1024 * 1024,
770            cache_block_size: 1 * MB,
771        };
772
773        // backpressure is enabled but initial read window size is zero
774        let config = MockClient::config()
775            .bucket("test-bucket")
776            .part_size(test_config.client_part_size)
777            .enable_backpressure(true)
778            .initial_read_window_size(0);
779
780        fail_with_backpressure_precondition_test(prefetcher_type, test_config, config);
781    }
782
783    fn fail_sequential_read_test(
784        prefetcher_type: PrefetcherType,
785        size: u64,
786        read_size: usize,
787        test_config: TestConfig,
788        get_failures: HashMap<usize, GetObjectFailureMode<MockClientError>>,
789    ) {
790        let client = MockClient::config()
791            .bucket("test-bucket")
792            .part_size(test_config.client_part_size)
793            .enable_backpressure(true)
794            .initial_read_window_size(test_config.client_part_size)
795            .build();
796        let object = MockObject::ramp(0xaa, size as usize, ETag::for_tests());
797        let etag = object.etag();
798
799        client.add_object("hello", object);
800
801        let client = Arc::new(countdown_failure_client(
802            client,
803            CountdownFailureConfig {
804                get_failures,
805                ..Default::default()
806            },
807        ));
808
809        let prefetcher_config = PrefetcherConfig {
810            max_read_window_size: test_config.max_read_window_size,
811            sequential_prefetch_multiplier: test_config.sequential_prefetch_multiplier,
812            initial_request_size: test_config.initial_request_size,
813            ..Default::default()
814        };
815
816        let prefetcher = build_prefetcher(client, prefetcher_type, prefetcher_config);
817        let object_id = ObjectId::new("hello".to_owned(), etag);
818        let mut request = prefetcher.prefetch("test-bucket".to_owned(), object_id, size);
819
820        let mut next_offset = 0;
821        loop {
822            let buf = match block_on(request.read(next_offset, read_size)) {
823                Ok(buf) => buf,
824                Err(_) => break,
825            };
826            let buf = buf.into_bytes().unwrap();
827
828            if buf.is_empty() {
829                break;
830            }
831            let expected = ramp_bytes((0xaa + next_offset) as usize, buf.len());
832            assert_eq!(&buf[..], &expected[..buf.len()]);
833            next_offset += buf.len() as u64;
834        }
835        assert!(next_offset < size); // Since we're injecting failures, shouldn't make it to the end
836    }
837
838    #[test_case("invalid range; length=42", PrefetcherType::Default)]
839    #[test_case("invalid range; length=42", PrefetcherType::InMemoryCache(1 * MB))]
840    // test case for the request failure due to etag not matching
841    #[test_case(
842        "At least one of the pre-conditions you specified did not hold",
843        PrefetcherType::Default
844    )]
845    #[test_case("At least one of the pre-conditions you specified did not hold", PrefetcherType::InMemoryCache(1 * MB))]
846    fn fail_request_sequential_small(err_value: &str, prefetcher_type: PrefetcherType) {
847        let config = TestConfig {
848            initial_request_size: 256 * 1024,
849            max_read_window_size: 1024 * 1024 * 1024,
850            sequential_prefetch_multiplier: 8,
851            client_part_size: 8 * 1024 * 1024,
852            max_forward_seek_wait_distance: 16 * 1024 * 1024,
853            max_backward_seek_distance: 2 * 1024 * 1024,
854            cache_block_size: 1 * MB,
855        };
856
857        let mut get_failures = HashMap::new();
858        get_failures.insert(
859            2,
860            GetObjectFailureMode::OperationError(ObjectClientError::ClientError(MockClientError(
861                err_value.to_owned().into(),
862            ))),
863        );
864
865        fail_sequential_read_test(prefetcher_type, 1024 * 1024 + 111, 1024 * 1024, config, get_failures);
866    }
867
    proptest! {
        // Property: any (object size, read size) combination must complete a full,
        // byte-correct sequential read with the default (client) part stream.
        #[test]
        fn proptest_sequential_read(
            size in 1u64..1 * 1024 * 1024,
            read_size in 1usize..1 * 1024 * 1024,
            config: TestConfig,
        ) {
            run_sequential_read_test(PrefetcherType::Default, size, read_size, config);
        }

        // Same property, but the read size is derived to be smaller than the object,
        // forcing multiple reads per object.
        #[test]
        fn proptest_sequential_read_small_read_size(size in 1u64..1 * 1024 * 1024, read_factor in 1usize..10, config: TestConfig) {
            // Pick read size smaller than the object size
            let read_size = (size as usize / read_factor).max(1);
            run_sequential_read_test(PrefetcherType::Default, size, read_size, config);
        }

        // As proptest_sequential_read, but using the in-memory caching part stream.
        #[test]
        fn proptest_sequential_read_with_cache(
            size in 1u64..1 * 1024 * 1024,
            read_size in 1usize..1 * 1024 * 1024,
            config: TestConfig,
        ) {
            run_sequential_read_test(PrefetcherType::InMemoryCache(config.cache_block_size), size, read_size, config);
        }

        // As proptest_sequential_read_small_read_size, but with the caching part stream.
        #[test]
        fn proptest_sequential_read_small_read_size_with_cache(size in 1u64..1 * 1024 * 1024, read_factor in 1usize..10,
            config: TestConfig) {
            // Pick read size smaller than the object size
            let read_size = (size as usize / read_factor).max(1);
            run_sequential_read_test(PrefetcherType::InMemoryCache(config.cache_block_size), size, read_size, config);
        }
    }
902
903    #[test]
904    fn test_sequential_read_regression() {
905        let object_size = 854966;
906        let read_size = 161647;
907        let config = TestConfig {
908            initial_request_size: 484941,
909            max_read_window_size: 81509,
910            sequential_prefetch_multiplier: 1,
911            client_part_size: 181682,
912            max_forward_seek_wait_distance: 1,
913            max_backward_seek_distance: 18668,
914            cache_block_size: 1 * MB,
915        };
916        run_sequential_read_test(PrefetcherType::Default, object_size, read_size, config);
917    }
918
919    fn run_random_read_test(
920        prefetcher_type: PrefetcherType,
921        object_size: u64,
922        reads: Vec<(u64, usize)>,
923        test_config: TestConfig,
924    ) {
925        let client = Arc::new(
926            MockClient::config()
927                .bucket("test-bucket")
928                .part_size(test_config.client_part_size)
929                .enable_backpressure(true)
930                .initial_read_window_size(test_config.client_part_size)
931                .build(),
932        );
933        let object = MockObject::ramp(0xaa, object_size as usize, ETag::for_tests());
934        let etag = object.etag();
935
936        client.add_object("hello", object);
937
938        let prefetcher_config = PrefetcherConfig {
939            max_read_window_size: test_config.max_read_window_size,
940            sequential_prefetch_multiplier: test_config.sequential_prefetch_multiplier,
941            max_forward_seek_wait_distance: test_config.max_forward_seek_wait_distance,
942            max_backward_seek_distance: test_config.max_backward_seek_distance,
943            initial_request_size: test_config.initial_request_size,
944        };
945
946        let prefetcher = build_prefetcher(client, prefetcher_type, prefetcher_config);
947        let object_id = ObjectId::new("hello".to_owned(), etag);
948        let mut request = prefetcher.prefetch("test-bucket".to_owned(), object_id, object_size);
949
950        for (offset, length) in reads {
951            assert!(offset < object_size);
952            assert!(offset + length as u64 <= object_size);
953            let expected = ramp_bytes((0xaa + offset) as usize, length);
954            let buf = block_on(request.read(offset, length)).unwrap();
955            let buf = buf.into_bytes().unwrap();
956            assert_eq!(buf.len(), expected.len());
957            // Don't spew the giant buffer if this test fails
958            if buf[..] != expected[..] {
959                for i in 0..buf.len() {
960                    if buf[i] != expected[i] {
961                        panic!(
962                            "buffer mismatch at offset {}, saw {} expected {}",
963                            i, buf[i], expected[i]
964                        );
965                    }
966                }
967            }
968        }
969    }
970
971    fn random_read_strategy(max_object_size: u64) -> impl Strategy<Value = (u64, Vec<(u64, usize)>)> {
972        (1..=max_object_size).prop_flat_map(|object_size| {
973            (
974                Just(object_size),
975                proptest::collection::vec(
976                    (0..object_size).prop_flat_map(move |offset| {
977                        (1..=object_size - offset).prop_map(move |length| (offset, length as usize))
978                    }),
979                    0..10,
980                ),
981            )
982        })
983    }
984
    proptest! {
        // Property: any sequence of in-bounds reads must return correct bytes with
        // the default (client) part stream.
        #[test]
        fn proptest_random_read(
            reads in random_read_strategy(1 * 1024 * 1024),
            config: TestConfig,
        ) {
            let (object_size, reads) = reads;
            run_random_read_test(PrefetcherType::Default, object_size, reads, config);
        }

        // Same property, but using the in-memory caching part stream.
        #[test]
        fn proptest_random_read_with_cache(
            reads in random_read_strategy(1 * 1024 * 1024),
            config: TestConfig,
        ) {
            let (object_size, reads) = reads;
            run_random_read_test(PrefetcherType::InMemoryCache(config.cache_block_size), object_size, reads, config);
        }
    }
1004
1005    #[test]
1006    fn test_random_read_regression() {
1007        let object_size = 724314;
1008        let reads = vec![(0, 516883)];
1009        let config = TestConfig {
1010            initial_request_size: 3684779,
1011            max_read_window_size: 2147621,
1012            sequential_prefetch_multiplier: 4,
1013            client_part_size: 516882,
1014            max_forward_seek_wait_distance: 16 * 1024 * 1024,
1015            max_backward_seek_distance: 2 * 1024 * 1024,
1016            cache_block_size: 1 * MB,
1017        };
1018        run_random_read_test(PrefetcherType::Default, object_size, reads, config);
1019    }
1020
1021    #[test]
1022    fn test_random_read_regression2() {
1023        let object_size = 755678;
1024        let reads = vec![(0, 278499), (311250, 1)];
1025        let config = TestConfig {
1026            initial_request_size: 556997,
1027            max_read_window_size: 105938,
1028            sequential_prefetch_multiplier: 7,
1029            client_part_size: 1219731,
1030            max_forward_seek_wait_distance: 16 * 1024 * 1024,
1031            max_backward_seek_distance: 2 * 1024 * 1024,
1032            cache_block_size: 1 * MB,
1033        };
1034        run_random_read_test(PrefetcherType::Default, object_size, reads, config);
1035    }
1036
1037    #[test]
1038    fn test_random_read_regression3() {
1039        let object_size = 755678;
1040        let reads = vec![(0, 236766), (291204, 1), (280930, 36002)];
1041        let config = TestConfig {
1042            initial_request_size: 556997,
1043            max_read_window_size: 105938,
1044            sequential_prefetch_multiplier: 7,
1045            client_part_size: 1219731,
1046            max_forward_seek_wait_distance: 2260662,
1047            max_backward_seek_distance: 2369799,
1048            cache_block_size: 1 * MB,
1049        };
1050        run_random_read_test(PrefetcherType::Default, object_size, reads, config);
1051    }
1052
1053    #[test]
1054    fn test_random_read_regression4() {
1055        let object_size = 14201;
1056        let reads = vec![(3584, 1), (9424, 1460), (3582, 3340), (248, 9218)];
1057        let config = TestConfig {
1058            initial_request_size: 457999,
1059            max_read_window_size: 863511,
1060            sequential_prefetch_multiplier: 5,
1061            client_part_size: 1972409,
1062            max_forward_seek_wait_distance: 2810651,
1063            max_backward_seek_distance: 3531090,
1064            cache_block_size: 1 * MB,
1065        };
1066        run_random_read_test(PrefetcherType::Default, object_size, reads, config);
1067    }
1068
1069    #[test]
1070    fn test_forward_seek_failure() {
1071        const PART_SIZE: usize = 8192;
1072        const OBJECT_SIZE: usize = 2 * PART_SIZE;
1073
1074        let client = MockClient::config()
1075            .bucket("test-bucket")
1076            .part_size(PART_SIZE)
1077            .enable_backpressure(true)
1078            .initial_read_window_size(OBJECT_SIZE)
1079            .build();
1080        let object = MockObject::ramp(0xaa, OBJECT_SIZE, ETag::for_tests());
1081        let etag = object.etag();
1082        client.add_object("hello", object);
1083
1084        let mut get_failures = HashMap::new();
1085        get_failures.insert(
1086            1,
1087            GetObjectFailureMode::StreamPositionError(
1088                2,
1089                ObjectClientError::ClientError(MockClientError(
1090                    "error in the second chunk of the first request".into(),
1091                )),
1092            ),
1093        );
1094        get_failures.insert(
1095            2,
1096            GetObjectFailureMode::OperationError(ObjectClientError::ClientError(MockClientError(
1097                "error in second request".into(),
1098            ))),
1099        );
1100
1101        let client = Arc::new(countdown_failure_client(
1102            client,
1103            CountdownFailureConfig {
1104                get_failures,
1105                ..Default::default()
1106            },
1107        ));
1108        let prefetcher = build_prefetcher(client, PrefetcherType::Default, Default::default());
1109        block_on(async {
1110            let object_id = ObjectId::new("hello".to_owned(), etag.clone());
1111            let mut request = prefetcher.prefetch("test-bucket".to_owned(), object_id, OBJECT_SIZE as u64);
1112
1113            // The first read should trigger the prefetcher to try and get the whole object (in 2 parts).
1114            _ = request.read(0, 1).await.expect("first read should succeed");
1115
1116            // Seek to the second part (where we injected a failure).
1117            let offset = PART_SIZE + 1;
1118            _ = request.read(offset as u64, 1).await.expect_err("seek should fail");
1119
1120            // A retry should trigger a new request (also failing).
1121            _ = request
1122                .read(offset as u64, 1)
1123                .await
1124                .expect_err("first retry after failure should fail");
1125
1126            // New retry should succeed (no more failures injected).
1127            let byte = request
1128                .read(offset as u64, 1)
1129                .await
1130                .expect("second retry should succeed");
1131            let expected = ramp_bytes(0xaa + offset, 1);
1132            assert_eq!(byte.into_bytes().unwrap()[..], expected[..]);
1133        });
1134    }
1135
1136    #[test_case(PrefetcherType::Default)]
1137    #[test_case(PrefetcherType::InMemoryCache(8192))]
1138    fn test_short_read_failure(prefetcher_type: PrefetcherType) {
1139        const PART_SIZE: usize = 8192;
1140        const OBJECT_SIZE: usize = 2 * PART_SIZE;
1141
1142        let client = MockClient::config()
1143            .bucket("test-bucket")
1144            .part_size(PART_SIZE)
1145            .enable_backpressure(true)
1146            .initial_read_window_size(PART_SIZE)
1147            .build();
1148        let object = MockObject::ramp(0xaa, OBJECT_SIZE, ETag::for_tests());
1149        let etag = object.etag();
1150        client.add_object("hello", object);
1151
1152        let mut get_failures = HashMap::new();
1153        // On first request, terminate the stream without producing any data
1154        get_failures.insert(1, GetObjectFailureMode::StreamShortCircuit(1));
1155        // On third request (second request of second prefetcher),
1156        // terminate the stream early without producing all the requested data
1157        get_failures.insert(3, GetObjectFailureMode::StreamShortCircuit(1));
1158
1159        let client = Arc::new(countdown_failure_client(
1160            client,
1161            CountdownFailureConfig {
1162                get_failures,
1163                ..Default::default()
1164            },
1165        ));
1166        let prefetcher_config = PrefetcherConfig {
1167            initial_request_size: PART_SIZE,
1168            ..Default::default()
1169        };
1170        let prefetcher = build_prefetcher(client, prefetcher_type, prefetcher_config);
1171
1172        block_on(async {
1173            let object_id = ObjectId::new("hello".to_owned(), etag.clone());
1174            let mut request = prefetcher.prefetch("test-bucket".to_owned(), object_id, OBJECT_SIZE as u64);
1175
1176            // First read will terminate early
1177            assert!(matches!(
1178                request.read(0, 10).await.expect_err("read should fail"),
1179                PrefetchReadError::GetRequestTerminatedUnexpectedly,
1180            ));
1181
1182            // Second read will return first part, but then terminate early before returning the remaining parts
1183            let bytes = request.read(0, PART_SIZE).await.unwrap();
1184            let expected = ramp_bytes(0xaa, PART_SIZE);
1185            assert_eq!(bytes.into_bytes().unwrap()[..], expected[..]);
1186            _ = request
1187                .read(PART_SIZE as u64, PART_SIZE)
1188                .await
1189                .expect_err("read should fail");
1190
1191            // There are no more failures injected, since the prefetcher will reset on failure, now we should be able to read the whole data.
1192            let bytes = request.read(0, OBJECT_SIZE).await.unwrap();
1193            let expected = ramp_bytes(0xaa, OBJECT_SIZE);
1194            assert_eq!(bytes.into_bytes().unwrap()[..], expected[..]);
1195
1196            // Shouldn't fail if the short read is due to object size not due to the stream terminating early
1197            let bytes = request.read(PART_SIZE as u64, OBJECT_SIZE).await.unwrap();
1198            let expected = ramp_bytes(0xaa + PART_SIZE, PART_SIZE);
1199            assert_eq!(bytes.into_bytes().unwrap()[..], expected[..]);
1200        });
1201    }
1202
1203    #[test_case(0, 25; "no first read")]
1204    #[test_case(60, 25; "read beyond first part")]
1205    #[test_case(20, 25; "read in first part")]
1206    #[test_case(125, 110; "read in second request")]
1207    fn test_forward_seek(first_read_size: usize, part_size: usize) {
1208        const OBJECT_SIZE: usize = 200;
1209        const FIRST_REQUEST_SIZE: usize = 100;
1210
1211        let client = Arc::new(
1212            MockClient::config()
1213                .bucket("test-bucket")
1214                .part_size(part_size)
1215                .enable_backpressure(true)
1216                .initial_read_window_size(FIRST_REQUEST_SIZE)
1217                .build(),
1218        );
1219        let object = MockObject::ramp(0xaa, OBJECT_SIZE, ETag::for_tests());
1220        let etag = object.etag();
1221
1222        client.add_object("hello", object);
1223
1224        let prefetcher = build_prefetcher(client, PrefetcherType::Default, Default::default());
1225
1226        // Try every possible seek from first_read_size
1227        for offset in first_read_size + 1..OBJECT_SIZE {
1228            let object_id = ObjectId::new("hello".to_owned(), etag.clone());
1229            let mut request = prefetcher.prefetch("test-bucket".to_owned(), object_id, OBJECT_SIZE as u64);
1230            if first_read_size > 0 {
1231                let _first_read = block_on(request.read(0, first_read_size)).unwrap();
1232            }
1233
1234            let byte = block_on(request.read(offset as u64, 1)).unwrap();
1235            let expected = ramp_bytes(0xaa + offset, 1);
1236            assert_eq!(byte.into_bytes().unwrap()[..], expected[..]);
1237        }
1238    }
1239
1240    #[test_case(60, 25; "read beyond first part")]
1241    #[test_case(20, 25; "read in first part")]
1242    #[test_case(125, 110; "read in second request")]
1243    fn test_backward_seek(first_read_size: usize, part_size: usize) {
1244        const OBJECT_SIZE: usize = 200;
1245
1246        let client = Arc::new(
1247            MockClient::config()
1248                .bucket("test-bucket")
1249                .part_size(part_size)
1250                .enable_backpressure(true)
1251                .initial_read_window_size(part_size)
1252                .build(),
1253        );
1254        let object = MockObject::ramp(0xaa, OBJECT_SIZE, ETag::for_tests());
1255        let etag = object.etag();
1256
1257        client.add_object("hello", object);
1258
1259        let prefetcher = build_prefetcher(client, PrefetcherType::Default, Default::default());
1260
1261        // Try every possible seek from first_read_size
1262        for offset in 0..first_read_size {
1263            let object_id = ObjectId::new("hello".to_owned(), etag.clone());
1264            let mut request = prefetcher.prefetch("test-bucket".to_owned(), object_id, OBJECT_SIZE as u64);
1265            if first_read_size > 0 {
1266                let _first_read = block_on(request.read(0, first_read_size)).unwrap();
1267            }
1268
1269            let byte = block_on(request.read(offset as u64, 1)).unwrap();
1270            let expected = ramp_bytes(0xaa + offset, 1);
1271            assert_eq!(byte.into_bytes().unwrap()[..], expected[..]);
1272        }
1273    }
1274
1275    #[test_case(8 * 1024 * 1024, 1 * 1024 * 1024, 8 * 1024 * 1024; "8MiB part_size, 1MiB window")]
1276    #[test_case(1 * 1024 * 1024, 1 * 1024 * 1024, 1 * 1024 * 1024; "equal part_size and window")]
1277    #[test_case(250 * 1024, 1 * 1024 * 1024, 1250 * 1024; "window larger than part_size")]
1278    fn test_seek_window_reservation(part_size: usize, seek_window_size: usize, expected: u64) {
1279        let reservation = PrefetchGetObject::<MockClient>::seek_window_reservation(part_size, seek_window_size);
1280        assert_eq!(reservation, expected);
1281    }
1282
1283    #[test_case(8 * MB, 8 * MB, 1 * MB + 128 * KB; "default")]
1284    #[test_case(8 * MB, 8 * MB, 0; "no initial request")]
1285    #[test_case(1 * KB, 1 * MB, 10 * MB; "initial request larger than part size")]
1286    #[test_case(16 * MB, 8 * MB, 1 * MB + 128 * KB; "larger intial read window")]
1287    #[test_case(16 * MB, 8 * MB, 0; "larger intial read window w/o initial request")]
1288    #[test_case(1 * KB, 8 * MB, 1 * MB + 128 * KB; "smaller intial read window")]
1289    #[test_case(1 * KB, 8 * MB, 0; "smaller intial read window w/o initial request")]
1290    fn test_initial_reqeust_size(initial_read_window_size: usize, part_size: usize, initial_request_size: usize) {
1291        let object_size = (16 * MB) as u64;
1292
1293        let client = Arc::new(
1294            MockClient::config()
1295                .bucket("test-bucket")
1296                .part_size(part_size)
1297                .enable_backpressure(true)
1298                .initial_read_window_size(initial_read_window_size)
1299                .build(),
1300        );
1301
1302        let object = MockObject::ramp(0xaa, object_size as usize, ETag::for_tests());
1303        let etag = object.etag();
1304        client.add_object("test-object", object);
1305
1306        let prefetcher_config = PrefetcherConfig {
1307            initial_request_size,
1308            ..Default::default()
1309        };
1310
1311        let prefetcher = build_prefetcher(client.clone(), PrefetcherType::Default, prefetcher_config);
1312        let object_id = ObjectId::new("test-object".to_owned(), etag);
1313        let mut request = prefetcher.prefetch("test-bucket".to_owned(), object_id, object_size);
1314
1315        // Perform sequential reads to test the download functionality
1316        let mut next_offset = 0;
1317        while next_offset < object_size {
1318            let buf = block_on(request.read(next_offset, 256 * KB)).unwrap();
1319            if buf.is_empty() {
1320                break;
1321            }
1322            let buf = buf.into_bytes().unwrap();
1323            let expected = ramp_bytes((0xaa + next_offset) as usize, buf.len());
1324            assert_eq!(&buf[..], &expected[..]);
1325            next_offset += buf.len() as u64;
1326        }
1327        assert_eq!(next_offset, object_size);
1328    }
1329
1330    #[cfg(feature = "shuttle")]
1331    mod shuttle_tests {
1332        use super::*;
1333        use futures::task::{FutureObj, Spawn, SpawnError};
1334        use shuttle::future::block_on;
1335        use shuttle::rand::Rng;
1336        use shuttle::{check_pct, check_random};
1337
        /// Adapter that lets the prefetcher spawn its part-stream futures onto the
        /// shuttle executor, so shuttle can explore interleavings of those tasks.
        struct ShuttleRuntime;
        impl Spawn for ShuttleRuntime {
            fn spawn_obj(&self, future: FutureObj<'static, ()>) -> Result<(), SpawnError> {
                // Detach the task; shuttle drives it as part of its schedule.
                shuttle::future::spawn(future);
                Ok(())
            }
        }
1345
1346        fn sequential_read_stress_helper() {
1347            let mut rng = shuttle::rand::thread_rng();
1348            let object_size = rng.gen_range(1u64..1 * 1024 * 1024);
1349            let max_read_window_size = rng.gen_range(16usize..1 * 1024 * 1024);
1350            let sequential_prefetch_multiplier = rng.gen_range(2usize..16);
1351            let part_size = rng.gen_range(16usize..INITIAL_REQUEST_SIZE);
1352            let initial_request_size = rng.gen_range(0..INITIAL_REQUEST_SIZE);
1353            let max_forward_seek_wait_distance = rng.gen_range(16u64..1 * 1024 * 1024 + 256 * 1024);
1354            let max_backward_seek_distance = rng.gen_range(16u64..1 * 1024 * 1024 + 256 * 1024);
1355
1356            let client = Arc::new(
1357                MockClient::config()
1358                    .bucket("test-bucket")
1359                    .part_size(part_size)
1360                    .enable_backpressure(true)
1361                    .initial_read_window_size(part_size)
1362                    .build(),
1363            );
1364            let pool = PagedPool::new_with_candidate_sizes([part_size]);
1365            let mem_limiter = Arc::new(MemoryLimiter::new(pool, MINIMUM_MEM_LIMIT));
1366            let object = MockObject::ramp(0xaa, object_size as usize, ETag::for_tests());
1367            let file_etag = object.etag();
1368
1369            client.add_object("hello", object);
1370
1371            let prefetcher_config = PrefetcherConfig {
1372                max_read_window_size,
1373                sequential_prefetch_multiplier,
1374                max_forward_seek_wait_distance,
1375                max_backward_seek_distance,
1376                initial_request_size,
1377            };
1378
1379            let prefetcher =
1380                Prefetcher::default_builder(client).build(Runtime::new(ShuttleRuntime), mem_limiter, prefetcher_config);
1381            let object_id = ObjectId::new("hello".to_owned(), file_etag);
1382            let mut request = prefetcher.prefetch("test-bucket".to_owned(), object_id, object_size);
1383
1384            let mut next_offset = 0;
1385            loop {
1386                let read_size = rng.gen_range(1usize..1 * 1024 * 1024);
1387                let buf = block_on(request.read(next_offset, read_size)).unwrap();
1388                if buf.is_empty() {
1389                    break;
1390                }
1391                let buf = buf.into_bytes().unwrap();
1392                let expected = ramp_bytes((0xaa + next_offset) as usize, buf.len());
1393                assert_eq!(&buf[..], &expected[..buf.len()]);
1394                next_offset += buf.len() as u64;
1395            }
1396            assert_eq!(next_offset, object_size);
1397        }
1398
        /// Stress-test fully sequential reads with randomized prefetcher/client
        /// configurations, under both Shuttle's random scheduler (1000 iterations)
        /// and its PCT scheduler (1000 iterations, depth 3).
        #[test]
        fn sequential_read_stress() {
            check_random(sequential_read_stress_helper, 1000);
            check_pct(sequential_read_stress_helper, 1000, 3);
        }
1404
1405        fn random_read_stress_helper() {
1406            let mut rng = shuttle::rand::thread_rng();
1407            let max_read_window_size = rng.gen_range(16usize..32 * 1024);
1408            let sequential_prefetch_multiplier = rng.gen_range(2usize..16);
1409            let part_size = rng.gen_range(16usize..128 * 1024);
1410            let initial_request_size = rng.gen_range(16usize..128 * 1024);
1411            let max_forward_seek_wait_distance = rng.gen_range(16u64..192 * 1024);
1412            let max_backward_seek_distance = rng.gen_range(16u64..192 * 1024);
1413            // Try to prevent testing very small reads of very large objects, which are easy to OOM
1414            // under Shuttle (lots of concurrent tasks)
1415            let max_object_size = initial_request_size.min(max_read_window_size) * 20;
1416            let object_size = rng.gen_range(1u64..(64 * 1024).min(max_object_size) as u64);
1417
1418            let client = Arc::new(
1419                MockClient::config()
1420                    .bucket("test-bucket")
1421                    .part_size(part_size)
1422                    .enable_backpressure(true)
1423                    .initial_read_window_size(part_size)
1424                    .build(),
1425            );
1426            let pool = PagedPool::new_with_candidate_sizes([part_size]);
1427            let mem_limiter = Arc::new(MemoryLimiter::new(pool, MINIMUM_MEM_LIMIT));
1428            let object = MockObject::ramp(0xaa, object_size as usize, ETag::for_tests());
1429            let file_etag = object.etag();
1430
1431            client.add_object("hello", object);
1432
1433            let prefetcher_config = PrefetcherConfig {
1434                max_read_window_size,
1435                sequential_prefetch_multiplier,
1436                max_forward_seek_wait_distance,
1437                max_backward_seek_distance,
1438                initial_request_size,
1439            };
1440
1441            let prefetcher =
1442                Prefetcher::default_builder(client).build(Runtime::new(ShuttleRuntime), mem_limiter, prefetcher_config);
1443            let object_id = ObjectId::new("hello".to_owned(), file_etag);
1444            let mut request = prefetcher.prefetch("test-bucket".to_owned(), object_id, object_size);
1445
1446            let num_reads = rng.gen_range(10usize..50);
1447            for _ in 0..num_reads {
1448                let offset = rng.gen_range(0u64..object_size);
1449                let length = rng.gen_range(1usize..(object_size - offset + 1) as usize);
1450                let expected = ramp_bytes((0xaa + offset) as usize, length);
1451                let buf = block_on(request.read(offset, length)).unwrap();
1452                let buf = buf.into_bytes().unwrap();
1453                assert_eq!(buf.len(), expected.len());
1454                // Don't spew the giant buffer if this test fails
1455                if buf[..] != expected[..] {
1456                    for i in 0..buf.len() {
1457                        if buf[i] != expected[i] {
1458                            panic!(
1459                                "buffer mismatch at offset {}, saw {} expected {}",
1460                                i, buf[i], expected[i]
1461                            );
1462                        }
1463                    }
1464                }
1465            }
1466        }
1467
        /// Stress-test reads at random offsets/lengths with randomized
        /// prefetcher/client configurations, under both Shuttle's random scheduler
        /// (1000 iterations) and its PCT scheduler (1000 iterations, depth 3).
        #[test]
        fn random_read_stress() {
            check_random(random_read_stress_helper, 1000);
            check_pct(random_read_stress_helper, 1000, 3);
        }
1473    }
1474}