1use std::fmt::Debug;
35
36use metrics::{counter, histogram};
37use mountpoint_s3_client::error::{GetObjectError, ObjectClientError};
38use mountpoint_s3_client::{ObjectClient, error_metadata::ProvideErrorMetadata};
39use thiserror::Error;
40use tracing::trace;
41
42use crate::checksums::{ChecksummedBytes, IntegrityError};
43use crate::data_cache::DataCache;
44use crate::fs::error_metadata::{ErrorMetadata, MOUNTPOINT_ERROR_CLIENT};
45use crate::mem_limiter::{BufferArea, MemoryLimiter};
46use crate::metrics::defs::{FUSE_CACHE_HIT, PREFETCH_RESET_STATE};
47use crate::object::ObjectId;
48use crate::sync::Arc;
49
50mod backpressure_controller;
51mod builder;
52mod caching_stream;
53mod part;
54mod part_queue;
55mod part_stream;
56mod seek_window;
57mod task;
58
59pub use builder::PrefetcherBuilder;
60use part::PartOperationError;
61use part_stream::{PartStream, RequestRange, RequestTaskConfig};
62use seek_window::SeekWindow;
63use task::RequestTask;
64
/// Size of the first GetObject request issued for a new prefetch stream:
/// 1 MiB plus 128 KiB of slack.
pub const INITIAL_REQUEST_SIZE: usize = 1024 * 1024 + 128 * 1024;
74
/// Errors that can be returned while reading through a prefetching GetObject
/// request. `E` is the underlying object client's error type.
#[derive(Debug, Error)]
pub enum PrefetchReadError<E> {
    /// The underlying GetObject request failed. Carries the client error as
    /// `source` plus boxed error metadata (bucket/key) for error reporting.
    #[error("get object request failed")]
    GetRequestFailed {
        source: ObjectClientError<GetObjectError, E>,
        metadata: Box<ErrorMetadata>,
    },

    /// A part arrived at a different offset than the one we expected next.
    #[error("get object request returned wrong offset")]
    GetRequestReturnedWrongOffset { offset: u64, expected_offset: u64 },

    /// The request stream ended before delivering all the expected data.
    #[error("get request terminated unexpectedly")]
    GetRequestTerminatedUnexpectedly,

