Skip to main content

mountpoint_s3_fs/
prefetch.rs

1//! This module implements a prefetcher for GetObject requests.
2//!
3//! It works by relying on the CRT's flow-control window feature. The prefetcher creates a single
4//! GetObject request with entire length of the object (starting from the first read offset) and
5//! makes increasingly larger read window. We want the chunks to be large enough that they can make
6//! effective use of the CRT's fan-out parallelism across the S3 frontend, but small enough that we
7//! don't accumulate a lot of unread object data in memory or wastefully download data we'll never
8//! read. As the reader continues to make sequential reads, we increase the size of the read window
9//! up to some maximum. If the reader ever makes a non-sequential read, we abandon the prefetching
10//! and start again with a new GetObject request with minimum read window size.
11//!
12//! In more technical details, the prefetcher creates a RequestTask when receiving the first read
13//! request from the file system or after it has just been reset. The RequestTask consists of two main
14//! components.
15//! 1.  An ObjectPartStream that has a role to continuously fetch data from the sources which can be
16//!     either S3 or the cache on disk. The ObjectPartStream is spawned and run in a separate thread
17//!     from the prefetcher.
18//! 2.  A PartQueue, where we store data received from the ObjectPartStream, waiting to be read from
19//!     the prefetcher via a RequestTask function.
20//!
21//! A backpressure mechanism is needed to control how much data we want to store in the part queue at
22//! a time as we don't want to download the entire object into memory. For the client part stream, we
23//! may be able to rely on the CRT flow-control flow window to block when we don't increase the read
24//! window size, but for the caching part stream we don't have the machinery to do that yet. That's why
25//! we introduce the BackpressureController and BackpressureLimiter to help solving this.
26//!
27//! Essentially, the BackpressureController and BackpressureLimiter is a pair of sender/receiver of a
28//! channel, created at RequestTask initialization. The sender is handed to the RequestTask. Its role
29//! is to communicate with its receiver to tell "when" it is ready to receive more data. The receiver
30//! is handed to the ObjectPartStream where the stream should call a provided function "before" fetching
31//! more data from the sources and put them into the part queue. The BackpressureLimiter should be used
32//! as a mean to block ObjectPartStream thread to fetch more data.
33
34use std::fmt::Debug;
35
36use metrics::{counter, histogram};
37use mountpoint_s3_client::error::{GetObjectError, ObjectClientError};
38use mountpoint_s3_client::{ObjectClient, error_metadata::ProvideErrorMetadata};
39use thiserror::Error;
40use tracing::trace;
41
42use crate::checksums::{ChecksummedBytes, IntegrityError};
43use crate::data_cache::DataCache;
44use crate::fs::error_metadata::{ErrorMetadata, MOUNTPOINT_ERROR_CLIENT};
45use crate::mem_limiter::{BufferArea, MemoryLimiter};
46use crate::metrics::defs::{FUSE_CACHE_HIT, PREFETCH_RESET_STATE};
47use crate::object::ObjectId;
48use crate::sync::Arc;
49
50mod backpressure_controller;
51mod builder;
52mod caching_stream;
53mod part;
54mod part_queue;
55mod part_stream;
56mod seek_window;
57mod task;
58
59pub use builder::PrefetcherBuilder;
60use part::PartOperationError;
61use part_stream::{PartStream, RequestRange, RequestTaskConfig};
62use seek_window::SeekWindow;
63use task::RequestTask;
64
/// Opaque identifier for a file handle, used to attribute prefetch requests to their origin.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct HandleId(u64);

impl HandleId {
    /// Wrap a raw `u64` handle value in an opaque [HandleId].
    pub fn new(id: u64) -> Self {
        HandleId(id)
    }

    /// Return the raw `u64` value underlying this handle ID.
    pub fn as_raw(&self) -> u64 {
        let HandleId(raw) = self;
        *raw
    }
}
78
// This is a weird looking number! We really want our first request size to be 1MiB,
// which is a common IO size. But Linux's readahead will try to read an extra 128k on on
// top of a 1MiB read, which we'd have to wait for a second request to service. Because
// FUSE doesn't know the difference between regular reads and readahead reads, it will
// send us a READ request for that 128k, so we'll have to block waiting for it even if
// the application doesn't want it. This is all in the noise for sequential IO, but
// waiting for the readahead hurts random IO. So we add 128k to the first request size
// to avoid the latency hit of the second request.
pub const INITIAL_REQUEST_SIZE: usize = (1024 + 128) * 1024;
88
/// Errors that can occur while reading through a [PrefetchGetObject] request.
#[derive(Debug, Error)]
pub enum PrefetchReadError<E> {
    /// The underlying GetObject request failed. `metadata` carries bucket/key context for
    /// error reporting (see [PrefetchReadError::get_request_failed]).
    #[error("get object request failed")]
    GetRequestFailed {
        source: ObjectClientError<GetObjectError, E>,
        metadata: Box<ErrorMetadata>,
    },

    /// A part arrived at a different offset than the prefetcher expected.
    #[error("get object request returned wrong offset")]
    GetRequestReturnedWrongOffset { offset: u64, expected_offset: u64 },

    /// The streaming request ended before delivering all of the expected data.
    #[error("get request terminated unexpectedly")]
    GetRequestTerminatedUnexpectedly,