    /// Checksum validation of buffered data failed.
    #[error("integrity check failed")]
    Integrity(#[from] IntegrityError),

    /// Reading a part out of the part queue failed.
    #[error("part read failed")]
    PartReadFailed(#[from] PartOperationError),

    /// The client is not configured with backpressure and a non-zero initial
    /// read window, which the prefetcher requires.
    #[error("backpressure must be enabled with non-zero initial read window")]
    BackpressurePreconditionFailed,

    /// Failed to grow the backpressure read window.
    #[error("read window increment failed")]
    ReadWindowIncrement,
}
101
102impl<E: ProvideErrorMetadata + std::error::Error + Send + Sync + 'static> PrefetchReadError<E> {
103 fn get_request_failed(err: ObjectClientError<GetObjectError, E>, bucket: &str, key: &str) -> Self {
104 let metadata = ErrorMetadata {
105 client_error_meta: err.meta(),
106 error_code: Some(MOUNTPOINT_ERROR_CLIENT.to_string()),
107 s3_bucket_name: Some(bucket.to_string()),
108 s3_object_key: Some(key.to_string()),
109 };
110 let metadata = Box::new(metadata);
111 Self::GetRequestFailed { source: err, metadata }
112 }
113}
114
/// Tuning configuration for the prefetcher.
#[derive(Debug, Clone, Copy)]
pub struct PrefetcherConfig {
    /// Maximum size the backpressure read window is allowed to grow to.
    pub max_read_window_size: usize,
    /// Factor by which the read window grows as reads stay sequential.
    pub sequential_prefetch_multiplier: usize,
    /// How far beyond already-available data a forward seek may wait for
    /// in-flight bytes instead of failing the seek.
    pub max_forward_seek_wait_distance: u64,
    /// Maximum distance (bytes) of recently-read data retained to serve
    /// backward seeks without a new request.
    pub max_backward_seek_distance: u64,
    /// Size of the first request issued for a new prefetch stream.
    pub initial_request_size: usize,
}
132
133impl Default for PrefetcherConfig {
134 #[allow(clippy::identity_op)]
135 fn default() -> Self {
136 Self {
137 max_read_window_size: determine_max_read_size(),
138 sequential_prefetch_multiplier: 2,
139 max_forward_seek_wait_distance: 16 * 1024 * 1024,
144 max_backward_seek_distance: 1 * 1024 * 1024,
145 initial_request_size: INITIAL_REQUEST_SIZE,
146 }
147 }
148}
149
150fn determine_max_read_size() -> usize {
162 const ENV_VAR_KEY: &str = "UNSTABLE_MOUNTPOINT_MAX_PREFETCH_WINDOW_SIZE";
163 const DEFAULT_READ_WINDOW_SIZE: usize = 2 * 1024 * 1024 * 1024;
164
165 match std::env::var_os(ENV_VAR_KEY) {
166 Some(val) => match val.to_string_lossy().parse() {
167 Ok(val) => {
168 tracing::warn!(
169 "successfully overridden prefetch read window size \
170 with new value {val} bytes from unstable environment config",
171 );
172 val
173 }
174 Err(_) => {
175 tracing::warn!(
176 "{ENV_VAR_KEY} did not contain a valid positive integer \
177 for prefetch bytes, using {DEFAULT_READ_WINDOW_SIZE} bytes instead",
178 );
179 DEFAULT_READ_WINDOW_SIZE
180 }
181 },
182 None => DEFAULT_READ_WINDOW_SIZE,
183 }
184}
185
/// Entry point for prefetching reads of S3 objects.
#[derive(Debug)]
pub struct Prefetcher<Client> {
    // Stream used to spawn GetObject request tasks for each prefetch.
    part_stream: PartStream<Client>,
    // Tuning configuration shared by every request created by this prefetcher.
    config: PrefetcherConfig,
    // Limiter used to account for memory reserved by prefetch buffers.
    mem_limiter: Arc<MemoryLimiter>,
}
193
194impl<Client> Prefetcher<Client>
195where
196 Client: ObjectClient + Clone + Send + Sync + 'static,
197{
198 pub fn default_builder(client: Client) -> PrefetcherBuilder<Client> {
200 PrefetcherBuilder::default_builder(client)
201 }
202
203 pub fn caching_builder<Cache>(cache: Cache, client: Client) -> PrefetcherBuilder<Client>
205 where
206 Cache: DataCache + Send + Sync + 'static,
207 {
208 PrefetcherBuilder::caching_builder(cache, client)
209 }
210
211 pub fn new(part_stream: PartStream<Client>, config: PrefetcherConfig, mem_limiter: Arc<MemoryLimiter>) -> Self {
213 Self {
214 part_stream,
215 config,
216 mem_limiter,
217 }
218 }
219
220 pub fn prefetch(&self, bucket: String, object_id: ObjectId, size: u64) -> PrefetchGetObject<Client>
222 where
223 Client: ObjectClient + Clone + Send + Sync + 'static,
224 {
225 PrefetchGetObject::new(
226 self.part_stream.clone(),
227 self.config,
228 bucket,
229 object_id,
230 size,
231 self.mem_limiter.clone(),
232 )
233 }
234}
235
/// State for a single in-progress prefetching read of one object.
#[derive(Debug)]
pub struct PrefetchGetObject<Client>
where
    Client: ObjectClient + Clone + Send + Sync + 'static,
{
    part_stream: PartStream<Client>,
    config: PrefetcherConfig,
    // In-flight backpressure-controlled request task, if one is running.
    backpressure_task: Option<RequestTask<Client>>,
    // Recently-read parts retained so small backward seeks can be served
    // without issuing a new request.
    backward_seek_window: SeekWindow,
    bucket: String,
    object_id: ObjectId,
    // Preferred size of parts returned by reads; adapts to observed read sizes.
    preferred_part_size: usize,
    // Offset where the current contiguous run of sequential reads began
    // (used for the contiguous-read-length metric).
    sequential_read_start_offset: u64,
    // Offset the next sequential read is expected to start at.
    next_sequential_read_offset: u64,
    next_request_offset: u64,
    // Total size of the object in bytes.
    size: u64,
    mem_limiter: Arc<MemoryLimiter>,
}
259
260impl<Client> PrefetchGetObject<Client>
261where
262 Client: ObjectClient + Clone + Send + Sync + 'static,
263{
264 fn new(
266 part_stream: PartStream<Client>,
267 config: PrefetcherConfig,
268 bucket: String,
269 object_id: ObjectId,
270 size: u64,
271 mem_limiter: Arc<MemoryLimiter>,
272 ) -> Self {
273 let max_backward_seek_distance = config.max_backward_seek_distance as usize;
274 let seek_window_reservation =
277 Self::seek_window_reservation(part_stream.client().read_part_size(), max_backward_seek_distance);
278 mem_limiter.reserve(BufferArea::Prefetch, seek_window_reservation);
279 PrefetchGetObject {
280 part_stream,
281 config,
282 backpressure_task: None,
283 backward_seek_window: SeekWindow::new(max_backward_seek_distance),
284 preferred_part_size: 128 * 1024,
285 sequential_read_start_offset: 0,
286 next_sequential_read_offset: 0,
287 next_request_offset: 0,
288 bucket,
289 object_id,
290 size,
291 mem_limiter,
292 }
293 }
294
295 pub async fn read(
299 &mut self,
300 offset: u64,
301 length: usize,
302 ) -> Result<ChecksummedBytes, PrefetchReadError<Client::ClientError>> {
303 trace!(
304 offset,
305 length,
306 next_seq_offset = self.next_sequential_read_offset,
307 "read"
308 );
309
310 match self.try_read(offset, length).await {
311 Ok((data, cache_hit)) => {
312 if !data.is_empty() && cache_hit {
319 metrics::counter!(FUSE_CACHE_HIT).increment(1);
323 }
324 Ok(data)
325 }
326 Err(err) => {
327 self.reset_prefetch_to_offset(offset);
328 Err(err)
329 }
330 }
331 }
332
333 async fn try_read(
334 &mut self,
335 offset: u64,
336 length: usize,
337 ) -> Result<(ChecksummedBytes, bool), PrefetchReadError<Client::ClientError>> {
338 let max_preferred_part_size = 1024 * 1024;
346 self.preferred_part_size = self.preferred_part_size.max(length).min(max_preferred_part_size);
347
348 let remaining = self.size.saturating_sub(offset);
349 if remaining == 0 {
350 return Ok((ChecksummedBytes::default(), false));
351 }
352 let mut to_read = (length as u64).min(remaining);
353
354 if self.next_sequential_read_offset != offset {
357 if self.try_seek(offset).await? {
358 trace!("seek succeeded");
359 } else {
360 trace!(
361 expected = self.next_sequential_read_offset,
362 actual = offset,
363 "out-of-order read, resetting prefetch"
364 );
365 counter!(PREFETCH_RESET_STATE).increment(1);
366
367 self.record_contiguous_read_metric();
369
370 self.reset_prefetch_to_offset(offset);
371 }
372 }
373 assert_eq!(self.next_sequential_read_offset, offset);
374
375 if self.backpressure_task.is_none() {
376 self.backpressure_task = Some(self.spawn_read_backpressure_request()?);
377 }
378
379 let mut all_parts_from_cache = true;
380 let mut response = ChecksummedBytes::default();
381 while to_read > 0 {
382 let Some(current_task) = self.backpressure_task.as_mut() else {
383 trace!(offset, length, "read beyond object size");
384 break;
385 };
386 debug_assert!(current_task.remaining() > 0);
387
388 let part = current_task.read(to_read as usize).await?;
389 all_parts_from_cache &= part.is_from_cache();
390 self.backward_seek_window.push(part.clone());
391 let part_bytes = part.into_bytes(&self.object_id, self.next_sequential_read_offset)?;
392
393 self.next_sequential_read_offset += part_bytes.len() as u64;
394 if response.is_empty() && part_bytes.len() == to_read as usize {
398 return Ok((part_bytes, all_parts_from_cache));
399 }
400
401 let part_len = part_bytes.len() as u64;
402 response.extend(part_bytes)?;
403 to_read -= part_len;
404 }
405
406 Ok((response, all_parts_from_cache))
407 }
408
409 fn spawn_read_backpressure_request(
412 &mut self,
413 ) -> Result<RequestTask<Client>, PrefetchReadError<Client::ClientError>> {
414 let start = self.next_sequential_read_offset;
415 let object_size = self.size as usize;
416 let read_part_size = self.part_stream.client().read_part_size();
417 let range = RequestRange::new(object_size, start, object_size);
418
419 match self.part_stream.client().initial_read_window_size() {
421 Some(value) => {
422 if value == 0 {
424 return Err(PrefetchReadError::BackpressurePreconditionFailed);
425 }
426 }
427 None => return Err(PrefetchReadError::BackpressurePreconditionFailed),
428 };
429
430 let config = RequestTaskConfig {
431 bucket: self.bucket.clone(),
432 object_id: self.object_id.clone(),
433 range,
434 read_part_size,
435 preferred_part_size: self.preferred_part_size,
436 initial_request_size: self.config.initial_request_size,
437 max_read_window_size: self.config.max_read_window_size,
438 read_window_size_multiplier: self.config.sequential_prefetch_multiplier,
439 };
440 Ok(self.part_stream.spawn_get_object_request(config))
441 }
442
443 fn reset_prefetch_to_offset(&mut self, offset: u64) {
445 self.backpressure_task = None;
446 self.backward_seek_window.clear();
447 self.sequential_read_start_offset = offset;
448 self.next_sequential_read_offset = offset;
449 self.next_request_offset = offset;
450 }
451
452 async fn try_seek(&mut self, offset: u64) -> Result<bool, PrefetchReadError<Client::ClientError>> {
456 assert_ne!(offset, self.next_sequential_read_offset);
457 trace!(from = self.next_sequential_read_offset, to = offset, "trying to seek");
458 if offset > self.next_sequential_read_offset {
459 self.try_seek_forward(offset).await
460 } else {
461 self.try_seek_backward(offset).await
462 }
463 }
464
465 async fn try_seek_forward(&mut self, offset: u64) -> Result<bool, PrefetchReadError<Client::ClientError>> {
466 assert!(offset > self.next_sequential_read_offset);
467 let total_seek_distance = offset - self.next_sequential_read_offset;
468 histogram!("prefetch.seek_distance", "dir" => "forward").record(total_seek_distance as f64);
469
470 let Some(task) = self.backpressure_task.as_mut() else {
471 return Ok(false);
473 };
474
475 if offset >= task.read_window_end_offset() {
477 return Ok(false);
478 }
479
480 let available_offset = task.available_offset();
484 let available_soon_offset = available_offset.saturating_add(self.config.max_forward_seek_wait_distance);
485 if offset >= available_soon_offset {
486 trace!(
487 requested_offset = offset,
488 available_offset = available_offset,
489 "seek failed: not enough data available"
490 );
491 return Ok(false);
492 }
493 let mut seek_distance = offset - self.next_sequential_read_offset;
494 while seek_distance > 0 {
495 let part = task.read(seek_distance as usize).await?;
496 seek_distance -= part.len() as u64;
497 self.next_sequential_read_offset += part.len() as u64;
498 self.backward_seek_window.push(part);
499 }
500 Ok(true)
501 }
502
503 async fn try_seek_backward(&mut self, offset: u64) -> Result<bool, PrefetchReadError<Client::ClientError>> {
504 assert!(offset < self.next_sequential_read_offset);
505
506 let Some(task) = self.backpressure_task.as_mut() else {
509 return Ok(false);
510 };
511 let backwards_length_needed = self.next_sequential_read_offset - offset;
512 histogram!("prefetch.seek_distance", "dir" => "backward").record(backwards_length_needed as f64);
513
514 let Some(parts) = self.backward_seek_window.read_back(backwards_length_needed as usize) else {
515 trace!("seek failed: not enough data in backwards seek window");
516 return Ok(false);
517 };
518 task.push_front(parts).await?;
523 self.next_sequential_read_offset = offset;
524 Ok(true)
525 }
526
527 fn record_contiguous_read_metric(&self) {
531 histogram!("prefetch.contiguous_read_len")
532 .record((self.next_sequential_read_offset - self.sequential_read_start_offset) as f64);
533 }
534
535 fn seek_window_reservation(part_size: usize, seek_window_size: usize) -> u64 {
539 (seek_window_size.div_ceil(part_size) * part_size) as u64
540 }
541}
542
543impl<Client> Drop for PrefetchGetObject<Client>
544where
545 Client: ObjectClient + Clone + Send + Sync + 'static,
546{
547 fn drop(&mut self) {
548 let seek_window_reservation = Self::seek_window_reservation(
549 self.part_stream.client().read_part_size(),
550 self.backward_seek_window.max_size(),
551 );
552 self.mem_limiter.release(BufferArea::Prefetch, seek_window_reservation);
553 self.record_contiguous_read_metric();
554 }
555}
556
557#[cfg(test)]
558mod tests {
559 #![allow(clippy::identity_op)]
561
562 use crate::Runtime;
563 use crate::data_cache::InMemoryDataCache;
564 use crate::mem_limiter::{MINIMUM_MEM_LIMIT, MemoryLimiter};
565 use crate::memory::PagedPool;
566 use crate::sync::Arc;
567
568 use super::*;
569 use futures::executor::{ThreadPool, block_on};
570 use mountpoint_s3_client::failure_client::{
571 CountdownFailureConfig, GetObjectFailureMode, countdown_failure_client,
572 };
573 use mountpoint_s3_client::mock_client::{MockClient, MockClientConfig, MockClientError, MockObject, ramp_bytes};
574 use mountpoint_s3_client::types::ETag;
575 use proptest::proptest;
576 use proptest::strategy::{Just, Strategy};
577 use proptest_derive::Arbitrary;
578 use std::collections::HashMap;
579 use test_case::test_case;
580
    // Byte-size shorthands used throughout the tests.
    const KB: usize = 1024;
    const MB: usize = 1024 * 1024;
583
    /// Randomizable prefetcher/client configuration shared by hand-written and
    /// proptest cases.
    #[derive(Debug, Arbitrary)]
    struct TestConfig {
        // Size of the first GetObject request.
        #[proptest(strategy = "16usize..1*1024*1024")]
        initial_request_size: usize,
        // Cap on the backpressure read window.
        #[proptest(strategy = "16usize..1*1024*1024")]
        max_read_window_size: usize,
        // Growth factor for the read window on sequential reads.
        #[proptest(strategy = "1usize..8usize")]
        sequential_prefetch_multiplier: usize,
        // Part size configured on the mock client.
        #[proptest(strategy = "16usize..2*1024*1024")]
        client_part_size: usize,
        // Forward-seek wait distance passed to the prefetcher config.
        #[proptest(strategy = "1u64..4*1024*1024")]
        max_forward_seek_wait_distance: u64,
        // Backward-seek window size passed to the prefetcher config.
        #[proptest(strategy = "1u64..4*1024*1024")]
        max_backward_seek_distance: u64,
        // Block size for the in-memory cache (when a caching prefetcher is used).
        #[proptest(strategy = "16usize..1*1024*1024")]
        cache_block_size: usize,
    }
601
    /// Which prefetcher flavor a test should exercise.
    enum PrefetcherType {
        // Plain prefetcher with no data cache.
        Default,
        // Caching prefetcher backed by an in-memory cache with the given block size.
        InMemoryCache(usize),
    }
606
607 fn build_prefetcher<Client>(
608 client: Client,
609 prefetcher_type: PrefetcherType,
610 prefetcher_config: PrefetcherConfig,
611 ) -> Prefetcher<Client>
612 where
613 Client: ObjectClient + Clone + Send + Sync + 'static,
614 {
615 let pool = PagedPool::new_with_candidate_sizes([client.read_part_size(), client.write_part_size()]);
616 let mem_limiter = Arc::new(MemoryLimiter::new(pool, MINIMUM_MEM_LIMIT));
617 let runtime = Runtime::new(ThreadPool::builder().pool_size(1).create().unwrap());
618 let builder = match prefetcher_type {
619 PrefetcherType::Default => Prefetcher::default_builder(client),
620 PrefetcherType::InMemoryCache(block_size) => {
621 let cache = InMemoryDataCache::new(block_size as u64);
622 Prefetcher::caching_builder(cache, client)
623 }
624 };
625 builder.build(runtime, mem_limiter, prefetcher_config)
626 }
627
628 fn run_sequential_read_test(prefetcher_type: PrefetcherType, size: u64, read_size: usize, test_config: TestConfig) {
629 let client = Arc::new(
630 MockClient::config()
631 .bucket("test-bucket")
632 .part_size(test_config.client_part_size)
633 .enable_backpressure(true)
634 .initial_read_window_size(test_config.client_part_size)
635 .build(),
636 );
637 let object = MockObject::ramp(0xaa, size as usize, ETag::for_tests());
638 let etag = object.etag();
639
640 client.add_object("hello", object);
641
642 let prefetcher_config = PrefetcherConfig {
643 max_read_window_size: test_config.max_read_window_size,
644 sequential_prefetch_multiplier: test_config.sequential_prefetch_multiplier,
645 max_forward_seek_wait_distance: test_config.max_forward_seek_wait_distance,
646 max_backward_seek_distance: test_config.max_backward_seek_distance,
647 initial_request_size: test_config.initial_request_size,
648 };
649
650 let prefetcher = build_prefetcher(client.clone(), prefetcher_type, prefetcher_config);
651 let object_id = ObjectId::new("hello".to_owned(), etag);
652 let mut request = prefetcher.prefetch("test-bucket".to_owned(), object_id, size);
653
654 let mut next_offset = 0;
655 loop {
656 let buf = block_on(request.read(next_offset, read_size)).unwrap();
657 if buf.is_empty() {
658 break;
659 }
660 let buf = buf.into_bytes().unwrap();
661 let expected = ramp_bytes((0xaa + next_offset) as usize, buf.len());
662 assert_eq!(&buf[..], &expected[..buf.len()]);
663 next_offset += buf.len() as u64;
664 }
665 assert_eq!(next_offset, size);
666 }
667
668 #[test_case(PrefetcherType::Default)]
669 #[test_case(PrefetcherType::InMemoryCache(1 * MB))]
670 fn sequential_read_small(prefetcher_type: PrefetcherType) {
671 let config = TestConfig {
672 initial_request_size: 256 * 1024,
673 max_read_window_size: 1024 * 1024 * 1024,
674 sequential_prefetch_multiplier: 8,
675 client_part_size: 8 * 1024 * 1024,
676 max_forward_seek_wait_distance: 16 * 1024 * 1024,
677 max_backward_seek_distance: 2 * 1024 * 1024,
678 cache_block_size: 1 * MB,
679 };
680 run_sequential_read_test(prefetcher_type, 1024 * 1024 + 111, 1024 * 1024, config);
681 }
682
683 #[test_case(PrefetcherType::Default)]
684 #[test_case(PrefetcherType::InMemoryCache(1 * MB))]
685 fn sequential_read_medium(prefetcher_type: PrefetcherType) {
686 let config = TestConfig {
687 initial_request_size: 256 * 1024,
688 max_read_window_size: 64 * 1024 * 1024,
689 sequential_prefetch_multiplier: 8,
690 client_part_size: 8 * 1024 * 1024,
691 max_forward_seek_wait_distance: 16 * 1024 * 1024,
692 max_backward_seek_distance: 2 * 1024 * 1024,
693 cache_block_size: 1 * MB,
694 };
695 run_sequential_read_test(prefetcher_type, 16 * 1024 * 1024 + 111, 1024 * 1024, config);
696 }
697
698 #[test_case(PrefetcherType::Default)]
699 #[test_case(PrefetcherType::InMemoryCache(1 * MB))]
700 fn sequential_read_large(prefetcher_type: PrefetcherType) {
701 let config = TestConfig {
702 initial_request_size: 256 * 1024,
703 max_read_window_size: 64 * 1024 * 1024,
704 sequential_prefetch_multiplier: 8,
705 client_part_size: 8 * 1024 * 1024,
706 max_forward_seek_wait_distance: 16 * 1024 * 1024,
707 max_backward_seek_distance: 2 * 1024 * 1024,
708 cache_block_size: 1 * MB,
709 };
710
711 run_sequential_read_test(prefetcher_type, 256 * 1024 * 1024 + 111, 1024 * 1024, config);
712 }
713
714 fn fail_with_backpressure_precondition_test(
715 prefetcher_type: PrefetcherType,
716 test_config: TestConfig,
717 client_config: MockClientConfig,
718 ) {
719 let client = Arc::new(MockClient::new(client_config));
720 let read_size = 1 * MB;
721 let object_size = 8 * MB;
722 let object = MockObject::ramp(0xaa, object_size, ETag::for_tests());
723 let etag = object.etag();
724
725 let prefetcher_config = PrefetcherConfig {
726 max_read_window_size: test_config.max_read_window_size,
727 sequential_prefetch_multiplier: test_config.sequential_prefetch_multiplier,
728 ..Default::default()
729 };
730
731 let prefetcher = build_prefetcher(client, prefetcher_type, prefetcher_config);
732 let object_id = ObjectId::new("hello".to_owned(), etag);
733 let mut request = prefetcher.prefetch("test-bucket".to_owned(), object_id, object_size as u64);
734 let result = block_on(request.read(0, read_size));
735 assert!(matches!(result, Err(PrefetchReadError::BackpressurePreconditionFailed)));
736 }
737
738 #[test_case(PrefetcherType::Default)]
739 #[test_case(PrefetcherType::InMemoryCache(1 * MB))]
740 fn fail_with_backpressure_not_enabled(prefetcher_type: PrefetcherType) {
741 let test_config = TestConfig {
742 initial_request_size: 256 * 1024,
743 max_read_window_size: 1024 * 1024 * 1024,
744 sequential_prefetch_multiplier: 8,
745 client_part_size: 8 * 1024 * 1024,
746 max_forward_seek_wait_distance: 16 * 1024 * 1024,
747 max_backward_seek_distance: 2 * 1024 * 1024,
748 cache_block_size: 1 * MB,
749 };
750
751 let config = MockClient::config()
753 .bucket("test-bucket")
754 .part_size(test_config.client_part_size)
755 .enable_backpressure(false);
756
757 fail_with_backpressure_precondition_test(prefetcher_type, test_config, config);
758 }
759
760 #[test_case(PrefetcherType::Default)]
761 #[test_case(PrefetcherType::InMemoryCache(1 * MB))]
762 fn fail_with_backpressure_zero_read_window(prefetcher_type: PrefetcherType) {
763 let test_config = TestConfig {
764 initial_request_size: 256 * 1024,
765 max_read_window_size: 1024 * 1024 * 1024,
766 sequential_prefetch_multiplier: 8,
767 client_part_size: 8 * 1024 * 1024,
768 max_forward_seek_wait_distance: 16 * 1024 * 1024,
769 max_backward_seek_distance: 2 * 1024 * 1024,
770 cache_block_size: 1 * MB,
771 };
772
773 let config = MockClient::config()
775 .bucket("test-bucket")
776 .part_size(test_config.client_part_size)
777 .enable_backpressure(true)
778 .initial_read_window_size(0);
779
780 fail_with_backpressure_precondition_test(prefetcher_type, test_config, config);
781 }
782
783 fn fail_sequential_read_test(
784 prefetcher_type: PrefetcherType,
785 size: u64,
786 read_size: usize,
787 test_config: TestConfig,
788 get_failures: HashMap<usize, GetObjectFailureMode<MockClientError>>,
789 ) {
790 let client = MockClient::config()
791 .bucket("test-bucket")
792 .part_size(test_config.client_part_size)
793 .enable_backpressure(true)
794 .initial_read_window_size(test_config.client_part_size)
795 .build();
796 let object = MockObject::ramp(0xaa, size as usize, ETag::for_tests());
797 let etag = object.etag();
798
799 client.add_object("hello", object);
800
801 let client = Arc::new(countdown_failure_client(
802 client,
803 CountdownFailureConfig {
804 get_failures,
805 ..Default::default()
806 },
807 ));
808
809 let prefetcher_config = PrefetcherConfig {
810 max_read_window_size: test_config.max_read_window_size,
811 sequential_prefetch_multiplier: test_config.sequential_prefetch_multiplier,
812 initial_request_size: test_config.initial_request_size,
813 ..Default::default()
814 };
815
816 let prefetcher = build_prefetcher(client, prefetcher_type, prefetcher_config);
817 let object_id = ObjectId::new("hello".to_owned(), etag);
818 let mut request = prefetcher.prefetch("test-bucket".to_owned(), object_id, size);
819
820 let mut next_offset = 0;
821 loop {
822 let buf = match block_on(request.read(next_offset, read_size)) {
823 Ok(buf) => buf,
824 Err(_) => break,
825 };
826 let buf = buf.into_bytes().unwrap();
827
828 if buf.is_empty() {
829 break;
830 }
831 let expected = ramp_bytes((0xaa + next_offset) as usize, buf.len());
832 assert_eq!(&buf[..], &expected[..buf.len()]);
833 next_offset += buf.len() as u64;
834 }
835 assert!(next_offset < size); }
837
838 #[test_case("invalid range; length=42", PrefetcherType::Default)]
839 #[test_case("invalid range; length=42", PrefetcherType::InMemoryCache(1 * MB))]
840 #[test_case(
842 "At least one of the pre-conditions you specified did not hold",
843 PrefetcherType::Default
844 )]
845 #[test_case("At least one of the pre-conditions you specified did not hold", PrefetcherType::InMemoryCache(1 * MB))]
846 fn fail_request_sequential_small(err_value: &str, prefetcher_type: PrefetcherType) {
847 let config = TestConfig {
848 initial_request_size: 256 * 1024,
849 max_read_window_size: 1024 * 1024 * 1024,
850 sequential_prefetch_multiplier: 8,
851 client_part_size: 8 * 1024 * 1024,
852 max_forward_seek_wait_distance: 16 * 1024 * 1024,
853 max_backward_seek_distance: 2 * 1024 * 1024,
854 cache_block_size: 1 * MB,
855 };
856
857 let mut get_failures = HashMap::new();
858 get_failures.insert(
859 2,
860 GetObjectFailureMode::OperationError(ObjectClientError::ClientError(MockClientError(
861 err_value.to_owned().into(),
862 ))),
863 );
864
865 fail_sequential_read_test(prefetcher_type, 1024 * 1024 + 111, 1024 * 1024, config, get_failures);
866 }
867
    proptest! {
        // Sequential reads with random sizes/configs must return the full object.
        #[test]
        fn proptest_sequential_read(
            size in 1u64..1 * 1024 * 1024,
            read_size in 1usize..1 * 1024 * 1024,
            config: TestConfig,
        ) {
            run_sequential_read_test(PrefetcherType::Default, size, read_size, config);
        }

        // As above, with the read size derived as a fraction of the object size.
        #[test]
        fn proptest_sequential_read_small_read_size(size in 1u64..1 * 1024 * 1024, read_factor in 1usize..10, config: TestConfig) {
            let read_size = (size as usize / read_factor).max(1);
            run_sequential_read_test(PrefetcherType::Default, size, read_size, config);
        }

        // Same cases again, exercised through the caching prefetcher.
        #[test]
        fn proptest_sequential_read_with_cache(
            size in 1u64..1 * 1024 * 1024,
            read_size in 1usize..1 * 1024 * 1024,
            config: TestConfig,
        ) {
            run_sequential_read_test(PrefetcherType::InMemoryCache(config.cache_block_size), size, read_size, config);
        }

        #[test]
        fn proptest_sequential_read_small_read_size_with_cache(size in 1u64..1 * 1024 * 1024, read_factor in 1usize..10,
            config: TestConfig) {
            let read_size = (size as usize / read_factor).max(1);
            run_sequential_read_test(PrefetcherType::InMemoryCache(config.cache_block_size), size, read_size, config);
        }
    }