    /// Integrity (checksum) validation of received data failed.
    #[error("integrity check failed")]
    Integrity(#[from] IntegrityError),

    /// Reading a part out of the part queue failed.
    #[error("part read failed")]
    PartReadFailed(#[from] PartOperationError),

    /// The client is not configured with backpressure and a non-zero initial read window,
    /// which the prefetcher requires (checked in `spawn_read_backpressure_request`).
    #[error("backpressure must be enabled with non-zero initial read window")]
    BackpressurePreconditionFailed,

    /// Failed to grow the flow-control read window.
    #[error("read window increment failed")]
    ReadWindowIncrement,
}
115
116impl<E: ProvideErrorMetadata + std::error::Error + Send + Sync + 'static> PrefetchReadError<E> {
117    fn get_request_failed(err: ObjectClientError<GetObjectError, E>, bucket: &str, key: &str) -> Self {
118        let metadata = ErrorMetadata {
119            client_error_meta: err.meta(),
120            error_code: Some(MOUNTPOINT_ERROR_CLIENT.to_string()),
121            s3_bucket_name: Some(bucket.to_string()),
122            s3_object_key: Some(key.to_string()),
123        };
124        let metadata = Box::new(metadata);
125        Self::GetRequestFailed { source: err, metadata }
126    }
127}
128
/// Tuning parameters for the prefetcher. All sizes/distances are in bytes.
#[derive(Debug, Clone, Copy)]
pub struct PrefetcherConfig {
    /// Maximum size of the read window
    pub max_read_window_size: usize,
    /// Factor to increase the request size by whenever the reader continues making sequential reads
    pub sequential_prefetch_multiplier: usize,
    /// The maximum amount of unavailable data the prefetcher will tolerate during a seek operation
    /// before resetting and starting a new S3 request.
    pub max_forward_seek_wait_distance: u64,
    /// The maximum distance the prefetcher will seek backwards before resetting and starting a new
    /// S3 request. We keep this much data in memory in addition to any inflight requests.
    pub max_backward_seek_distance: u64,
    /// Size of the initial request. This request, of size possibly smaller than part_size,
    /// is made to lower the latency in small-random-reads usage pattern. If set to 0, initial request
    /// is skipped.
    /// Defaults to [INITIAL_REQUEST_SIZE] (1MiB + 128KiB).
    pub initial_request_size: usize,
}
146
147impl Default for PrefetcherConfig {
148    #[allow(clippy::identity_op)]
149    fn default() -> Self {
150        Self {
151            max_read_window_size: determine_max_read_size(),
152            sequential_prefetch_multiplier: 2,
153            // We want these large enough to tolerate a single out-of-order Linux readahead, which
154            // is at most 256KiB backwards and then 512KiB forwards. For forwards seeks, we're also
155            // making a guess about where the optimal cut-off point is before it would be faster to
156            // just start a new request instead.
157            max_forward_seek_wait_distance: 16 * 1024 * 1024,
158            max_backward_seek_distance: 1 * 1024 * 1024,
159            initial_request_size: INITIAL_REQUEST_SIZE,
160        }
161    }
162}
163
164/// Provide the maximum read size for the prefetcher, for which there is one prefetcher per file handle.
165///
166/// This allows a way to override the prefetch window rather than using the hardcoded default within Mountpoint.
167/// We do not recommend using the override, and it may be removed at any time.
168///
169/// This parameter may not be accurately adopted when using small values.
170/// When prefetching starts, it will fetch 1MiB + 128KiB at time of writing.
171/// This parameter will only be used when scaling up the prefetch window.
172///
173/// This unstable override is expected to be removed once adaptive prefetching based on available memory is available:
174/// https://github.com/awslabs/mountpoint-s3/issues/987
175fn determine_max_read_size() -> usize {
176    const ENV_VAR_KEY: &str = "UNSTABLE_MOUNTPOINT_MAX_PREFETCH_WINDOW_SIZE";
177    const DEFAULT_READ_WINDOW_SIZE: usize = 2 * 1024 * 1024 * 1024;
178
179    match std::env::var_os(ENV_VAR_KEY) {
180        Some(val) => match val.to_string_lossy().parse() {
181            Ok(val) => {
182                tracing::warn!(
183                    "successfully overridden prefetch read window size \
184                        with new value {val} bytes from unstable environment config",
185                );
186                val
187            }
188            Err(_) => {
189                tracing::warn!(
190                    "{ENV_VAR_KEY} did not contain a valid positive integer \
191                        for prefetch bytes, using {DEFAULT_READ_WINDOW_SIZE} bytes instead",
192                );
193                DEFAULT_READ_WINDOW_SIZE
194            }
195        },
196        None => DEFAULT_READ_WINDOW_SIZE,
197    }
198}
199
/// A [Prefetcher] creates and manages prefetching GetObject requests to objects.
#[derive(Debug)]
pub struct Prefetcher<Client> {
    /// Part stream used to fetch object data; cloned into each prefetch request.
    part_stream: PartStream<Client>,
    /// Tuning parameters shared by all prefetch requests created by this prefetcher.
    config: PrefetcherConfig,
    /// Memory limiter used to account for buffers held by prefetch requests.
    mem_limiter: Arc<MemoryLimiter>,
}
207
208impl<Client> Prefetcher<Client>
209where
210    Client: ObjectClient + Clone + Send + Sync + 'static,
211{
212    /// Creates an instance of the default [Prefetcher] builder.
213    pub fn default_builder(client: Client) -> PrefetcherBuilder<Client> {
214        PrefetcherBuilder::default_builder(client)
215    }
216
217    /// Creates an instance of a caching [Prefetcher] builder.
218    pub fn caching_builder<Cache>(cache: Cache, client: Client) -> PrefetcherBuilder<Client>
219    where
220        Cache: DataCache + Send + Sync + 'static,
221    {
222        PrefetcherBuilder::caching_builder(cache, client)
223    }
224
225    /// Create a new [Prefetcher] from the given [ObjectPartStream] instance.
226    pub fn new(part_stream: PartStream<Client>, config: PrefetcherConfig, mem_limiter: Arc<MemoryLimiter>) -> Self {
227        Self {
228            part_stream,
229            config,
230            mem_limiter,
231        }
232    }
233
234    /// Start a new prefetch request to the specified object.
235    pub fn prefetch(
236        &self,
237        bucket: String,
238        object_id: ObjectId,
239        handle_id: HandleId,
240        size: u64,
241    ) -> PrefetchGetObject<Client>
242    where
243        Client: ObjectClient + Clone + Send + Sync + 'static,
244    {
245        PrefetchGetObject::new(
246            self.part_stream.clone(),
247            self.config,
248            bucket,
249            object_id,
250            handle_id,
251            size,
252            self.mem_limiter.clone(),
253        )
254    }
255}
256
/// Result of a prefetch request. Allows callers to read object data.
#[derive(Debug)]
pub struct PrefetchGetObject<Client>
where
    Client: ObjectClient + Clone + Send + Sync + 'static,
{
    /// Stream used to spawn GetObject request tasks for this object.
    part_stream: PartStream<Client>,
    config: PrefetcherConfig,
    /// The in-flight request task, if any. `None` until the first read and after each reset.
    backpressure_task: Option<RequestTask<Client>>,
    // Invariant: the offset of the last byte in this window is always
    // self.next_sequential_read_offset - 1.
    backward_seek_window: SeekWindow,
    bucket: String,
    object_id: ObjectId,
    // preferred part size in the prefetcher's part queue, not the object part
    preferred_part_size: usize,
    /// Start offset for sequential read, used for calculating contiguous read metric
    sequential_read_start_offset: u64,
    /// Offset where the next read is expected to continue the sequential pattern.
    next_sequential_read_offset: u64,
    next_request_offset: u64,
    /// Total size of the object in bytes.
    size: u64,
    mem_limiter: Arc<MemoryLimiter>,
    /// File handle ID that owns this prefetch request, for per-handle memory accounting.
    handle_id: HandleId,
}
282
impl<Client> PrefetchGetObject<Client>
where
    Client: ObjectClient + Clone + Send + Sync + 'static,
{
    /// Create and spawn a new prefetching request for an object
    fn new(
        part_stream: PartStream<Client>,
        config: PrefetcherConfig,
        bucket: String,
        object_id: ObjectId,
        handle_id: HandleId,
        size: u64,
        mem_limiter: Arc<MemoryLimiter>,
    ) -> Self {
        let max_backward_seek_distance = config.max_backward_seek_distance as usize;
        // a conservative memory reservation to avoid violating the memory limit with the large number of file handles
        // note that this reservation is done in addition to the one in [PartQueue::push_front]
        // the matching release happens in the Drop impl below
        let seek_window_reservation =
            Self::seek_window_reservation(part_stream.client().read_part_size(), max_backward_seek_distance);
        mem_limiter.reserve(BufferArea::Prefetch, seek_window_reservation);
        PrefetchGetObject {
            part_stream,
            config,
            backpressure_task: None,
            backward_seek_window: SeekWindow::new(max_backward_seek_distance),
            // 128KiB: Linux's readahead size, used as the lower bound in `try_read`
            preferred_part_size: 128 * 1024,
            sequential_read_start_offset: 0,
            next_sequential_read_offset: 0,
            next_request_offset: 0,
            bucket,
            object_id,
            size,
            mem_limiter,
            handle_id,
        }
    }

    /// Read some bytes from the object. This function will always return exactly `size` bytes,
    /// except at the end of the object where it will return however many bytes are left (including
    /// possibly 0 bytes).
    ///
    /// On error, the prefetcher is reset to `offset` so that a subsequent read can retry from there.
    pub async fn read(
        &mut self,
        offset: u64,
        length: usize,
    ) -> Result<ChecksummedBytes, PrefetchReadError<Client::ClientError>> {
        trace!(
            offset,
            length,
            next_seq_offset = self.next_sequential_read_offset,
            "read"
        );

        match self.try_read(offset, length).await {
            Ok((data, cache_hit)) => {
                // Record cache hit metric for FUSE layer. We only record a cache hit when ALL parts
                // for this read request were served from cache storage (disk/express), not from S3.
                // Partial cache hits (some parts from cache, some from S3) are counted as cache misses
                // to provide a clear binary metric. Parts served from in-memory prefetch buffers
                // (from previous S3 requests) don't count as cache hits.

                if !data.is_empty() && cache_hit {
                    // We only increment the counter on cache_hit=true because OTLP counters don't preserve
                    // data point counts needed for meaningful averages.
                    // FIXME: Consider using histogram to capture partial hit ratios.
                    metrics::counter!(FUSE_CACHE_HIT).increment(1);
                }
                Ok(data)
            }
            Err(err) => {
                self.reset_prefetch_to_offset(offset);
                Err(err)
            }
        }
    }

    /// Inner read implementation. Returns the data together with a flag indicating whether ALL
    /// parts that served this read came from the cache (false for zero-length reads).
    async fn try_read(
        &mut self,
        offset: u64,
        length: usize,
    ) -> Result<(ChecksummedBytes, bool), PrefetchReadError<Client::ClientError>> {
        // Currently, we set preferred part size to the current read size.
        // Our assumption is that the read size will be the same for most sequential
        // read and it can be aligned to the size of prefetched chunks.
        //
        // We initialize this value to 128k as it is the Linux's readahead size
        // and it can also be used as a lower bound in case the read size is too small.
        // The upper bound is 1MiB since it should be a common IO size.
        let max_preferred_part_size = 1024 * 1024;
        self.preferred_part_size = self.preferred_part_size.max(length).min(max_preferred_part_size);

        let remaining = self.size.saturating_sub(offset);
        if remaining == 0 {
            // Reads at or past the end of the object return empty data and never count as a cache hit.
            return Ok((ChecksummedBytes::default(), false));
        }
        let mut to_read = (length as u64).min(remaining);

        // Try to seek if this read is not sequential, and if seeking fails, cancel and reset the
        // prefetcher.
        if self.next_sequential_read_offset != offset {
            if self.try_seek(offset).await? {
                trace!("seek succeeded");
            } else {
                trace!(
                    expected = self.next_sequential_read_offset,
                    actual = offset,
                    "out-of-order read, resetting prefetch"
                );
                counter!(PREFETCH_RESET_STATE).increment(1);

                // This is an approximation, tolerating some seeking caused by concurrent readahead.
                self.record_contiguous_read_metric();

                self.reset_prefetch_to_offset(offset);
            }
        }
        assert_eq!(self.next_sequential_read_offset, offset);

        // Lazily (re)start the GetObject request: after a reset there is no task in flight.
        if self.backpressure_task.is_none() {
            self.backpressure_task = Some(self.spawn_read_backpressure_request()?);
        }

        // Accumulate parts until the read is satisfied, tracking whether every part was cached.
        let mut all_parts_from_cache = true;
        let mut response = ChecksummedBytes::default();
        while to_read > 0 {
            let Some(current_task) = self.backpressure_task.as_mut() else {
                trace!(offset, length, "read beyond object size");
                break;
            };
            debug_assert!(current_task.remaining() > 0);

            let part = current_task.read(to_read as usize).await?;
            all_parts_from_cache &= part.is_from_cache();
            // Keep a copy in the backward seek window so small backwards seeks can be served locally.
            self.backward_seek_window.push(part.clone());
            let part_bytes = part.into_bytes(&self.object_id, self.next_sequential_read_offset)?;

            self.next_sequential_read_offset += part_bytes.len() as u64;
            // If we can complete the read with just a single buffer, early return to avoid copying
            // into a new buffer. This should be the common case as long as part size is larger than
            // read size, which it almost always is for real S3 clients and FUSE.
            if response.is_empty() && part_bytes.len() == to_read as usize {
                return Ok((part_bytes, all_parts_from_cache));
            }

            let part_len = part_bytes.len() as u64;
            response.extend(part_bytes)?;
            to_read -= part_len;
        }

        Ok((response, all_parts_from_cache))
    }

    /// Spawn a backpressure GetObject request which has a range from current offset to the end of the file.
    /// We will be using flow-control window to control how much data we want to download into the prefetcher.
    fn spawn_read_backpressure_request(
        &mut self,
    ) -> Result<RequestTask<Client>, PrefetchReadError<Client::ClientError>> {
        let start = self.next_sequential_read_offset;
        let object_size = self.size as usize;
        let read_part_size = self.part_stream.client().read_part_size();
        let range = RequestRange::new(object_size, start, object_size);

        // The prefetcher now relies on backpressure mechanism so it must be enabled
        match self.part_stream.client().initial_read_window_size() {
            Some(value) => {
                // Also, make sure that we don't get blocked from the beginning
                if value == 0 {
                    return Err(PrefetchReadError::BackpressurePreconditionFailed);
                }
            }
            None => return Err(PrefetchReadError::BackpressurePreconditionFailed),
        };

        let config = RequestTaskConfig {
            bucket: self.bucket.clone(),
            object_id: self.object_id.clone(),
            handle_id: self.handle_id,
            range,
            read_part_size,
            preferred_part_size: self.preferred_part_size,
            initial_request_size: self.config.initial_request_size,
            max_read_window_size: self.config.max_read_window_size,
            read_window_size_multiplier: self.config.sequential_prefetch_multiplier,
        };
        Ok(self.part_stream.spawn_get_object_request(config))
    }

    /// Reset this prefetch request to a new offset, clearing any existing tasks queued.
    fn reset_prefetch_to_offset(&mut self, offset: u64) {
        // Dropping the task cancels the in-flight request; a new one is spawned on the next read.
        self.backpressure_task = None;
        self.backward_seek_window.clear();
        self.sequential_read_start_offset = offset;
        self.next_sequential_read_offset = offset;
        self.next_request_offset = offset;
    }

    /// Try to seek within the current inflight requests without restarting them. Returns true if
    /// the seek succeeded, in which case self.next_sequential_read_offset will be updated to the
    /// new offset. If this returns false, the prefetcher is in an unknown state and must be reset.
    async fn try_seek(&mut self, offset: u64) -> Result<bool, PrefetchReadError<Client::ClientError>> {
        assert_ne!(offset, self.next_sequential_read_offset);
        trace!(from = self.next_sequential_read_offset, to = offset, "trying to seek");
        if offset > self.next_sequential_read_offset {
            self.try_seek_forward(offset).await
        } else {
            self.try_seek_backward(offset).await
        }
    }

    /// Forward seek: succeed only if the target offset is inside the current read window and
    /// close enough ("soon") to the already-downloaded data to be worth waiting for.
    async fn try_seek_forward(&mut self, offset: u64) -> Result<bool, PrefetchReadError<Client::ClientError>> {
        assert!(offset > self.next_sequential_read_offset);
        let total_seek_distance = offset - self.next_sequential_read_offset;
        histogram!("prefetch.seek_distance", "dir" => "forward").record(total_seek_distance as f64);

        let Some(task) = self.backpressure_task.as_mut() else {
            // Can't seek if there's no requests in flight at all
            return Ok(false);
        };

        // Not enough data in the read window to serve the forward seek
        if offset >= task.read_window_end_offset() {
            return Ok(false);
        }

        // If we have enough bytes already downloaded (`available`) to skip straight to this read, then do
        // it. Otherwise, we're willing to wait for the bytes to download only if they're coming "soon", where
        // soon is defined as up to `max_forward_seek_wait_distance` bytes ahead of the available offset.
        let available_offset = task.available_offset();
        let available_soon_offset = available_offset.saturating_add(self.config.max_forward_seek_wait_distance);
        if offset >= available_soon_offset {
            trace!(
                requested_offset = offset,
                available_offset = available_offset,
                "seek failed: not enough data available"
            );
            return Ok(false);
        }
        // Consume the intervening bytes, pushing them into the backward seek window so a
        // subsequent backwards seek over this range can still be served.
        let mut seek_distance = offset - self.next_sequential_read_offset;
        while seek_distance > 0 {
            let part = task.read(seek_distance as usize).await?;
            seek_distance -= part.len() as u64;
            self.next_sequential_read_offset += part.len() as u64;
            self.backward_seek_window.push(part);
        }
        Ok(true)
    }

    /// Backward seek: succeed only if the backward seek window still holds all bytes between the
    /// target offset and the current offset; those parts are pushed back onto the part queue.
    async fn try_seek_backward(&mut self, offset: u64) -> Result<bool, PrefetchReadError<Client::ClientError>> {
        assert!(offset < self.next_sequential_read_offset);

        // When the task is None it means either we have just started prefetching or recently reset it,
        // in both cases the backward seek window would be empty so we can bail out early.
        let Some(task) = self.backpressure_task.as_mut() else {
            return Ok(false);
        };
        let backwards_length_needed = self.next_sequential_read_offset - offset;
        histogram!("prefetch.seek_distance", "dir" => "backward").record(backwards_length_needed as f64);

        let Some(parts) = self.backward_seek_window.read_back(backwards_length_needed as usize) else {
            trace!("seek failed: not enough data in backwards seek window");
            return Ok(false);
        };
        // This also increase `prefetcher_mem_reserved` value in memory limiter.
        // At least one subsequent `RequestTask::read` is required for memory tracking to work correctly
        // because `BackpressureController::drop` needs to know the start offset of the part queue to
        // release the right amount of memory.
        task.push_front(parts).await?;
        self.next_sequential_read_offset = offset;
        Ok(true)
    }

    /// Record the end of a contiguous read.
    ///
    /// This should be invoked at the end of each set of contiguous reads, including if no further read occurs.
    fn record_contiguous_read_metric(&self) {
        histogram!("prefetch.contiguous_read_len")
            .record((self.next_sequential_read_offset - self.sequential_read_start_offset) as f64);
    }

    /// The amount of memory reserved for a backwards seek window.
    ///
    /// The seek window size is rounded up to the nearest multiple of part_size.
    fn seek_window_reservation(part_size: usize, seek_window_size: usize) -> u64 {
        (seek_window_size.div_ceil(part_size) * part_size) as u64
    }
}
568
impl<Client> Drop for PrefetchGetObject<Client>
where
    Client: ObjectClient + Clone + Send + Sync + 'static,
{
    /// Releases the seek-window memory reserved in [PrefetchGetObject::new] and records the
    /// final contiguous-read metric for this request.
    fn drop(&mut self) {
        // Mirror the reservation made in `new`: recompute the same rounded-up size to release.
        let seek_window_reservation = Self::seek_window_reservation(
            self.part_stream.client().read_part_size(),
            self.backward_seek_window.max_size(),
        );
        self.mem_limiter.release(BufferArea::Prefetch, seek_window_reservation);
        self.record_contiguous_read_metric();
    }
}
582
583#[cfg(test)]
584mod tests {
585    // It's convenient to write test constants like "1 * 1024 * 1024" for symmetry
586    #![allow(clippy::identity_op)]
587
588    use crate::Runtime;
589    use crate::data_cache::InMemoryDataCache;
590    use crate::mem_limiter::{MINIMUM_MEM_LIMIT, MemoryLimiter};
591    use crate::memory::PagedPool;
592    use crate::sync::Arc;
593
594    use super::*;
595    use futures::executor::{ThreadPool, block_on};
596    use mountpoint_s3_client::failure_client::{
597        CountdownFailureConfig, GetObjectFailureMode, countdown_failure_client,
598    };
599    use mountpoint_s3_client::mock_client::{MockClient, MockClientConfig, MockClientError, MockObject, ramp_bytes};
600    use mountpoint_s3_client::types::ETag;
601    use proptest::proptest;
602    use proptest::strategy::{Just, Strategy};
603    use proptest_derive::Arbitrary;
604    use std::collections::HashMap;
605    use test_case::test_case;
606
    // Byte-size shorthands used by the test configurations below.
    const KB: usize = 1024;
    const MB: usize = 1024 * 1024;

    /// Prefetcher/client configuration knobs, randomizable via proptest for property tests
    /// and filled in manually for example-based tests.
    #[derive(Debug, Arbitrary)]
    struct TestConfig {
        #[proptest(strategy = "16usize..1*1024*1024")]
        initial_request_size: usize,
        #[proptest(strategy = "16usize..1*1024*1024")]
        max_read_window_size: usize,
        #[proptest(strategy = "1usize..8usize")]
        sequential_prefetch_multiplier: usize,
        #[proptest(strategy = "16usize..2*1024*1024")]
        client_part_size: usize,
        #[proptest(strategy = "1u64..4*1024*1024")]
        max_forward_seek_wait_distance: u64,
        #[proptest(strategy = "1u64..4*1024*1024")]
        max_backward_seek_distance: u64,
        #[proptest(strategy = "16usize..1*1024*1024")]
        cache_block_size: usize,
    }
627
    /// Which prefetcher flavor a test should build.
    enum PrefetcherType {
        /// Plain prefetcher reading directly from the (mock) client.
        Default,
        /// Prefetcher backed by an in-memory data cache with the given block size.
        InMemoryCache(usize),
    }
632
    /// Build a [Prefetcher] of the requested type over `client`, with a single-threaded runtime
    /// and a memory limiter set to the minimum limit.
    fn build_prefetcher<Client>(
        client: Client,
        prefetcher_type: PrefetcherType,
        prefetcher_config: PrefetcherConfig,
    ) -> Prefetcher<Client>
    where
        Client: ObjectClient + Clone + Send + Sync + 'static,
    {
        let pool = PagedPool::new_with_candidate_sizes([client.read_part_size(), client.write_part_size()]);
        let mem_limiter = Arc::new(MemoryLimiter::new(pool, MINIMUM_MEM_LIMIT));
        let runtime = Runtime::new(ThreadPool::builder().pool_size(1).create().unwrap());
        let builder = match prefetcher_type {
            PrefetcherType::Default => Prefetcher::default_builder(client),
            PrefetcherType::InMemoryCache(block_size) => {
                let cache = InMemoryDataCache::new(block_size as u64);
                Prefetcher::caching_builder(cache, client)
            }
        };
        builder.build(runtime, mem_limiter, prefetcher_config)
    }
653
    /// Read a `size`-byte mock object sequentially in `read_size` chunks and verify every
    /// returned byte matches the expected ramp pattern, covering the whole object.
    fn run_sequential_read_test(prefetcher_type: PrefetcherType, size: u64, read_size: usize, test_config: TestConfig) {
        let client = Arc::new(
            MockClient::config()
                .bucket("test-bucket")
                .part_size(test_config.client_part_size)
                .enable_backpressure(true)
                .initial_read_window_size(test_config.client_part_size)
                .build(),
        );
        let object = MockObject::ramp(0xaa, size as usize, ETag::for_tests());
        let etag = object.etag();

        client.add_object("hello", object);

        let prefetcher_config = PrefetcherConfig {
            max_read_window_size: test_config.max_read_window_size,
            sequential_prefetch_multiplier: test_config.sequential_prefetch_multiplier,
            max_forward_seek_wait_distance: test_config.max_forward_seek_wait_distance,
            max_backward_seek_distance: test_config.max_backward_seek_distance,
            initial_request_size: test_config.initial_request_size,
        };

        let prefetcher = build_prefetcher(client.clone(), prefetcher_type, prefetcher_config);
        let object_id = ObjectId::new("hello".to_owned(), etag);
        let fh = HandleId::new(1);
        let mut request = prefetcher.prefetch("test-bucket".to_owned(), object_id, fh, size);

        let mut next_offset = 0;
        loop {
            let buf = block_on(request.read(next_offset, read_size)).unwrap();
            // An empty buffer signals end-of-object.
            if buf.is_empty() {
                break;
            }
            let buf = buf.into_bytes().unwrap();
            let expected = ramp_bytes((0xaa + next_offset) as usize, buf.len());
            assert_eq!(&buf[..], &expected[..buf.len()]);
            next_offset += buf.len() as u64;
        }
        // Every byte of the object must have been read.
        assert_eq!(next_offset, size);
    }
694
    /// Sequential read of an object slightly larger than 1MiB (non-part-aligned tail).
    #[test_case(PrefetcherType::Default)]
    #[test_case(PrefetcherType::InMemoryCache(1 * MB))]
    fn sequential_read_small(prefetcher_type: PrefetcherType) {
        let config = TestConfig {
            initial_request_size: 256 * 1024,
            max_read_window_size: 1024 * 1024 * 1024,
            sequential_prefetch_multiplier: 8,
            client_part_size: 8 * 1024 * 1024,
            max_forward_seek_wait_distance: 16 * 1024 * 1024,
            max_backward_seek_distance: 2 * 1024 * 1024,
            cache_block_size: 1 * MB,
        };
        run_sequential_read_test(prefetcher_type, 1024 * 1024 + 111, 1024 * 1024, config);
    }
709
710    #[test_case(PrefetcherType::Default)]
711    #[test_case(PrefetcherType::InMemoryCache(1 * MB))]
712    fn sequential_read_medium(prefetcher_type: PrefetcherType) {
713        let config = TestConfig {
714            initial_request_size: 256 * 1024,
715            max_read_window_size: 64 * 1024 * 1024,
716            sequential_prefetch_multiplier: 8,
717            client_part_size: 8 * 1024 * 1024,
718            max_forward_seek_wait_distance: 16 * 1024 * 1024,
719            max_backward_seek_distance: 2 * 1024 * 1024,
720            cache_block_size: 1 * MB,
721        };
722        run_sequential_read_test(prefetcher_type, 16 * 1024 * 1024 + 111, 1024 * 1024, config);
723    }
724
725    #[test_case(PrefetcherType::Default)]
726    #[test_case(PrefetcherType::InMemoryCache(1 * MB))]
727    fn sequential_read_large(prefetcher_type: PrefetcherType) {
728        let config = TestConfig {
729            initial_request_size: 256 * 1024,
730            max_read_window_size: 64 * 1024 * 1024,
731            sequential_prefetch_multiplier: 8,
732            client_part_size: 8 * 1024 * 1024,
733            max_forward_seek_wait_distance: 16 * 1024 * 1024,
734            max_backward_seek_distance: 2 * 1024 * 1024,
735            cache_block_size: 1 * MB,
736        };
737
738        run_sequential_read_test(prefetcher_type, 256 * 1024 * 1024 + 111, 1024 * 1024, config);
739    }
740
741    fn fail_with_backpressure_precondition_test(
742        prefetcher_type: PrefetcherType,
743        test_config: TestConfig,
744        client_config: MockClientConfig,
745    ) {
746        let client = Arc::new(MockClient::new(client_config));
747        let read_size = 1 * MB;
748        let object_size = 8 * MB;
749        let object = MockObject::ramp(0xaa, object_size, ETag::for_tests());
750        let etag = object.etag();
751
752        let prefetcher_config = PrefetcherConfig {
753            max_read_window_size: test_config.max_read_window_size,
754            sequential_prefetch_multiplier: test_config.sequential_prefetch_multiplier,
755            ..Default::default()
756        };
757
758        let prefetcher = build_prefetcher(client, prefetcher_type, prefetcher_config);
759        let object_id = ObjectId::new("hello".to_owned(), etag);
760        let fh = HandleId::new(1);
761        let mut request = prefetcher.prefetch("test-bucket".to_owned(), object_id, fh, object_size as u64);
762        let result = block_on(request.read(0, read_size));
763        assert!(matches!(result, Err(PrefetchReadError::BackpressurePreconditionFailed)));
764    }
765
766    #[test_case(PrefetcherType::Default)]
767    #[test_case(PrefetcherType::InMemoryCache(1 * MB))]
768    fn fail_with_backpressure_not_enabled(prefetcher_type: PrefetcherType) {
769        let test_config = TestConfig {
770            initial_request_size: 256 * 1024,
771            max_read_window_size: 1024 * 1024 * 1024,
772            sequential_prefetch_multiplier: 8,
773            client_part_size: 8 * 1024 * 1024,
774            max_forward_seek_wait_distance: 16 * 1024 * 1024,
775            max_backward_seek_distance: 2 * 1024 * 1024,
776            cache_block_size: 1 * MB,
777        };
778
779        // backpressure is not enabled for the client
780        let config = MockClient::config()
781            .bucket("test-bucket")
782            .part_size(test_config.client_part_size)
783            .enable_backpressure(false);
784
785        fail_with_backpressure_precondition_test(prefetcher_type, test_config, config);
786    }
787
788    #[test_case(PrefetcherType::Default)]
789    #[test_case(PrefetcherType::InMemoryCache(1 * MB))]
790    fn fail_with_backpressure_zero_read_window(prefetcher_type: PrefetcherType) {
791        let test_config = TestConfig {
792            initial_request_size: 256 * 1024,
793            max_read_window_size: 1024 * 1024 * 1024,
794            sequential_prefetch_multiplier: 8,
795            client_part_size: 8 * 1024 * 1024,
796            max_forward_seek_wait_distance: 16 * 1024 * 1024,
797            max_backward_seek_distance: 2 * 1024 * 1024,
798            cache_block_size: 1 * MB,
799        };
800
801        // backpressure is enabled but initial read window size is zero
802        let config = MockClient::config()
803            .bucket("test-bucket")
804            .part_size(test_config.client_part_size)
805            .enable_backpressure(true)
806            .initial_read_window_size(0);
807
808        fail_with_backpressure_precondition_test(prefetcher_type, test_config, config);
809    }
810
811    fn fail_sequential_read_test(
812        prefetcher_type: PrefetcherType,
813        size: u64,
814        read_size: usize,
815        test_config: TestConfig,
816        get_failures: HashMap<usize, GetObjectFailureMode<MockClientError>>,
817    ) {
818        let client = MockClient::config()
819            .bucket("test-bucket")
820            .part_size(test_config.client_part_size)
821            .enable_backpressure(true)
822            .initial_read_window_size(test_config.client_part_size)
823            .build();
824        let object = MockObject::ramp(0xaa, size as usize, ETag::for_tests());
825        let etag = object.etag();
826
827        client.add_object("hello", object);
828
829        let client = Arc::new(countdown_failure_client(
830            client,
831            CountdownFailureConfig {
832                get_failures,
833                ..Default::default()
834            },
835        ));
836
837        let prefetcher_config = PrefetcherConfig {
838            max_read_window_size: test_config.max_read_window_size,
839            sequential_prefetch_multiplier: test_config.sequential_prefetch_multiplier,
840            initial_request_size: test_config.initial_request_size,
841            ..Default::default()
842        };
843
844        let prefetcher = build_prefetcher(client, prefetcher_type, prefetcher_config);
845        let object_id = ObjectId::new("hello".to_owned(), etag);
846        let fh = HandleId::new(1);
847        let mut request = prefetcher.prefetch("test-bucket".to_owned(), object_id, fh, size);
848
849        let mut next_offset = 0;
850        loop {
851            let buf = match block_on(request.read(next_offset, read_size)) {
852                Ok(buf) => buf,
853                Err(_) => break,
854            };
855            let buf = buf.into_bytes().unwrap();
856
857            if buf.is_empty() {
858                break;
859            }
860            let expected = ramp_bytes((0xaa + next_offset) as usize, buf.len());
861            assert_eq!(&buf[..], &expected[..buf.len()]);
862            next_offset += buf.len() as u64;
863        }
864        assert!(next_offset < size); // Since we're injecting failures, shouldn't make it to the end
865    }
866
867    #[test_case("invalid range; length=42", PrefetcherType::Default)]
868    #[test_case("invalid range; length=42", PrefetcherType::InMemoryCache(1 * MB))]
869    // test case for the request failure due to etag not matching
870    #[test_case(
871        "At least one of the pre-conditions you specified did not hold",
872        PrefetcherType::Default
873    )]
874    #[test_case("At least one of the pre-conditions you specified did not hold", PrefetcherType::InMemoryCache(1 * MB))]
875    fn fail_request_sequential_small(err_value: &str, prefetcher_type: PrefetcherType) {
876        let config = TestConfig {
877            initial_request_size: 256 * 1024,
878            max_read_window_size: 1024 * 1024 * 1024,
879            sequential_prefetch_multiplier: 8,
880            client_part_size: 8 * 1024 * 1024,
881            max_forward_seek_wait_distance: 16 * 1024 * 1024,
882            max_backward_seek_distance: 2 * 1024 * 1024,
883            cache_block_size: 1 * MB,
884        };
885
886        let mut get_failures = HashMap::new();
887        get_failures.insert(
888            2,
889            GetObjectFailureMode::OperationError(ObjectClientError::ClientError(MockClientError(
890                err_value.to_owned().into(),
891            ))),
892        );
893
894        fail_sequential_read_test(prefetcher_type, 1024 * 1024 + 111, 1024 * 1024, config, get_failures);
895    }
896
    proptest! {
        // Randomized sequential reads with the default (no-cache) prefetcher.
        #[test]
        fn proptest_sequential_read(
            size in 1u64..1 * 1024 * 1024,
            read_size in 1usize..1 * 1024 * 1024,
            config: TestConfig,
        ) {
            run_sequential_read_test(PrefetcherType::Default, size, read_size, config);
        }

        // As above, but the read size is derived from the object size so reads are never larger
        // than the object itself.
        #[test]
        fn proptest_sequential_read_small_read_size(size in 1u64..1 * 1024 * 1024, read_factor in 1usize..10, config: TestConfig) {
            // Pick read size smaller than the object size
            let read_size = (size as usize / read_factor).max(1);
            run_sequential_read_test(PrefetcherType::Default, size, read_size, config);
        }

        // Same pair of cases, reading through the in-memory cache instead.
        #[test]
        fn proptest_sequential_read_with_cache(
            size in 1u64..1 * 1024 * 1024,
            read_size in 1usize..1 * 1024 * 1024,
            config: TestConfig,
        ) {
            run_sequential_read_test(PrefetcherType::InMemoryCache(config.cache_block_size), size, read_size, config);
        }

        #[test]
        fn proptest_sequential_read_small_read_size_with_cache(size in 1u64..1 * 1024 * 1024, read_factor in 1usize..10,
            config: TestConfig) {
            // Pick read size smaller than the object size
            let read_size = (size as usize / read_factor).max(1);
            run_sequential_read_test(PrefetcherType::InMemoryCache(config.cache_block_size), size, read_size, config);
        }
    }