902
903 #[test]
904 fn test_sequential_read_regression() {
905 let object_size = 854966;
906 let read_size = 161647;
907 let config = TestConfig {
908 initial_request_size: 484941,
909 max_read_window_size: 81509,
910 sequential_prefetch_multiplier: 1,
911 client_part_size: 181682,
912 max_forward_seek_wait_distance: 1,
913 max_backward_seek_distance: 18668,
914 cache_block_size: 1 * MB,
915 };
916 run_sequential_read_test(PrefetcherType::Default, object_size, read_size, config);
917 }
918
919 fn run_random_read_test(
920 prefetcher_type: PrefetcherType,
921 object_size: u64,
922 reads: Vec<(u64, usize)>,
923 test_config: TestConfig,
924 ) {
925 let client = Arc::new(
926 MockClient::config()
927 .bucket("test-bucket")
928 .part_size(test_config.client_part_size)
929 .enable_backpressure(true)
930 .initial_read_window_size(test_config.client_part_size)
931 .build(),
932 );
933 let object = MockObject::ramp(0xaa, object_size as usize, ETag::for_tests());
934 let etag = object.etag();
935
936 client.add_object("hello", object);
937
938 let prefetcher_config = PrefetcherConfig {
939 max_read_window_size: test_config.max_read_window_size,
940 sequential_prefetch_multiplier: test_config.sequential_prefetch_multiplier,
941 max_forward_seek_wait_distance: test_config.max_forward_seek_wait_distance,
942 max_backward_seek_distance: test_config.max_backward_seek_distance,
943 initial_request_size: test_config.initial_request_size,
944 };
945
946 let prefetcher = build_prefetcher(client, prefetcher_type, prefetcher_config);
947 let object_id = ObjectId::new("hello".to_owned(), etag);
948 let mut request = prefetcher.prefetch("test-bucket".to_owned(), object_id, object_size);
949
950 for (offset, length) in reads {
951 assert!(offset < object_size);
952 assert!(offset + length as u64 <= object_size);
953 let expected = ramp_bytes((0xaa + offset) as usize, length);
954 let buf = block_on(request.read(offset, length)).unwrap();
955 let buf = buf.into_bytes().unwrap();
956 assert_eq!(buf.len(), expected.len());
957 if buf[..] != expected[..] {
959 for i in 0..buf.len() {
960 if buf[i] != expected[i] {
961 panic!(
962 "buffer mismatch at offset {}, saw {} expected {}",
963 i, buf[i], expected[i]
964 );
965 }
966 }
967 }
968 }
969 }
970
971 fn random_read_strategy(max_object_size: u64) -> impl Strategy<Value = (u64, Vec<(u64, usize)>)> {
972 (1..=max_object_size).prop_flat_map(|object_size| {
973 (
974 Just(object_size),
975 proptest::collection::vec(
976 (0..object_size).prop_flat_map(move |offset| {
977 (1..=object_size - offset).prop_map(move |length| (offset, length as usize))
978 }),
979 0..10,
980 ),
981 )
982 })
983 }
984
    proptest! {
        // Arbitrary in-bounds reads (including seeks) must return correct data.
        #[test]
        fn proptest_random_read(
            reads in random_read_strategy(1 * 1024 * 1024),
            config: TestConfig,
        ) {
            let (object_size, reads) = reads;
            run_random_read_test(PrefetcherType::Default, object_size, reads, config);
        }

        // Same as above, through the caching prefetcher.
        #[test]
        fn proptest_random_read_with_cache(
            reads in random_read_strategy(1 * 1024 * 1024),
            config: TestConfig,
        ) {
            let (object_size, reads) = reads;
            run_random_read_test(PrefetcherType::InMemoryCache(config.cache_block_size), object_size, reads, config);
        }
    }
1004
1005 #[test]
1006 fn test_random_read_regression() {
1007 let object_size = 724314;
1008 let reads = vec![(0, 516883)];
1009 let config = TestConfig {
1010 initial_request_size: 3684779,
1011 max_read_window_size: 2147621,
1012 sequential_prefetch_multiplier: 4,
1013 client_part_size: 516882,
1014 max_forward_seek_wait_distance: 16 * 1024 * 1024,
1015 max_backward_seek_distance: 2 * 1024 * 1024,
1016 cache_block_size: 1 * MB,
1017 };
1018 run_random_read_test(PrefetcherType::Default, object_size, reads, config);
1019 }
1020
1021 #[test]
1022 fn test_random_read_regression2() {
1023 let object_size = 755678;
1024 let reads = vec![(0, 278499), (311250, 1)];
1025 let config = TestConfig {
1026 initial_request_size: 556997,
1027 max_read_window_size: 105938,
1028 sequential_prefetch_multiplier: 7,
1029 client_part_size: 1219731,
1030 max_forward_seek_wait_distance: 16 * 1024 * 1024,
1031 max_backward_seek_distance: 2 * 1024 * 1024,
1032 cache_block_size: 1 * MB,
1033 };
1034 run_random_read_test(PrefetcherType::Default, object_size, reads, config);
1035 }
1036
1037 #[test]
1038 fn test_random_read_regression3() {
1039 let object_size = 755678;
1040 let reads = vec![(0, 236766), (291204, 1), (280930, 36002)];
1041 let config = TestConfig {
1042 initial_request_size: 556997,
1043 max_read_window_size: 105938,
1044 sequential_prefetch_multiplier: 7,
1045 client_part_size: 1219731,
1046 max_forward_seek_wait_distance: 2260662,
1047 max_backward_seek_distance: 2369799,
1048 cache_block_size: 1 * MB,
1049 };
1050 run_random_read_test(PrefetcherType::Default, object_size, reads, config);
1051 }
1052
1053 #[test]
1054 fn test_random_read_regression4() {
1055 let object_size = 14201;
1056 let reads = vec![(3584, 1), (9424, 1460), (3582, 3340), (248, 9218)];
1057 let config = TestConfig {
1058 initial_request_size: 457999,
1059 max_read_window_size: 863511,
1060 sequential_prefetch_multiplier: 5,
1061 client_part_size: 1972409,
1062 max_forward_seek_wait_distance: 2810651,
1063 max_backward_seek_distance: 3531090,
1064 cache_block_size: 1 * MB,
1065 };
1066 run_random_read_test(PrefetcherType::Default, object_size, reads, config);
1067 }
1068
    /// Exercise error handling around a failed forward seek: the first request
    /// breaks mid-stream, the retried request fails outright, and a further
    /// retry succeeds. The countdown failures make the exact sequence of
    /// requests significant, so the order of reads below must not change.
    #[test]
    fn test_forward_seek_failure() {
        const PART_SIZE: usize = 8192;
        const OBJECT_SIZE: usize = 2 * PART_SIZE;

        let client = MockClient::config()
            .bucket("test-bucket")
            .part_size(PART_SIZE)
            .enable_backpressure(true)
            .initial_read_window_size(OBJECT_SIZE)
            .build();
        let object = MockObject::ramp(0xaa, OBJECT_SIZE, ETag::for_tests());
        let etag = object.etag();
        client.add_object("hello", object);

        // First GetObject: error once the stream reaches its second chunk.
        let mut get_failures = HashMap::new();
        get_failures.insert(
            1,
            GetObjectFailureMode::StreamPositionError(
                2,
                ObjectClientError::ClientError(MockClientError(
                    "error in the second chunk of the first request".into(),
                )),
            ),
        );
        // Second GetObject (the retry): fail the whole operation.
        get_failures.insert(
            2,
            GetObjectFailureMode::OperationError(ObjectClientError::ClientError(MockClientError(
                "error in second request".into(),
            ))),
        );

        let client = Arc::new(countdown_failure_client(
            client,
            CountdownFailureConfig {
                get_failures,
                ..Default::default()
            },
        ));
        let prefetcher = build_prefetcher(client, PrefetcherType::Default, Default::default());
        block_on(async {
            let object_id = ObjectId::new("hello".to_owned(), etag.clone());
            let mut request = prefetcher.prefetch("test-bucket".to_owned(), object_id, OBJECT_SIZE as u64);

            // Read one byte so a request is in flight before seeking.
            _ = request.read(0, 1).await.expect("first read should succeed");

            // Forward seek into the second part hits the mid-stream error.
            let offset = PART_SIZE + 1;
            _ = request.read(offset as u64, 1).await.expect_err("seek should fail");

            // The retry issues request #2, which fails as a whole.
            _ = request
                .read(offset as u64, 1)
                .await
                .expect_err("first retry after failure should fail");

            // Request #3 has no injected failure and must return the byte.
            let byte = request
                .read(offset as u64, 1)
                .await
                .expect("second retry should succeed");
            let expected = ramp_bytes(0xaa + offset, 1);
            assert_eq!(byte.into_bytes().unwrap()[..], expected[..]);
        });
    }