931
932    #[test]
933    fn test_sequential_read_regression() {
934        let object_size = 854966;
935        let read_size = 161647;
936        let config = TestConfig {
937            initial_request_size: 484941,
938            max_read_window_size: 81509,
939            sequential_prefetch_multiplier: 1,
940            client_part_size: 181682,
941            max_forward_seek_wait_distance: 1,
942            max_backward_seek_distance: 18668,
943            cache_block_size: 1 * MB,
944        };
945        run_sequential_read_test(PrefetcherType::Default, object_size, read_size, config);
946    }
947
948    fn run_random_read_test(
949        prefetcher_type: PrefetcherType,
950        object_size: u64,
951        reads: Vec<(u64, usize)>,
952        test_config: TestConfig,
953    ) {
954        let client = Arc::new(
955            MockClient::config()
956                .bucket("test-bucket")
957                .part_size(test_config.client_part_size)
958                .enable_backpressure(true)
959                .initial_read_window_size(test_config.client_part_size)
960                .build(),
961        );
962        let object = MockObject::ramp(0xaa, object_size as usize, ETag::for_tests());
963        let etag = object.etag();
964
965        client.add_object("hello", object);
966
967        let prefetcher_config = PrefetcherConfig {
968            max_read_window_size: test_config.max_read_window_size,
969            sequential_prefetch_multiplier: test_config.sequential_prefetch_multiplier,
970            max_forward_seek_wait_distance: test_config.max_forward_seek_wait_distance,
971            max_backward_seek_distance: test_config.max_backward_seek_distance,
972            initial_request_size: test_config.initial_request_size,
973        };
974
975        let prefetcher = build_prefetcher(client, prefetcher_type, prefetcher_config);
976        let object_id = ObjectId::new("hello".to_owned(), etag);
977        let fh = HandleId::new(1);
978        let mut request = prefetcher.prefetch("test-bucket".to_owned(), object_id, fh, object_size);
979
980        for (offset, length) in reads {
981            assert!(offset < object_size);
982            assert!(offset + length as u64 <= object_size);
983            let expected = ramp_bytes((0xaa + offset) as usize, length);
984            let buf = block_on(request.read(offset, length)).unwrap();
985            let buf = buf.into_bytes().unwrap();
986            assert_eq!(buf.len(), expected.len());
987            // Don't spew the giant buffer if this test fails
988            if buf[..] != expected[..] {
989                for i in 0..buf.len() {
990                    if buf[i] != expected[i] {
991                        panic!(
992                            "buffer mismatch at offset {}, saw {} expected {}",
993                            i, buf[i], expected[i]
994                        );
995                    }
996                }
997            }
998        }
999    }
1000
    /// Proptest strategy producing an object size in `1..=max_object_size` together with up to 10
    /// reads, where each read is an `(offset, length)` pair guaranteed to stay within the object.
    fn random_read_strategy(max_object_size: u64) -> impl Strategy<Value = (u64, Vec<(u64, usize)>)> {
        (1..=max_object_size).prop_flat_map(|object_size| {
            (
                Just(object_size),
                proptest::collection::vec(
                    // Pick an offset first, then a length that cannot run past the end.
                    (0..object_size).prop_flat_map(move |offset| {
                        (1..=object_size - offset).prop_map(move |length| (offset, length as usize))
                    }),
                    0..10,
                ),
            )
        })
    }