1135
    #[test_case(PrefetcherType::Default)]
    #[test_case(PrefetcherType::InMemoryCache(8192))]
    fn test_short_read_failure(prefetcher_type: PrefetcherType) {
        // Verify recovery when a GET stream terminates early (short read): the
        // affected read fails with `GetRequestTerminatedUnexpectedly`, and
        // later reads succeed once the prefetcher issues fresh requests.
        const PART_SIZE: usize = 8192;
        const OBJECT_SIZE: usize = 2 * PART_SIZE;

        let client = MockClient::config()
            .bucket("test-bucket")
            .part_size(PART_SIZE)
            .enable_backpressure(true)
            .initial_read_window_size(PART_SIZE)
            .build();
        let object = MockObject::ramp(0xaa, OBJECT_SIZE, ETag::for_tests());
        let etag = object.etag();
        client.add_object("hello", object);

        // GETs #1 and #3 short-circuit their stream early (after 1 unit);
        // every other GET streams the full requested range.
        let mut get_failures = HashMap::new();
        get_failures.insert(1, GetObjectFailureMode::StreamShortCircuit(1));
        get_failures.insert(3, GetObjectFailureMode::StreamShortCircuit(1));

        let client = Arc::new(countdown_failure_client(
            client,
            CountdownFailureConfig {
                get_failures,
                ..Default::default()
            },
        ));
        let prefetcher_config = PrefetcherConfig {
            initial_request_size: PART_SIZE,
            ..Default::default()
        };
        let prefetcher = build_prefetcher(client, prefetcher_type, prefetcher_config);

        block_on(async {
            let object_id = ObjectId::new("hello".to_owned(), etag.clone());
            let mut request = prefetcher.prefetch("test-bucket".to_owned(), object_id, OBJECT_SIZE as u64);

            // GET #1 ends short, so the very first read fails with the
            // "terminated unexpectedly" error.
            assert!(matches!(
                request.read(0, 10).await.expect_err("read should fail"),
                PrefetchReadError::GetRequestTerminatedUnexpectedly,
            ));

            // Retrying from the start succeeds via a fresh, unaffected GET.
            let bytes = request.read(0, PART_SIZE).await.unwrap();
            let expected = ramp_bytes(0xaa, PART_SIZE);
            assert_eq!(bytes.into_bytes().unwrap()[..], expected[..]);
            // GET #3 short-circuits, so reading the second part fails again.
            _ = request
                .read(PART_SIZE as u64, PART_SIZE)
                .await
                .expect_err("read should fail");

            // Backward seek to 0 and read the whole object; this uses an
            // unaffected GET and succeeds.
            let bytes = request.read(0, OBJECT_SIZE).await.unwrap();
            let expected = ramp_bytes(0xaa, OBJECT_SIZE);
            assert_eq!(bytes.into_bytes().unwrap()[..], expected[..]);

            // Over-long read starting at the second part is clamped to the
            // object end: only PART_SIZE bytes come back.
            let bytes = request.read(PART_SIZE as u64, OBJECT_SIZE).await.unwrap();
            let expected = ramp_bytes(0xaa + PART_SIZE, PART_SIZE);
            assert_eq!(bytes.into_bytes().unwrap()[..], expected[..]);
        });
    }
1202
1203 #[test_case(0, 25; "no first read")]
1204 #[test_case(60, 25; "read beyond first part")]
1205 #[test_case(20, 25; "read in first part")]
1206 #[test_case(125, 110; "read in second request")]
1207 fn test_forward_seek(first_read_size: usize, part_size: usize) {
1208 const OBJECT_SIZE: usize = 200;
1209 const FIRST_REQUEST_SIZE: usize = 100;
1210
1211 let client = Arc::new(
1212 MockClient::config()
1213 .bucket("test-bucket")
1214 .part_size(part_size)
1215 .enable_backpressure(true)
1216 .initial_read_window_size(FIRST_REQUEST_SIZE)
1217 .build(),
1218 );
1219 let object = MockObject::ramp(0xaa, OBJECT_SIZE, ETag::for_tests());
1220 let etag = object.etag();
1221
1222 client.add_object("hello", object);
1223
1224 let prefetcher = build_prefetcher(client, PrefetcherType::Default, Default::default());
1225
1226 for offset in first_read_size + 1..OBJECT_SIZE {
1228 let object_id = ObjectId::new("hello".to_owned(), etag.clone());
1229 let mut request = prefetcher.prefetch("test-bucket".to_owned(), object_id, OBJECT_SIZE as u64);
1230 if first_read_size > 0 {
1231 let _first_read = block_on(request.read(0, first_read_size)).unwrap();
1232 }
1233
1234 let byte = block_on(request.read(offset as u64, 1)).unwrap();
1235 let expected = ramp_bytes(0xaa + offset, 1);
1236 assert_eq!(byte.into_bytes().unwrap()[..], expected[..]);
1237 }
1238 }
1239
1240 #[test_case(60, 25; "read beyond first part")]
1241 #[test_case(20, 25; "read in first part")]
1242 #[test_case(125, 110; "read in second request")]
1243 fn test_backward_seek(first_read_size: usize, part_size: usize) {
1244 const OBJECT_SIZE: usize = 200;
1245
1246 let client = Arc::new(
1247 MockClient::config()
1248 .bucket("test-bucket")
1249 .part_size(part_size)
1250 .enable_backpressure(true)
1251 .initial_read_window_size(part_size)
1252 .build(),
1253 );
1254 let object = MockObject::ramp(0xaa, OBJECT_SIZE, ETag::for_tests());
1255 let etag = object.etag();
1256
1257 client.add_object("hello", object);
1258
1259 let prefetcher = build_prefetcher(client, PrefetcherType::Default, Default::default());
1260
1261 for offset in 0..first_read_size {
1263 let object_id = ObjectId::new("hello".to_owned(), etag.clone());
1264 let mut request = prefetcher.prefetch("test-bucket".to_owned(), object_id, OBJECT_SIZE as u64);
1265 if first_read_size > 0 {
1266 let _first_read = block_on(request.read(0, first_read_size)).unwrap();
1267 }
1268
1269 let byte = block_on(request.read(offset as u64, 1)).unwrap();
1270 let expected = ramp_bytes(0xaa + offset, 1);
1271 assert_eq!(byte.into_bytes().unwrap()[..], expected[..]);
1272 }
1273 }
1274
    #[test_case(8 * 1024 * 1024, 1 * 1024 * 1024, 8 * 1024 * 1024; "8MiB part_size, 1MiB window")]
    #[test_case(1 * 1024 * 1024, 1 * 1024 * 1024, 1 * 1024 * 1024; "equal part_size and window")]
    #[test_case(250 * 1024, 1 * 1024 * 1024, 1250 * 1024; "window larger than part_size")]
    fn test_seek_window_reservation(part_size: usize, seek_window_size: usize, expected: u64) {
        // Pin the memory reserved for the seek window. The expected values
        // correspond to the window size rounded up to a whole number of parts
        // (e.g. 1 MiB window over 250 KiB parts -> 5 parts = 1250 KiB) —
        // assumption inferred from the cases above; confirm against
        // `PrefetchGetObject::seek_window_reservation` if it changes.
        let reservation = PrefetchGetObject::<MockClient>::seek_window_reservation(part_size, seek_window_size);
        assert_eq!(reservation, expected);
    }