1014
    proptest! {
        // Randomized in-bounds reads against the default (no-cache) prefetcher.
        #[test]
        fn proptest_random_read(
            reads in random_read_strategy(1 * 1024 * 1024),
            config: TestConfig,
        ) {
            let (object_size, reads) = reads;
            run_random_read_test(PrefetcherType::Default, object_size, reads, config);
        }

        // Same randomized reads, going through the in-memory cache.
        #[test]
        fn proptest_random_read_with_cache(
            reads in random_read_strategy(1 * 1024 * 1024),
            config: TestConfig,
        ) {
            let (object_size, reads) = reads;
            run_random_read_test(PrefetcherType::InMemoryCache(config.cache_block_size), object_size, reads, config);
        }
    }
1034
1035    #[test]
1036    fn test_random_read_regression() {
1037        let object_size = 724314;
1038        let reads = vec![(0, 516883)];
1039        let config = TestConfig {
1040            initial_request_size: 3684779,
1041            max_read_window_size: 2147621,
1042            sequential_prefetch_multiplier: 4,
1043            client_part_size: 516882,
1044            max_forward_seek_wait_distance: 16 * 1024 * 1024,
1045            max_backward_seek_distance: 2 * 1024 * 1024,
1046            cache_block_size: 1 * MB,
1047        };
1048        run_random_read_test(PrefetcherType::Default, object_size, reads, config);
1049    }
1050
1051    #[test]
1052    fn test_random_read_regression2() {
1053        let object_size = 755678;
1054        let reads = vec![(0, 278499), (311250, 1)];
1055        let config = TestConfig {
1056            initial_request_size: 556997,
1057            max_read_window_size: 105938,
1058            sequential_prefetch_multiplier: 7,
1059            client_part_size: 1219731,
1060            max_forward_seek_wait_distance: 16 * 1024 * 1024,
1061            max_backward_seek_distance: 2 * 1024 * 1024,
1062            cache_block_size: 1 * MB,
1063        };
1064        run_random_read_test(PrefetcherType::Default, object_size, reads, config);
1065    }
1066
1067    #[test]
1068    fn test_random_read_regression3() {
1069        let object_size = 755678;
1070        let reads = vec![(0, 236766), (291204, 1), (280930, 36002)];
1071        let config = TestConfig {
1072            initial_request_size: 556997,
1073            max_read_window_size: 105938,
1074            sequential_prefetch_multiplier: 7,
1075            client_part_size: 1219731,
1076            max_forward_seek_wait_distance: 2260662,
1077            max_backward_seek_distance: 2369799,
1078            cache_block_size: 1 * MB,
1079        };
1080        run_random_read_test(PrefetcherType::Default, object_size, reads, config);
1081    }
1082
1083    #[test]
1084    fn test_random_read_regression4() {
1085        let object_size = 14201;
1086        let reads = vec![(3584, 1), (9424, 1460), (3582, 3340), (248, 9218)];
1087        let config = TestConfig {
1088            initial_request_size: 457999,
1089            max_read_window_size: 863511,
1090            sequential_prefetch_multiplier: 5,
1091            client_part_size: 1972409,
1092            max_forward_seek_wait_distance: 2810651,
1093            max_backward_seek_distance: 3531090,
1094            cache_block_size: 1 * MB,
1095        };
1096        run_random_read_test(PrefetcherType::Default, object_size, reads, config);
1097    }
1098
    /// Verifies that a failure encountered while seeking forward surfaces to the reader, that the
    /// next retry (which issues a fresh request, also failing) surfaces its error too, and that a
    /// further retry succeeds once no more failures are injected.
    #[test]
    fn test_forward_seek_failure() {
        const PART_SIZE: usize = 8192;
        const OBJECT_SIZE: usize = 2 * PART_SIZE;

        let client = MockClient::config()
            .bucket("test-bucket")
            .part_size(PART_SIZE)
            .enable_backpressure(true)
            .initial_read_window_size(OBJECT_SIZE)
            .build();
        let object = MockObject::ramp(0xaa, OBJECT_SIZE, ETag::for_tests());
        let etag = object.etag();
        client.add_object("hello", object);

        // Inject failures: request 1 errors at its second chunk, request 2 fails outright.
        let mut get_failures = HashMap::new();
        get_failures.insert(
            1,
            GetObjectFailureMode::StreamPositionError(
                2,
                ObjectClientError::ClientError(MockClientError(
                    "error in the second chunk of the first request".into(),
                )),
            ),
        );
        get_failures.insert(
            2,
            GetObjectFailureMode::OperationError(ObjectClientError::ClientError(MockClientError(
                "error in second request".into(),
            ))),
        );

        let client = Arc::new(countdown_failure_client(
            client,
            CountdownFailureConfig {
                get_failures,
                ..Default::default()
            },
        ));
        let prefetcher = build_prefetcher(client, PrefetcherType::Default, Default::default());
        block_on(async {
            let object_id = ObjectId::new("hello".to_owned(), etag.clone());
            let fh = HandleId::new(1);
            let mut request = prefetcher.prefetch("test-bucket".to_owned(), object_id, fh, OBJECT_SIZE as u64);

            // The first read should trigger the prefetcher to try and get the whole object (in 2 parts).
            _ = request.read(0, 1).await.expect("first read should succeed");

            // Seek to the second part (where we injected a failure).
            let offset = PART_SIZE + 1;
            _ = request.read(offset as u64, 1).await.expect_err("seek should fail");

            // A retry should trigger a new request (also failing).
            _ = request
                .read(offset as u64, 1)
                .await
                .expect_err("first retry after failure should fail");

            // New retry should succeed (no more failures injected).
            let byte = request
                .read(offset as u64, 1)
                .await
                .expect("second retry should succeed");
            let expected = ramp_bytes(0xaa + offset, 1);
            assert_eq!(byte.into_bytes().unwrap()[..], expected[..]);
        });
    }