1282
    #[test_case(8 * MB, 8 * MB, 1 * MB + 128 * KB; "default")]
    #[test_case(8 * MB, 8 * MB, 0; "no initial request")]
    #[test_case(1 * KB, 1 * MB, 10 * MB; "initial request larger than part size")]
    #[test_case(16 * MB, 8 * MB, 1 * MB + 128 * KB; "larger intial read window")]
    #[test_case(16 * MB, 8 * MB, 0; "larger intial read window w/o initial request")]
    #[test_case(1 * KB, 8 * MB, 1 * MB + 128 * KB; "smaller intial read window")]
    #[test_case(1 * KB, 8 * MB, 0; "smaller intial read window w/o initial request")]
    // NOTE(review): typos in the identifiers — "reqeust" should be "request"
    // and the case names spell "intial" for "initial". Renaming a #[test] fn
    // only affects test-name filters, so a follow-up rename is safe.
    fn test_initial_reqeust_size(initial_read_window_size: usize, part_size: usize, initial_request_size: usize) {
        // Sequentially read a 16 MiB object in 256 KiB chunks and verify every
        // byte, across combinations of client read window, part size, and the
        // prefetcher's configured initial request size (including 0).
        let object_size = (16 * MB) as u64;

        let client = Arc::new(
            MockClient::config()
                .bucket("test-bucket")
                .part_size(part_size)
                .enable_backpressure(true)
                .initial_read_window_size(initial_read_window_size)
                .build(),
        );

        let object = MockObject::ramp(0xaa, object_size as usize, ETag::for_tests());
        let etag = object.etag();
        client.add_object("test-object", object);

        let prefetcher_config = PrefetcherConfig {
            initial_request_size,
            ..Default::default()
        };

        let prefetcher = build_prefetcher(client.clone(), PrefetcherType::Default, prefetcher_config);
        let object_id = ObjectId::new("test-object".to_owned(), etag);
        let mut request = prefetcher.prefetch("test-bucket".to_owned(), object_id, object_size);

        // Read until EOF; an empty buffer signals end of object.
        let mut next_offset = 0;
        while next_offset < object_size {
            let buf = block_on(request.read(next_offset, 256 * KB)).unwrap();
            if buf.is_empty() {
                break;
            }
            let buf = buf.into_bytes().unwrap();
            let expected = ramp_bytes((0xaa + next_offset) as usize, buf.len());
            assert_eq!(&buf[..], &expected[..]);
            next_offset += buf.len() as u64;
        }
        // The whole object must have been delivered.
        assert_eq!(next_offset, object_size);
    }
1329
    #[cfg(feature = "shuttle")]
    mod shuttle_tests {
        // Concurrency stress tests run under the `shuttle` deterministic
        // scheduler, which explores many interleavings of the prefetcher's
        // spawned futures: `check_random` samples schedules randomly, and
        // `check_pct` applies probabilistic concurrency testing.
        use super::*;
        use futures::task::{FutureObj, Spawn, SpawnError};
        use shuttle::future::block_on;
        use shuttle::rand::Rng;
        use shuttle::{check_pct, check_random};

        // Spawner that routes prefetcher tasks onto shuttle's executor so the
        // shuttle scheduler controls their interleaving.
        struct ShuttleRuntime;
        impl Spawn for ShuttleRuntime {
            fn spawn_obj(&self, future: FutureObj<'static, ()>) -> Result<(), SpawnError> {
                shuttle::future::spawn(future);
                Ok(())
            }
        }

        // Read an object sequentially end-to-end with randomized object size,
        // part size, read window, and seek distances, verifying every byte of
        // the ramp pattern.
        fn sequential_read_stress_helper() {
            let mut rng = shuttle::rand::thread_rng();
            // Randomize the object and prefetcher parameters within bounds
            // small enough for shuttle to explore schedules quickly.
            let object_size = rng.gen_range(1u64..1 * 1024 * 1024);
            let max_read_window_size = rng.gen_range(16usize..1 * 1024 * 1024);
            let sequential_prefetch_multiplier = rng.gen_range(2usize..16);
            let part_size = rng.gen_range(16usize..INITIAL_REQUEST_SIZE);
            let initial_request_size = rng.gen_range(0..INITIAL_REQUEST_SIZE);
            let max_forward_seek_wait_distance = rng.gen_range(16u64..1 * 1024 * 1024 + 256 * 1024);
            let max_backward_seek_distance = rng.gen_range(16u64..1 * 1024 * 1024 + 256 * 1024);

            let client = Arc::new(
                MockClient::config()
                    .bucket("test-bucket")
                    .part_size(part_size)
                    .enable_backpressure(true)
                    .initial_read_window_size(part_size)
                    .build(),
            );
            let pool = PagedPool::new_with_candidate_sizes([part_size]);
            let mem_limiter = Arc::new(MemoryLimiter::new(pool, MINIMUM_MEM_LIMIT));
            let object = MockObject::ramp(0xaa, object_size as usize, ETag::for_tests());
            let file_etag = object.etag();

            client.add_object("hello", object);

            let prefetcher_config = PrefetcherConfig {
                max_read_window_size,
                sequential_prefetch_multiplier,
                max_forward_seek_wait_distance,
                max_backward_seek_distance,
                initial_request_size,
            };

            let prefetcher =
                Prefetcher::default_builder(client).build(Runtime::new(ShuttleRuntime), mem_limiter, prefetcher_config);
            let object_id = ObjectId::new("hello".to_owned(), file_etag);
            let mut request = prefetcher.prefetch("test-bucket".to_owned(), object_id, object_size);

            // Read sequentially in random-sized chunks until EOF (signalled by
            // an empty buffer), verifying the ramp content of each chunk.
            let mut next_offset = 0;
            loop {
                let read_size = rng.gen_range(1usize..1 * 1024 * 1024);
                let buf = block_on(request.read(next_offset, read_size)).unwrap();
                if buf.is_empty() {
                    break;
                }
                let buf = buf.into_bytes().unwrap();
                let expected = ramp_bytes((0xaa + next_offset) as usize, buf.len());
                assert_eq!(&buf[..], &expected[..buf.len()]);
                next_offset += buf.len() as u64;
            }
            // The whole object must have been read.
            assert_eq!(next_offset, object_size);
        }

        #[test]
        fn sequential_read_stress() {
            // Explore schedules both by random sampling and by PCT (depth 3).
            check_random(sequential_read_stress_helper, 1000);
            check_pct(sequential_read_stress_helper, 1000, 3);
        }

        // Issue random (offset, length) reads against a randomized
        // configuration and verify each returned buffer matches the ramp.
        fn random_read_stress_helper() {
            let mut rng = shuttle::rand::thread_rng();
            let max_read_window_size = rng.gen_range(16usize..32 * 1024);
            let sequential_prefetch_multiplier = rng.gen_range(2usize..16);
            let part_size = rng.gen_range(16usize..128 * 1024);
            let initial_request_size = rng.gen_range(16usize..128 * 1024);
            let max_forward_seek_wait_distance = rng.gen_range(16u64..192 * 1024);
            let max_backward_seek_distance = rng.gen_range(16u64..192 * 1024);
            // Cap the object size relative to the smaller of the initial
            // request and read window so reads span multiple requests.
            let max_object_size = initial_request_size.min(max_read_window_size) * 20;
            let object_size = rng.gen_range(1u64..(64 * 1024).min(max_object_size) as u64);

            let client = Arc::new(
                MockClient::config()
                    .bucket("test-bucket")
                    .part_size(part_size)
                    .enable_backpressure(true)
                    .initial_read_window_size(part_size)
                    .build(),
            );
            let pool = PagedPool::new_with_candidate_sizes([part_size]);
            let mem_limiter = Arc::new(MemoryLimiter::new(pool, MINIMUM_MEM_LIMIT));
            let object = MockObject::ramp(0xaa, object_size as usize, ETag::for_tests());
            let file_etag = object.etag();

            client.add_object("hello", object);

            let prefetcher_config = PrefetcherConfig {
                max_read_window_size,
                sequential_prefetch_multiplier,
                max_forward_seek_wait_distance,
                max_backward_seek_distance,
                initial_request_size,
            };

            let prefetcher =
                Prefetcher::default_builder(client).build(Runtime::new(ShuttleRuntime), mem_limiter, prefetcher_config);
            let object_id = ObjectId::new("hello".to_owned(), file_etag);
            let mut request = prefetcher.prefetch("test-bucket".to_owned(), object_id, object_size);

            let num_reads = rng.gen_range(10usize..50);
            for _ in 0..num_reads {
                let offset = rng.gen_range(0u64..object_size);
                // Length is bounded so the read never runs past the object end.
                let length = rng.gen_range(1usize..(object_size - offset + 1) as usize);
                let expected = ramp_bytes((0xaa + offset) as usize, length);
                let buf = block_on(request.read(offset, length)).unwrap();
                let buf = buf.into_bytes().unwrap();
                assert_eq!(buf.len(), expected.len());
                // On mismatch, scan for the first differing byte to produce a
                // focused panic message instead of dumping both buffers.
                // (Note: the reported "offset" is relative to the read start.)
                if buf[..] != expected[..] {
                    for i in 0..buf.len() {
                        if buf[i] != expected[i] {
                            panic!(
                                "buffer mismatch at offset {}, saw {} expected {}",
                                i, buf[i], expected[i]
                            );
                        }
                    }
                }
            }
        }

        #[test]
        fn random_read_stress() {
            // Explore schedules both by random sampling and by PCT (depth 3).
            check_random(random_read_stress_helper, 1000);
            check_pct(random_read_stress_helper, 1000, 3);
        }
    }
1474}