1166
    /// Verifies handling of streams that terminate early ("short reads"): the reader sees
    /// `GetRequestTerminatedUnexpectedly`, the prefetcher resets after a failure, and a short read
    /// caused by reaching the end of the object is not treated as an error.
    #[test_case(PrefetcherType::Default)]
    #[test_case(PrefetcherType::InMemoryCache(8192))]
    fn test_short_read_failure(prefetcher_type: PrefetcherType) {
        const PART_SIZE: usize = 8192;
        const OBJECT_SIZE: usize = 2 * PART_SIZE;

        let client = MockClient::config()
            .bucket("test-bucket")
            .part_size(PART_SIZE)
            .enable_backpressure(true)
            .initial_read_window_size(PART_SIZE)
            .build();
        let object = MockObject::ramp(0xaa, OBJECT_SIZE, ETag::for_tests());
        let etag = object.etag();
        client.add_object("hello", object);

        let mut get_failures = HashMap::new();
        // On first request, terminate the stream without producing any data
        get_failures.insert(1, GetObjectFailureMode::StreamShortCircuit(1));
        // On third request (second request of second prefetcher),
        // terminate the stream early without producing all the requested data
        get_failures.insert(3, GetObjectFailureMode::StreamShortCircuit(1));

        let client = Arc::new(countdown_failure_client(
            client,
            CountdownFailureConfig {
                get_failures,
                ..Default::default()
            },
        ));
        let prefetcher_config = PrefetcherConfig {
            initial_request_size: PART_SIZE,
            ..Default::default()
        };
        let prefetcher = build_prefetcher(client, prefetcher_type, prefetcher_config);

        block_on(async {
            let object_id = ObjectId::new("hello".to_owned(), etag.clone());
            let fh = HandleId::new(1);
            let mut request = prefetcher.prefetch("test-bucket".to_owned(), object_id, fh, OBJECT_SIZE as u64);

            // First read will terminate early
            assert!(matches!(
                request.read(0, 10).await.expect_err("read should fail"),
                PrefetchReadError::GetRequestTerminatedUnexpectedly,
            ));

            // Second read will return first part, but then terminate early before returning the remaining parts
            let bytes = request.read(0, PART_SIZE).await.unwrap();
            let expected = ramp_bytes(0xaa, PART_SIZE);
            assert_eq!(bytes.into_bytes().unwrap()[..], expected[..]);
            _ = request
                .read(PART_SIZE as u64, PART_SIZE)
                .await
                .expect_err("read should fail");

            // There are no more failures injected, since the prefetcher will reset on failure, now we should be able to read the whole data.
            let bytes = request.read(0, OBJECT_SIZE).await.unwrap();
            let expected = ramp_bytes(0xaa, OBJECT_SIZE);
            assert_eq!(bytes.into_bytes().unwrap()[..], expected[..]);

            // Shouldn't fail if the short read is due to object size not due to the stream terminating early
            let bytes = request.read(PART_SIZE as u64, OBJECT_SIZE).await.unwrap();
            let expected = ramp_bytes(0xaa + PART_SIZE, PART_SIZE);
            assert_eq!(bytes.into_bytes().unwrap()[..], expected[..]);
        });
    }
1234
1235    #[test_case(0, 25; "no first read")]
1236    #[test_case(60, 25; "read beyond first part")]
1237    #[test_case(20, 25; "read in first part")]
1238    #[test_case(125, 110; "read in second request")]
1239    fn test_forward_seek(first_read_size: usize, part_size: usize) {
1240        const OBJECT_SIZE: usize = 200;
1241        const FIRST_REQUEST_SIZE: usize = 100;
1242
1243        let client = Arc::new(
1244            MockClient::config()
1245                .bucket("test-bucket")
1246                .part_size(part_size)
1247                .enable_backpressure(true)
1248                .initial_read_window_size(FIRST_REQUEST_SIZE)
1249                .build(),
1250        );
1251        let object = MockObject::ramp(0xaa, OBJECT_SIZE, ETag::for_tests());
1252        let etag = object.etag();
1253
1254        client.add_object("hello", object);
1255
1256        let prefetcher = build_prefetcher(client, PrefetcherType::Default, Default::default());
1257
1258        // Try every possible seek from first_read_size
1259        for offset in first_read_size + 1..OBJECT_SIZE {
1260            let object_id = ObjectId::new("hello".to_owned(), etag.clone());
1261            let fh = HandleId::new(1);
1262            let mut request = prefetcher.prefetch("test-bucket".to_owned(), object_id, fh, OBJECT_SIZE as u64);
1263            if first_read_size > 0 {
1264                let _first_read = block_on(request.read(0, first_read_size)).unwrap();
1265            }
1266
1267            let byte = block_on(request.read(offset as u64, 1)).unwrap();
1268            let expected = ramp_bytes(0xaa + offset, 1);
1269            assert_eq!(byte.into_bytes().unwrap()[..], expected[..]);
1270        }
1271    }
1272
1273    #[test_case(60, 25; "read beyond first part")]
1274    #[test_case(20, 25; "read in first part")]
1275    #[test_case(125, 110; "read in second request")]
1276    fn test_backward_seek(first_read_size: usize, part_size: usize) {
1277        const OBJECT_SIZE: usize = 200;
1278
1279        let client = Arc::new(
1280            MockClient::config()
1281                .bucket("test-bucket")
1282                .part_size(part_size)
1283                .enable_backpressure(true)
1284                .initial_read_window_size(part_size)
1285                .build(),
1286        );
1287        let object = MockObject::ramp(0xaa, OBJECT_SIZE, ETag::for_tests());
1288        let etag = object.etag();
1289
1290        client.add_object("hello", object);
1291
1292        let prefetcher = build_prefetcher(client, PrefetcherType::Default, Default::default());
1293
1294        // Try every possible seek from first_read_size
1295        for offset in 0..first_read_size {
1296            let object_id = ObjectId::new("hello".to_owned(), etag.clone());
1297            let fh = HandleId::new(1);
1298            let mut request = prefetcher.prefetch("test-bucket".to_owned(), object_id, fh, OBJECT_SIZE as u64);
1299            if first_read_size > 0 {
1300                let _first_read = block_on(request.read(0, first_read_size)).unwrap();
1301            }
1302
1303            let byte = block_on(request.read(offset as u64, 1)).unwrap();
1304            let expected = ramp_bytes(0xaa + offset, 1);
1305            assert_eq!(byte.into_bytes().unwrap()[..], expected[..]);
1306        }
1307    }
1308
1309    #[test_case(8 * 1024 * 1024, 1 * 1024 * 1024, 8 * 1024 * 1024; "8MiB part_size, 1MiB window")]
1310    #[test_case(1 * 1024 * 1024, 1 * 1024 * 1024, 1 * 1024 * 1024; "equal part_size and window")]
1311    #[test_case(250 * 1024, 1 * 1024 * 1024, 1250 * 1024; "window larger than part_size")]
1312    fn test_seek_window_reservation(part_size: usize, seek_window_size: usize, expected: u64) {
1313        let reservation = PrefetchGetObject::<MockClient>::seek_window_reservation(part_size, seek_window_size);
1314        assert_eq!(reservation, expected);
1315    }
1316
1317    #[test_case(8 * MB, 8 * MB, 1 * MB + 128 * KB; "default")]
1318    #[test_case(8 * MB, 8 * MB, 0; "no initial request")]
1319    #[test_case(1 * KB, 1 * MB, 10 * MB; "initial request larger than part size")]
1320    #[test_case(16 * MB, 8 * MB, 1 * MB + 128 * KB; "larger intial read window")]
1321    #[test_case(16 * MB, 8 * MB, 0; "larger intial read window w/o initial request")]
1322    #[test_case(1 * KB, 8 * MB, 1 * MB + 128 * KB; "smaller intial read window")]
1323    #[test_case(1 * KB, 8 * MB, 0; "smaller intial read window w/o initial request")]
1324    fn test_initial_reqeust_size(initial_read_window_size: usize, part_size: usize, initial_request_size: usize) {
1325        let object_size = (16 * MB) as u64;
1326
1327        let client = Arc::new(
1328            MockClient::config()
1329                .bucket("test-bucket")
1330                .part_size(part_size)
1331                .enable_backpressure(true)
1332                .initial_read_window_size(initial_read_window_size)
1333                .build(),
1334        );
1335
1336        let object = MockObject::ramp(0xaa, object_size as usize, ETag::for_tests());
1337        let etag = object.etag();
1338        client.add_object("test-object", object);
1339
1340        let prefetcher_config = PrefetcherConfig {
1341            initial_request_size,
1342            ..Default::default()
1343        };
1344
1345        let prefetcher = build_prefetcher(client.clone(), PrefetcherType::Default, prefetcher_config);
1346        let object_id = ObjectId::new("test-object".to_owned(), etag);
1347        let fh = HandleId::new(1);
1348        let mut request = prefetcher.prefetch("test-bucket".to_owned(), object_id, fh, object_size);
1349
1350        // Perform sequential reads to test the download functionality
1351        let mut next_offset = 0;
1352        while next_offset < object_size {
1353            let buf = block_on(request.read(next_offset, 256 * KB)).unwrap();
1354            if buf.is_empty() {
1355                break;
1356            }
1357            let buf = buf.into_bytes().unwrap();
1358            let expected = ramp_bytes((0xaa + next_offset) as usize, buf.len());
1359            assert_eq!(&buf[..], &expected[..]);
1360            next_offset += buf.len() as u64;
1361        }
1362        assert_eq!(next_offset, object_size);
1363    }
1364
1365    #[cfg(feature = "shuttle")]
1366    mod shuttle_tests {
1367        use super::*;
1368        use futures::task::{FutureObj, Spawn, SpawnError};
1369        use shuttle::future::block_on;
1370        use shuttle::rand::Rng;
1371        use shuttle::{check_pct, check_random};
1372
        /// `Spawn` implementation that schedules futures on the shuttle executor, so the
        /// prefetcher's background tasks run under shuttle's controlled scheduling.
        struct ShuttleRuntime;
        impl Spawn for ShuttleRuntime {
            fn spawn_obj(&self, future: FutureObj<'static, ()>) -> Result<(), SpawnError> {
                shuttle::future::spawn(future);
                Ok(())
            }
        }
1380
1381        fn sequential_read_stress_helper() {
1382            let mut rng = shuttle::rand::thread_rng();
1383            let object_size = rng.gen_range(1u64..1 * 1024 * 1024);
1384            let max_read_window_size = rng.gen_range(16usize..1 * 1024 * 1024);
1385            let sequential_prefetch_multiplier = rng.gen_range(2usize..16);
1386            let part_size = rng.gen_range(16usize..INITIAL_REQUEST_SIZE);
1387            let initial_request_size = rng.gen_range(0..INITIAL_REQUEST_SIZE);
1388            let max_forward_seek_wait_distance = rng.gen_range(16u64..1 * 1024 * 1024 + 256 * 1024);
1389            let max_backward_seek_distance = rng.gen_range(16u64..1 * 1024 * 1024 + 256 * 1024);
1390
1391            let client = Arc::new(
1392                MockClient::config()
1393                    .bucket("test-bucket")
1394                    .part_size(part_size)
1395                    .enable_backpressure(true)
1396                    .initial_read_window_size(part_size)
1397                    .build(),
1398            );
1399            let pool = PagedPool::new_with_candidate_sizes([part_size]);
1400            let mem_limiter = Arc::new(MemoryLimiter::new(pool, MINIMUM_MEM_LIMIT));
1401            let object = MockObject::ramp(0xaa, object_size as usize, ETag::for_tests());
1402            let file_etag = object.etag();
1403
1404            client.add_object("hello", object);
1405
1406            let prefetcher_config = PrefetcherConfig {
1407                max_read_window_size,
1408                sequential_prefetch_multiplier,
1409                max_forward_seek_wait_distance,
1410                max_backward_seek_distance,
1411                initial_request_size,
1412            };
1413
1414            let prefetcher =
1415                Prefetcher::default_builder(client).build(Runtime::new(ShuttleRuntime), mem_limiter, prefetcher_config);
1416            let object_id = ObjectId::new("hello".to_owned(), file_etag);
1417            let fh = HandleId::new(1);
1418            let mut request = prefetcher.prefetch("test-bucket".to_owned(), object_id, fh, object_size);
1419
1420            let mut next_offset = 0;
1421            loop {
1422                let read_size = rng.gen_range(1usize..1 * 1024 * 1024);
1423                let buf = block_on(request.read(next_offset, read_size)).unwrap();
1424                if buf.is_empty() {
1425                    break;
1426                }
1427                let buf = buf.into_bytes().unwrap();
1428                let expected = ramp_bytes((0xaa + next_offset) as usize, buf.len());
1429                assert_eq!(&buf[..], &expected[..buf.len()]);
1430                next_offset += buf.len() as u64;
1431            }
1432            assert_eq!(next_offset, object_size);
1433        }
1434
1435        #[test]
1436        fn sequential_read_stress() {
1437            check_random(sequential_read_stress_helper, 1000);
1438            check_pct(sequential_read_stress_helper, 1000, 3);
1439        }
1440
1441        fn random_read_stress_helper() {
1442            let mut rng = shuttle::rand::thread_rng();
1443            let max_read_window_size = rng.gen_range(16usize..32 * 1024);
1444            let sequential_prefetch_multiplier = rng.gen_range(2usize..16);
1445            let part_size = rng.gen_range(16usize..128 * 1024);
1446            let initial_request_size = rng.gen_range(16usize..128 * 1024);
1447            let max_forward_seek_wait_distance = rng.gen_range(16u64..192 * 1024);
1448            let max_backward_seek_distance = rng.gen_range(16u64..192 * 1024);
1449            // Try to prevent testing very small reads of very large objects, which are easy to OOM
1450            // under Shuttle (lots of concurrent tasks)
1451            let max_object_size = initial_request_size.min(max_read_window_size) * 20;
1452            let object_size = rng.gen_range(1u64..(64 * 1024).min(max_object_size) as u64);
1453
1454            let client = Arc::new(
1455                MockClient::config()
1456                    .bucket("test-bucket")
1457                    .part_size(part_size)
1458                    .enable_backpressure(true)
1459                    .initial_read_window_size(part_size)
1460                    .build(),
1461            );
1462            let pool = PagedPool::new_with_candidate_sizes([part_size]);
1463            let mem_limiter = Arc::new(MemoryLimiter::new(pool, MINIMUM_MEM_LIMIT));
1464            let object = MockObject::ramp(0xaa, object_size as usize, ETag::for_tests());
1465            let file_etag = object.etag();
1466
1467            client.add_object("hello", object);
1468
1469            let prefetcher_config = PrefetcherConfig {
1470                max_read_window_size,
1471                sequential_prefetch_multiplier,
1472                max_forward_seek_wait_distance,
1473                max_backward_seek_distance,
1474                initial_request_size,
1475            };
1476
1477            let prefetcher =
1478                Prefetcher::default_builder(client).build(Runtime::new(ShuttleRuntime), mem_limiter, prefetcher_config);
1479            let object_id = ObjectId::new("hello".to_owned(), file_etag);
1480            let fh = HandleId::new(1);
1481            let mut request = prefetcher.prefetch("test-bucket".to_owned(), object_id, fh, object_size);
1482
1483            let num_reads = rng.gen_range(10usize..50);
1484            for _ in 0..num_reads {
1485                let offset = rng.gen_range(0u64..object_size);
1486                let length = rng.gen_range(1usize..(object_size - offset + 1) as usize);
1487                let expected = ramp_bytes((0xaa + offset) as usize, length);
1488                let buf = block_on(request.read(offset, length)).unwrap();
1489                let buf = buf.into_bytes().unwrap();
1490                assert_eq!(buf.len(), expected.len());
1491                // Don't spew the giant buffer if this test fails
1492                if buf[..] != expected[..] {
1493                    for i in 0..buf.len() {
1494                        if buf[i] != expected[i] {
1495                            panic!(
1496                                "buffer mismatch at offset {}, saw {} expected {}",
1497                                i, buf[i], expected[i]
1498                            );
1499                        }
1500                    }
1501                }
1502            }
1503        }
1504
1505        #[test]
1506        fn random_read_stress() {
1507            check_random(random_read_stress_helper, 1000);
1508            check_pct(random_read_stress_helper, 1000, 3);
1509        }
1510    }
1511}