1use std::fmt::Debug;
35
36use metrics::{counter, histogram};
37use mountpoint_s3_client::error::{GetObjectError, ObjectClientError};
38use mountpoint_s3_client::{ObjectClient, error_metadata::ProvideErrorMetadata};
39use thiserror::Error;
40use tracing::trace;
41
42use crate::checksums::{ChecksummedBytes, IntegrityError};
43use crate::data_cache::DataCache;
44use crate::fs::error_metadata::{ErrorMetadata, MOUNTPOINT_ERROR_CLIENT};
45use crate::mem_limiter::{BufferArea, MemoryLimiter};
46use crate::metrics::defs::{FUSE_CACHE_HIT, PREFETCH_RESET_STATE};
47use crate::object::ObjectId;
48use crate::sync::Arc;
49
50mod backpressure_controller;
51mod builder;
52mod caching_stream;
53mod part;
54mod part_queue;
55mod part_stream;
56mod seek_window;
57mod task;
58
59pub use builder::PrefetcherBuilder;
60use part::PartOperationError;
61use part_stream::{PartStream, RequestRange, RequestTaskConfig};
62use seek_window::SeekWindow;
63use task::RequestTask;
64
/// Newtype around a raw `u64` identifying the handle on whose behalf a prefetch is run.
///
/// The value is opaque to the prefetcher: it is only stored and forwarded to the request
/// tasks it spawns.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct HandleId(u64);

impl HandleId {
    /// Wraps the raw value `id` in a [`HandleId`].
    pub fn new(id: u64) -> Self {
        HandleId(id)
    }

    /// Returns the raw `u64` this identifier was created from.
    pub fn as_raw(&self) -> u64 {
        let Self(raw) = *self;
        raw
    }
}
78
/// Default size, in bytes, of the first request issued for an object: 1 MiB + 128 KiB.
///
/// Used as the default `initial_request_size` of [`PrefetcherConfig`].
pub const INITIAL_REQUEST_SIZE: usize = (1024 + 128) * 1024;
88
/// Errors that can be returned when reading an object through the prefetcher.
///
/// `E` is the client-specific error type carried inside [`ObjectClientError`].
#[derive(Debug, Error)]
pub enum PrefetchReadError<E> {
    /// The underlying GetObject request failed. `source` is the client error and `metadata`
    /// carries the bucket, key and client error details recorded when the error was built.
    #[error("get object request failed")]
    GetRequestFailed {
        source: ObjectClientError<GetObjectError, E>,
        metadata: Box<ErrorMetadata>,
    },

    /// A GetObject response carried data at `offset` while `expected_offset` was expected.
    #[error("get object request returned wrong offset")]
    GetRequestReturnedWrongOffset { offset: u64, expected_offset: u64 },

    /// The GetObject request ended before it was expected to.
    #[error("get request terminated unexpectedly")]
    GetRequestTerminatedUnexpectedly,

    /// An integrity check failed; converted automatically from [`IntegrityError`].
    #[error("integrity check failed")]
    Integrity(#[from] IntegrityError),

    /// An operation on a part failed; converted automatically from [`PartOperationError`].
    #[error("part read failed")]
    PartReadFailed(#[from] PartOperationError),

    /// The client reported no initial read window, or a window of zero, when a new request
    /// was about to be spawned.
    #[error("backpressure must be enabled with non-zero initial read window")]
    BackpressurePreconditionFailed,

    /// Incrementing the read window failed.
    #[error("read window increment failed")]
    ReadWindowIncrement,
}
115
116impl<E: ProvideErrorMetadata + std::error::Error + Send + Sync + 'static> PrefetchReadError<E> {
117 fn get_request_failed(err: ObjectClientError<GetObjectError, E>, bucket: &str, key: &str) -> Self {
118 let metadata = ErrorMetadata {
119 client_error_meta: err.meta(),
120 error_code: Some(MOUNTPOINT_ERROR_CLIENT.to_string()),
121 s3_bucket_name: Some(bucket.to_string()),
122 s3_object_key: Some(key.to_string()),
123 };
124 let metadata = Box::new(metadata);
125 Self::GetRequestFailed { source: err, metadata }
126 }
127}
128
/// Tunable parameters of the prefetcher.
#[derive(Debug, Clone, Copy)]
pub struct PrefetcherConfig {
    /// Upper bound on the read window size, forwarded to every spawned request task.
    pub max_read_window_size: usize,
    /// Forwarded to request tasks as their read window size multiplier.
    pub sequential_prefetch_multiplier: usize,
    /// A forward seek is rejected when the target is at or beyond the task's currently
    /// available offset plus this distance (in bytes).
    pub max_forward_seek_wait_distance: u64,
    /// Size in bytes of the backward seek window kept per object; also determines how much
    /// memory is reserved for that window.
    pub max_backward_seek_distance: u64,
    /// Initial request size forwarded to every spawned request task.
    pub initial_request_size: usize,
}
146
147impl Default for PrefetcherConfig {
148 #[allow(clippy::identity_op)]
149 fn default() -> Self {
150 Self {
151 max_read_window_size: determine_max_read_size(),
152 sequential_prefetch_multiplier: 2,
153 max_forward_seek_wait_distance: 16 * 1024 * 1024,
158 max_backward_seek_distance: 1 * 1024 * 1024,
159 initial_request_size: INITIAL_REQUEST_SIZE,
160 }
161 }
162}
163
164fn determine_max_read_size() -> usize {
176 const ENV_VAR_KEY: &str = "UNSTABLE_MOUNTPOINT_MAX_PREFETCH_WINDOW_SIZE";
177 const DEFAULT_READ_WINDOW_SIZE: usize = 2 * 1024 * 1024 * 1024;
178
179 match std::env::var_os(ENV_VAR_KEY) {
180 Some(val) => match val.to_string_lossy().parse() {
181 Ok(val) => {
182 tracing::warn!(
183 "successfully overridden prefetch read window size \
184 with new value {val} bytes from unstable environment config",
185 );
186 val
187 }
188 Err(_) => {
189 tracing::warn!(
190 "{ENV_VAR_KEY} did not contain a valid positive integer \
191 for prefetch bytes, using {DEFAULT_READ_WINDOW_SIZE} bytes instead",
192 );
193 DEFAULT_READ_WINDOW_SIZE
194 }
195 },
196 None => DEFAULT_READ_WINDOW_SIZE,
197 }
198}
199
/// Factory for per-object prefetch requests ([`PrefetchGetObject`]).
///
/// Every request created by [`Prefetcher::prefetch`] gets a clone of the part stream, a copy
/// of the configuration, and a shared handle to the memory limiter.
#[derive(Debug)]
pub struct Prefetcher<Client> {
    /// Stream used to spawn GetObject request tasks.
    part_stream: PartStream<Client>,
    /// Configuration copied into every prefetch request.
    config: PrefetcherConfig,
    /// Memory limiter shared with every prefetch request.
    mem_limiter: Arc<MemoryLimiter>,
}
207
208impl<Client> Prefetcher<Client>
209where
210 Client: ObjectClient + Clone + Send + Sync + 'static,
211{
212 pub fn default_builder(client: Client) -> PrefetcherBuilder<Client> {
214 PrefetcherBuilder::default_builder(client)
215 }
216
217 pub fn caching_builder<Cache>(cache: Cache, client: Client) -> PrefetcherBuilder<Client>
219 where
220 Cache: DataCache + Send + Sync + 'static,
221 {
222 PrefetcherBuilder::caching_builder(cache, client)
223 }
224
225 pub fn new(part_stream: PartStream<Client>, config: PrefetcherConfig, mem_limiter: Arc<MemoryLimiter>) -> Self {
227 Self {
228 part_stream,
229 config,
230 mem_limiter,
231 }
232 }
233
234 pub fn prefetch(
236 &self,
237 bucket: String,
238 object_id: ObjectId,
239 handle_id: HandleId,
240 size: u64,
241 ) -> PrefetchGetObject<Client>
242 where
243 Client: ObjectClient + Clone + Send + Sync + 'static,
244 {
245 PrefetchGetObject::new(
246 self.part_stream.clone(),
247 self.config,
248 bucket,
249 object_id,
250 handle_id,
251 size,
252 self.mem_limiter.clone(),
253 )
254 }
255}
256
/// State of a prefetching read of a single object.
///
/// Created by [`Prefetcher::prefetch`]; data is obtained with [`PrefetchGetObject::read`].
#[derive(Debug)]
pub struct PrefetchGetObject<Client>
where
    Client: ObjectClient + Clone + Send + Sync + 'static,
{
    /// Stream used to spawn GetObject request tasks.
    part_stream: PartStream<Client>,
    /// Prefetcher configuration in effect for this object.
    config: PrefetcherConfig,
    /// The current request task; `None` until the first read and after every reset.
    backpressure_task: Option<RequestTask<Client>>,
    /// Parts already consumed, retained so that short backward seeks can be served
    /// without resetting the prefetcher.
    backward_seek_window: SeekWindow,
    /// Bucket the object lives in.
    bucket: String,
    /// Identifier of the object being read.
    object_id: ObjectId,
    /// Preferred part size passed to new request tasks. Starts at 128 KiB and grows to the
    /// largest read length seen so far, capped at 1 MiB.
    preferred_part_size: usize,
    /// Offset at which the current run of contiguous reads started (updated on reset).
    sequential_read_start_offset: u64,
    /// Offset at which the next in-order read is expected.
    next_sequential_read_offset: u64,
    /// Updated on reset only; not otherwise read in the code visible here.
    next_request_offset: u64,
    /// Size of the object in bytes.
    size: u64,
    /// Memory limiter against which the backward seek window is reserved.
    mem_limiter: Arc<MemoryLimiter>,
    /// Handle this request was created for; forwarded to request tasks.
    handle_id: HandleId,
}
282
283impl<Client> PrefetchGetObject<Client>
284where
285 Client: ObjectClient + Clone + Send + Sync + 'static,
286{
287 fn new(
289 part_stream: PartStream<Client>,
290 config: PrefetcherConfig,
291 bucket: String,
292 object_id: ObjectId,
293 handle_id: HandleId,
294 size: u64,
295 mem_limiter: Arc<MemoryLimiter>,
296 ) -> Self {
297 let max_backward_seek_distance = config.max_backward_seek_distance as usize;
298 let seek_window_reservation =
301 Self::seek_window_reservation(part_stream.client().read_part_size(), max_backward_seek_distance);
302 mem_limiter.reserve(BufferArea::Prefetch, seek_window_reservation);
303 PrefetchGetObject {
304 part_stream,
305 config,
306 backpressure_task: None,
307 backward_seek_window: SeekWindow::new(max_backward_seek_distance),
308 preferred_part_size: 128 * 1024,
309 sequential_read_start_offset: 0,
310 next_sequential_read_offset: 0,
311 next_request_offset: 0,
312 bucket,
313 object_id,
314 size,
315 mem_limiter,
316 handle_id,
317 }
318 }
319
320 pub async fn read(
324 &mut self,
325 offset: u64,
326 length: usize,
327 ) -> Result<ChecksummedBytes, PrefetchReadError<Client::ClientError>> {
328 trace!(
329 offset,
330 length,
331 next_seq_offset = self.next_sequential_read_offset,
332 "read"
333 );
334
335 match self.try_read(offset, length).await {
336 Ok((data, cache_hit)) => {
337 if !data.is_empty() && cache_hit {
344 metrics::counter!(FUSE_CACHE_HIT).increment(1);
348 }
349 Ok(data)
350 }
351 Err(err) => {
352 self.reset_prefetch_to_offset(offset);
353 Err(err)
354 }
355 }
356 }
357
358 async fn try_read(
359 &mut self,
360 offset: u64,
361 length: usize,
362 ) -> Result<(ChecksummedBytes, bool), PrefetchReadError<Client::ClientError>> {
363 let max_preferred_part_size = 1024 * 1024;
371 self.preferred_part_size = self.preferred_part_size.max(length).min(max_preferred_part_size);
372
373 let remaining = self.size.saturating_sub(offset);
374 if remaining == 0 {
375 return Ok((ChecksummedBytes::default(), false));
376 }
377 let mut to_read = (length as u64).min(remaining);
378
379 if self.next_sequential_read_offset != offset {
382 if self.try_seek(offset).await? {
383 trace!("seek succeeded");
384 } else {
385 trace!(
386 expected = self.next_sequential_read_offset,
387 actual = offset,
388 "out-of-order read, resetting prefetch"
389 );
390 counter!(PREFETCH_RESET_STATE).increment(1);
391
392 self.record_contiguous_read_metric();
394
395 self.reset_prefetch_to_offset(offset);
396 }
397 }
398 assert_eq!(self.next_sequential_read_offset, offset);
399
400 if self.backpressure_task.is_none() {
401 self.backpressure_task = Some(self.spawn_read_backpressure_request()?);
402 }
403
404 let mut all_parts_from_cache = true;
405 let mut response = ChecksummedBytes::default();
406 while to_read > 0 {
407 let Some(current_task) = self.backpressure_task.as_mut() else {
408 trace!(offset, length, "read beyond object size");
409 break;
410 };
411 debug_assert!(current_task.remaining() > 0);
412
413 let part = current_task.read(to_read as usize).await?;
414 all_parts_from_cache &= part.is_from_cache();
415 self.backward_seek_window.push(part.clone());
416 let part_bytes = part.into_bytes(&self.object_id, self.next_sequential_read_offset)?;
417
418 self.next_sequential_read_offset += part_bytes.len() as u64;
419 if response.is_empty() && part_bytes.len() == to_read as usize {
423 return Ok((part_bytes, all_parts_from_cache));
424 }
425
426 let part_len = part_bytes.len() as u64;
427 response.extend(part_bytes)?;
428 to_read -= part_len;
429 }
430
431 Ok((response, all_parts_from_cache))
432 }
433
434 fn spawn_read_backpressure_request(
437 &mut self,
438 ) -> Result<RequestTask<Client>, PrefetchReadError<Client::ClientError>> {
439 let start = self.next_sequential_read_offset;
440 let object_size = self.size as usize;
441 let read_part_size = self.part_stream.client().read_part_size();
442 let range = RequestRange::new(object_size, start, object_size);
443
444 match self.part_stream.client().initial_read_window_size() {
446 Some(value) => {
447 if value == 0 {
449 return Err(PrefetchReadError::BackpressurePreconditionFailed);
450 }
451 }
452 None => return Err(PrefetchReadError::BackpressurePreconditionFailed),
453 };
454
455 let config = RequestTaskConfig {
456 bucket: self.bucket.clone(),
457 object_id: self.object_id.clone(),
458 handle_id: self.handle_id,
459 range,
460 read_part_size,
461 preferred_part_size: self.preferred_part_size,
462 initial_request_size: self.config.initial_request_size,
463 max_read_window_size: self.config.max_read_window_size,
464 read_window_size_multiplier: self.config.sequential_prefetch_multiplier,
465 };
466 Ok(self.part_stream.spawn_get_object_request(config))
467 }
468
469 fn reset_prefetch_to_offset(&mut self, offset: u64) {
471 self.backpressure_task = None;
472 self.backward_seek_window.clear();
473 self.sequential_read_start_offset = offset;
474 self.next_sequential_read_offset = offset;
475 self.next_request_offset = offset;
476 }
477
478 async fn try_seek(&mut self, offset: u64) -> Result<bool, PrefetchReadError<Client::ClientError>> {
482 assert_ne!(offset, self.next_sequential_read_offset);
483 trace!(from = self.next_sequential_read_offset, to = offset, "trying to seek");
484 if offset > self.next_sequential_read_offset {
485 self.try_seek_forward(offset).await
486 } else {
487 self.try_seek_backward(offset).await
488 }
489 }
490
491 async fn try_seek_forward(&mut self, offset: u64) -> Result<bool, PrefetchReadError<Client::ClientError>> {
492 assert!(offset > self.next_sequential_read_offset);
493 let total_seek_distance = offset - self.next_sequential_read_offset;
494 histogram!("prefetch.seek_distance", "dir" => "forward").record(total_seek_distance as f64);
495
496 let Some(task) = self.backpressure_task.as_mut() else {
497 return Ok(false);
499 };
500
501 if offset >= task.read_window_end_offset() {
503 return Ok(false);
504 }
505
506 let available_offset = task.available_offset();
510 let available_soon_offset = available_offset.saturating_add(self.config.max_forward_seek_wait_distance);
511 if offset >= available_soon_offset {
512 trace!(
513 requested_offset = offset,
514 available_offset = available_offset,
515 "seek failed: not enough data available"
516 );
517 return Ok(false);
518 }
519 let mut seek_distance = offset - self.next_sequential_read_offset;
520 while seek_distance > 0 {
521 let part = task.read(seek_distance as usize).await?;
522 seek_distance -= part.len() as u64;
523 self.next_sequential_read_offset += part.len() as u64;
524 self.backward_seek_window.push(part);
525 }
526 Ok(true)
527 }
528
529 async fn try_seek_backward(&mut self, offset: u64) -> Result<bool, PrefetchReadError<Client::ClientError>> {
530 assert!(offset < self.next_sequential_read_offset);
531
532 let Some(task) = self.backpressure_task.as_mut() else {
535 return Ok(false);
536 };
537 let backwards_length_needed = self.next_sequential_read_offset - offset;
538 histogram!("prefetch.seek_distance", "dir" => "backward").record(backwards_length_needed as f64);
539
540 let Some(parts) = self.backward_seek_window.read_back(backwards_length_needed as usize) else {
541 trace!("seek failed: not enough data in backwards seek window");
542 return Ok(false);
543 };
544 task.push_front(parts).await?;
549 self.next_sequential_read_offset = offset;
550 Ok(true)
551 }
552
553 fn record_contiguous_read_metric(&self) {
557 histogram!("prefetch.contiguous_read_len")
558 .record((self.next_sequential_read_offset - self.sequential_read_start_offset) as f64);
559 }
560
/// Returns the number of bytes to reserve for a seek window of `seek_window_size` bytes:
/// the window size rounded up to a whole multiple of `part_size`.
///
/// # Panics
///
/// Panics if `part_size` is zero.
fn seek_window_reservation(part_size: usize, seek_window_size: usize) -> u64 {
    seek_window_size.next_multiple_of(part_size) as u64
}
567}
568
569impl<Client> Drop for PrefetchGetObject<Client>
570where
571 Client: ObjectClient + Clone + Send + Sync + 'static,
572{
573 fn drop(&mut self) {
574 let seek_window_reservation = Self::seek_window_reservation(
575 self.part_stream.client().read_part_size(),
576 self.backward_seek_window.max_size(),
577 );
578 self.mem_limiter.release(BufferArea::Prefetch, seek_window_reservation);
579 self.record_contiguous_read_metric();
580 }
581}
582
583#[cfg(test)]
584mod tests {
585 #![allow(clippy::identity_op)]
587
588 use crate::Runtime;
589 use crate::data_cache::InMemoryDataCache;
590 use crate::mem_limiter::{MINIMUM_MEM_LIMIT, MemoryLimiter};
591 use crate::memory::PagedPool;
592 use crate::sync::Arc;
593
594 use super::*;
595 use futures::executor::{ThreadPool, block_on};
596 use mountpoint_s3_client::failure_client::{
597 CountdownFailureConfig, GetObjectFailureMode, countdown_failure_client,
598 };
599 use mountpoint_s3_client::mock_client::{MockClient, MockClientConfig, MockClientError, MockObject, ramp_bytes};
600 use mountpoint_s3_client::types::ETag;
601 use proptest::proptest;
602 use proptest::strategy::{Just, Strategy};
603 use proptest_derive::Arbitrary;
604 use std::collections::HashMap;
605 use test_case::test_case;
606
/// One kibibyte, in bytes.
const KB: usize = 1 << 10;
/// One mebibyte, in bytes.
const MB: usize = 1 << 20;
609
/// Parameters shared by the prefetcher tests. The `proptest` strategies bound each field
/// when the configuration is generated for property tests.
#[derive(Debug, Arbitrary)]
struct TestConfig {
    /// Copied into `PrefetcherConfig::initial_request_size`.
    #[proptest(strategy = "16usize..1*1024*1024")]
    initial_request_size: usize,
    /// Copied into `PrefetcherConfig::max_read_window_size`.
    #[proptest(strategy = "16usize..1*1024*1024")]
    max_read_window_size: usize,
    /// Copied into `PrefetcherConfig::sequential_prefetch_multiplier`.
    #[proptest(strategy = "1usize..8usize")]
    sequential_prefetch_multiplier: usize,
    /// Part size (and, in most tests, initial read window size) of the mock client.
    #[proptest(strategy = "16usize..2*1024*1024")]
    client_part_size: usize,
    /// Copied into `PrefetcherConfig::max_forward_seek_wait_distance`.
    #[proptest(strategy = "1u64..4*1024*1024")]
    max_forward_seek_wait_distance: u64,
    /// Copied into `PrefetcherConfig::max_backward_seek_distance`.
    #[proptest(strategy = "1u64..4*1024*1024")]
    max_backward_seek_distance: u64,
    /// Block size of the in-memory cache used by the caching property tests.
    #[proptest(strategy = "16usize..1*1024*1024")]
    cache_block_size: usize,
}
627
/// Selects which kind of prefetcher `build_prefetcher` creates.
enum PrefetcherType {
    /// Prefetcher built with the default builder (no cache).
    Default,
    /// Prefetcher backed by an in-memory data cache with the given block size in bytes.
    InMemoryCache(usize),
}
632
633 fn build_prefetcher<Client>(
634 client: Client,
635 prefetcher_type: PrefetcherType,
636 prefetcher_config: PrefetcherConfig,
637 ) -> Prefetcher<Client>
638 where
639 Client: ObjectClient + Clone + Send + Sync + 'static,
640 {
641 let pool = PagedPool::new_with_candidate_sizes([client.read_part_size(), client.write_part_size()]);
642 let mem_limiter = Arc::new(MemoryLimiter::new(pool, MINIMUM_MEM_LIMIT));
643 let runtime = Runtime::new(ThreadPool::builder().pool_size(1).create().unwrap());
644 let builder = match prefetcher_type {
645 PrefetcherType::Default => Prefetcher::default_builder(client),
646 PrefetcherType::InMemoryCache(block_size) => {
647 let cache = InMemoryDataCache::new(block_size as u64);
648 Prefetcher::caching_builder(cache, client)
649 }
650 };
651 builder.build(runtime, mem_limiter, prefetcher_config)
652 }
653
654 fn run_sequential_read_test(prefetcher_type: PrefetcherType, size: u64, read_size: usize, test_config: TestConfig) {
655 let client = Arc::new(
656 MockClient::config()
657 .bucket("test-bucket")
658 .part_size(test_config.client_part_size)
659 .enable_backpressure(true)
660 .initial_read_window_size(test_config.client_part_size)
661 .build(),
662 );
663 let object = MockObject::ramp(0xaa, size as usize, ETag::for_tests());
664 let etag = object.etag();
665
666 client.add_object("hello", object);
667
668 let prefetcher_config = PrefetcherConfig {
669 max_read_window_size: test_config.max_read_window_size,
670 sequential_prefetch_multiplier: test_config.sequential_prefetch_multiplier,
671 max_forward_seek_wait_distance: test_config.max_forward_seek_wait_distance,
672 max_backward_seek_distance: test_config.max_backward_seek_distance,
673 initial_request_size: test_config.initial_request_size,
674 };
675
676 let prefetcher = build_prefetcher(client.clone(), prefetcher_type, prefetcher_config);
677 let object_id = ObjectId::new("hello".to_owned(), etag);
678 let fh = HandleId::new(1);
679 let mut request = prefetcher.prefetch("test-bucket".to_owned(), object_id, fh, size);
680
681 let mut next_offset = 0;
682 loop {
683 let buf = block_on(request.read(next_offset, read_size)).unwrap();
684 if buf.is_empty() {
685 break;
686 }
687 let buf = buf.into_bytes().unwrap();
688 let expected = ramp_bytes((0xaa + next_offset) as usize, buf.len());
689 assert_eq!(&buf[..], &expected[..buf.len()]);
690 next_offset += buf.len() as u64;
691 }
692 assert_eq!(next_offset, size);
693 }
694
695 #[test_case(PrefetcherType::Default)]
696 #[test_case(PrefetcherType::InMemoryCache(1 * MB))]
697 fn sequential_read_small(prefetcher_type: PrefetcherType) {
698 let config = TestConfig {
699 initial_request_size: 256 * 1024,
700 max_read_window_size: 1024 * 1024 * 1024,
701 sequential_prefetch_multiplier: 8,
702 client_part_size: 8 * 1024 * 1024,
703 max_forward_seek_wait_distance: 16 * 1024 * 1024,
704 max_backward_seek_distance: 2 * 1024 * 1024,
705 cache_block_size: 1 * MB,
706 };
707 run_sequential_read_test(prefetcher_type, 1024 * 1024 + 111, 1024 * 1024, config);
708 }
709
710 #[test_case(PrefetcherType::Default)]
711 #[test_case(PrefetcherType::InMemoryCache(1 * MB))]
712 fn sequential_read_medium(prefetcher_type: PrefetcherType) {
713 let config = TestConfig {
714 initial_request_size: 256 * 1024,
715 max_read_window_size: 64 * 1024 * 1024,
716 sequential_prefetch_multiplier: 8,
717 client_part_size: 8 * 1024 * 1024,
718 max_forward_seek_wait_distance: 16 * 1024 * 1024,
719 max_backward_seek_distance: 2 * 1024 * 1024,
720 cache_block_size: 1 * MB,
721 };
722 run_sequential_read_test(prefetcher_type, 16 * 1024 * 1024 + 111, 1024 * 1024, config);
723 }
724
725 #[test_case(PrefetcherType::Default)]
726 #[test_case(PrefetcherType::InMemoryCache(1 * MB))]
727 fn sequential_read_large(prefetcher_type: PrefetcherType) {
728 let config = TestConfig {
729 initial_request_size: 256 * 1024,
730 max_read_window_size: 64 * 1024 * 1024,
731 sequential_prefetch_multiplier: 8,
732 client_part_size: 8 * 1024 * 1024,
733 max_forward_seek_wait_distance: 16 * 1024 * 1024,
734 max_backward_seek_distance: 2 * 1024 * 1024,
735 cache_block_size: 1 * MB,
736 };
737
738 run_sequential_read_test(prefetcher_type, 256 * 1024 * 1024 + 111, 1024 * 1024, config);
739 }
740
741 fn fail_with_backpressure_precondition_test(
742 prefetcher_type: PrefetcherType,
743 test_config: TestConfig,
744 client_config: MockClientConfig,
745 ) {
746 let client = Arc::new(MockClient::new(client_config));
747 let read_size = 1 * MB;
748 let object_size = 8 * MB;
749 let object = MockObject::ramp(0xaa, object_size, ETag::for_tests());
750 let etag = object.etag();
751
752 let prefetcher_config = PrefetcherConfig {
753 max_read_window_size: test_config.max_read_window_size,
754 sequential_prefetch_multiplier: test_config.sequential_prefetch_multiplier,
755 ..Default::default()
756 };
757
758 let prefetcher = build_prefetcher(client, prefetcher_type, prefetcher_config);
759 let object_id = ObjectId::new("hello".to_owned(), etag);
760 let fh = HandleId::new(1);
761 let mut request = prefetcher.prefetch("test-bucket".to_owned(), object_id, fh, object_size as u64);
762 let result = block_on(request.read(0, read_size));
763 assert!(matches!(result, Err(PrefetchReadError::BackpressurePreconditionFailed)));
764 }
765
766 #[test_case(PrefetcherType::Default)]
767 #[test_case(PrefetcherType::InMemoryCache(1 * MB))]
768 fn fail_with_backpressure_not_enabled(prefetcher_type: PrefetcherType) {
769 let test_config = TestConfig {
770 initial_request_size: 256 * 1024,
771 max_read_window_size: 1024 * 1024 * 1024,
772 sequential_prefetch_multiplier: 8,
773 client_part_size: 8 * 1024 * 1024,
774 max_forward_seek_wait_distance: 16 * 1024 * 1024,
775 max_backward_seek_distance: 2 * 1024 * 1024,
776 cache_block_size: 1 * MB,
777 };
778
779 let config = MockClient::config()
781 .bucket("test-bucket")
782 .part_size(test_config.client_part_size)
783 .enable_backpressure(false);
784
785 fail_with_backpressure_precondition_test(prefetcher_type, test_config, config);
786 }
787
788 #[test_case(PrefetcherType::Default)]
789 #[test_case(PrefetcherType::InMemoryCache(1 * MB))]
790 fn fail_with_backpressure_zero_read_window(prefetcher_type: PrefetcherType) {
791 let test_config = TestConfig {
792 initial_request_size: 256 * 1024,
793 max_read_window_size: 1024 * 1024 * 1024,
794 sequential_prefetch_multiplier: 8,
795 client_part_size: 8 * 1024 * 1024,
796 max_forward_seek_wait_distance: 16 * 1024 * 1024,
797 max_backward_seek_distance: 2 * 1024 * 1024,
798 cache_block_size: 1 * MB,
799 };
800
801 let config = MockClient::config()
803 .bucket("test-bucket")
804 .part_size(test_config.client_part_size)
805 .enable_backpressure(true)
806 .initial_read_window_size(0);
807
808 fail_with_backpressure_precondition_test(prefetcher_type, test_config, config);
809 }
810
811 fn fail_sequential_read_test(
812 prefetcher_type: PrefetcherType,
813 size: u64,
814 read_size: usize,
815 test_config: TestConfig,
816 get_failures: HashMap<usize, GetObjectFailureMode<MockClientError>>,
817 ) {
818 let client = MockClient::config()
819 .bucket("test-bucket")
820 .part_size(test_config.client_part_size)
821 .enable_backpressure(true)
822 .initial_read_window_size(test_config.client_part_size)
823 .build();
824 let object = MockObject::ramp(0xaa, size as usize, ETag::for_tests());
825 let etag = object.etag();
826
827 client.add_object("hello", object);
828
829 let client = Arc::new(countdown_failure_client(
830 client,
831 CountdownFailureConfig {
832 get_failures,
833 ..Default::default()
834 },
835 ));
836
837 let prefetcher_config = PrefetcherConfig {
838 max_read_window_size: test_config.max_read_window_size,
839 sequential_prefetch_multiplier: test_config.sequential_prefetch_multiplier,
840 initial_request_size: test_config.initial_request_size,
841 ..Default::default()
842 };
843
844 let prefetcher = build_prefetcher(client, prefetcher_type, prefetcher_config);
845 let object_id = ObjectId::new("hello".to_owned(), etag);
846 let fh = HandleId::new(1);
847 let mut request = prefetcher.prefetch("test-bucket".to_owned(), object_id, fh, size);
848
849 let mut next_offset = 0;
850 loop {
851 let buf = match block_on(request.read(next_offset, read_size)) {
852 Ok(buf) => buf,
853 Err(_) => break,
854 };
855 let buf = buf.into_bytes().unwrap();
856
857 if buf.is_empty() {
858 break;
859 }
860 let expected = ramp_bytes((0xaa + next_offset) as usize, buf.len());
861 assert_eq!(&buf[..], &expected[..buf.len()]);
862 next_offset += buf.len() as u64;
863 }
864 assert!(next_offset < size); }
866
867 #[test_case("invalid range; length=42", PrefetcherType::Default)]
868 #[test_case("invalid range; length=42", PrefetcherType::InMemoryCache(1 * MB))]
869 #[test_case(
871 "At least one of the pre-conditions you specified did not hold",
872 PrefetcherType::Default
873 )]
874 #[test_case("At least one of the pre-conditions you specified did not hold", PrefetcherType::InMemoryCache(1 * MB))]
875 fn fail_request_sequential_small(err_value: &str, prefetcher_type: PrefetcherType) {
876 let config = TestConfig {
877 initial_request_size: 256 * 1024,
878 max_read_window_size: 1024 * 1024 * 1024,
879 sequential_prefetch_multiplier: 8,
880 client_part_size: 8 * 1024 * 1024,
881 max_forward_seek_wait_distance: 16 * 1024 * 1024,
882 max_backward_seek_distance: 2 * 1024 * 1024,
883 cache_block_size: 1 * MB,
884 };
885
886 let mut get_failures = HashMap::new();
887 get_failures.insert(
888 2,
889 GetObjectFailureMode::OperationError(ObjectClientError::ClientError(MockClientError(
890 err_value.to_owned().into(),
891 ))),
892 );
893
894 fail_sequential_read_test(prefetcher_type, 1024 * 1024 + 111, 1024 * 1024, config, get_failures);
895 }
896
897 proptest! {
898 #[test]
899 fn proptest_sequential_read(
900 size in 1u64..1 * 1024 * 1024,
901 read_size in 1usize..1 * 1024 * 1024,
902 config: TestConfig,
903 ) {
904 run_sequential_read_test(PrefetcherType::Default, size, read_size, config);
905 }
906
907 #[test]
908 fn proptest_sequential_read_small_read_size(size in 1u64..1 * 1024 * 1024, read_factor in 1usize..10, config: TestConfig) {
909 let read_size = (size as usize / read_factor).max(1);
911 run_sequential_read_test(PrefetcherType::Default, size, read_size, config);
912 }
913
914 #[test]
915 fn proptest_sequential_read_with_cache(
916 size in 1u64..1 * 1024 * 1024,
917 read_size in 1usize..1 * 1024 * 1024,
918 config: TestConfig,
919 ) {
920 run_sequential_read_test(PrefetcherType::InMemoryCache(config.cache_block_size), size, read_size, config);
921 }
922
923 #[test]
924 fn proptest_sequential_read_small_read_size_with_cache(size in 1u64..1 * 1024 * 1024, read_factor in 1usize..10,
925 config: TestConfig) {
926 let read_size = (size as usize / read_factor).max(1);
928 run_sequential_read_test(PrefetcherType::InMemoryCache(config.cache_block_size), size, read_size, config);
929 }
930 }
931
/// Fixed-input sequential read case, kept as a regression test.
#[test]
fn test_sequential_read_regression() {
    let test_config = TestConfig {
        initial_request_size: 484941,
        max_read_window_size: 81509,
        sequential_prefetch_multiplier: 1,
        client_part_size: 181682,
        max_forward_seek_wait_distance: 1,
        max_backward_seek_distance: 18668,
        cache_block_size: MB,
    };
    let (object_size, read_size) = (854966, 161647);
    run_sequential_read_test(PrefetcherType::Default, object_size, read_size, test_config);
}
947
948 fn run_random_read_test(
949 prefetcher_type: PrefetcherType,
950 object_size: u64,
951 reads: Vec<(u64, usize)>,
952 test_config: TestConfig,
953 ) {
954 let client = Arc::new(
955 MockClient::config()
956 .bucket("test-bucket")
957 .part_size(test_config.client_part_size)
958 .enable_backpressure(true)
959 .initial_read_window_size(test_config.client_part_size)
960 .build(),
961 );
962 let object = MockObject::ramp(0xaa, object_size as usize, ETag::for_tests());
963 let etag = object.etag();
964
965 client.add_object("hello", object);
966
967 let prefetcher_config = PrefetcherConfig {
968 max_read_window_size: test_config.max_read_window_size,
969 sequential_prefetch_multiplier: test_config.sequential_prefetch_multiplier,
970 max_forward_seek_wait_distance: test_config.max_forward_seek_wait_distance,
971 max_backward_seek_distance: test_config.max_backward_seek_distance,
972 initial_request_size: test_config.initial_request_size,
973 };
974
975 let prefetcher = build_prefetcher(client, prefetcher_type, prefetcher_config);
976 let object_id = ObjectId::new("hello".to_owned(), etag);
977 let fh = HandleId::new(1);
978 let mut request = prefetcher.prefetch("test-bucket".to_owned(), object_id, fh, object_size);
979
980 for (offset, length) in reads {
981 assert!(offset < object_size);
982 assert!(offset + length as u64 <= object_size);
983 let expected = ramp_bytes((0xaa + offset) as usize, length);
984 let buf = block_on(request.read(offset, length)).unwrap();
985 let buf = buf.into_bytes().unwrap();
986 assert_eq!(buf.len(), expected.len());
987 if buf[..] != expected[..] {
989 for i in 0..buf.len() {
990 if buf[i] != expected[i] {
991 panic!(
992 "buffer mismatch at offset {}, saw {} expected {}",
993 i, buf[i], expected[i]
994 );
995 }
996 }
997 }
998 }
999 }
1000
1001 fn random_read_strategy(max_object_size: u64) -> impl Strategy<Value = (u64, Vec<(u64, usize)>)> {
1002 (1..=max_object_size).prop_flat_map(|object_size| {
1003 (
1004 Just(object_size),
1005 proptest::collection::vec(
1006 (0..object_size).prop_flat_map(move |offset| {
1007 (1..=object_size - offset).prop_map(move |length| (offset, length as usize))
1008 }),
1009 0..10,
1010 ),
1011 )
1012 })
1013 }
1014
1015 proptest! {
1016 #[test]
1017 fn proptest_random_read(
1018 reads in random_read_strategy(1 * 1024 * 1024),
1019 config: TestConfig,
1020 ) {
1021 let (object_size, reads) = reads;
1022 run_random_read_test(PrefetcherType::Default, object_size, reads, config);
1023 }
1024
1025 #[test]
1026 fn proptest_random_read_with_cache(
1027 reads in random_read_strategy(1 * 1024 * 1024),
1028 config: TestConfig,
1029 ) {
1030 let (object_size, reads) = reads;
1031 run_random_read_test(PrefetcherType::InMemoryCache(config.cache_block_size), object_size, reads, config);
1032 }
1033 }
1034
/// Fixed-input random read case, kept as a regression test: a single read from offset 0
/// that is one byte longer than the client part size.
#[test]
fn test_random_read_regression() {
    let test_config = TestConfig {
        initial_request_size: 3684779,
        max_read_window_size: 2147621,
        sequential_prefetch_multiplier: 4,
        client_part_size: 516882,
        max_forward_seek_wait_distance: (16 * MB) as u64,
        max_backward_seek_distance: (2 * MB) as u64,
        cache_block_size: MB,
    };
    let read_plan = vec![(0, 516883)];
    run_random_read_test(PrefetcherType::Default, 724314, read_plan, test_config);
}
1050
/// Fixed-input random read case, kept as a regression test: a read from offset 0 followed
/// by a one-byte read further ahead.
#[test]
fn test_random_read_regression2() {
    let test_config = TestConfig {
        initial_request_size: 556997,
        max_read_window_size: 105938,
        sequential_prefetch_multiplier: 7,
        client_part_size: 1219731,
        max_forward_seek_wait_distance: (16 * MB) as u64,
        max_backward_seek_distance: (2 * MB) as u64,
        cache_block_size: MB,
    };
    let read_plan = vec![(0, 278499), (311250, 1)];
    run_random_read_test(PrefetcherType::Default, 755678, read_plan, test_config);
}
1066
/// Fixed-input random read case, kept as a regression test: a read from offset 0, a
/// one-byte read further ahead, then a read starting before the previous one.
#[test]
fn test_random_read_regression3() {
    let test_config = TestConfig {
        initial_request_size: 556997,
        max_read_window_size: 105938,
        sequential_prefetch_multiplier: 7,
        client_part_size: 1219731,
        max_forward_seek_wait_distance: 2260662,
        max_backward_seek_distance: 2369799,
        cache_block_size: MB,
    };
    let read_plan = vec![(0, 236766), (291204, 1), (280930, 36002)];
    run_random_read_test(PrefetcherType::Default, 755678, read_plan, test_config);
}
1082
/// Fixed-input random read case, kept as a regression test: four reads on a small object
/// that jump forwards and backwards.
#[test]
fn test_random_read_regression4() {
    let test_config = TestConfig {
        initial_request_size: 457999,
        max_read_window_size: 863511,
        sequential_prefetch_multiplier: 5,
        client_part_size: 1972409,
        max_forward_seek_wait_distance: 2810651,
        max_backward_seek_distance: 3531090,
        cache_block_size: MB,
    };
    let read_plan = vec![(3584, 1), (9424, 1460), (3582, 3340), (248, 9218)];
    run_random_read_test(PrefetcherType::Default, 14201, read_plan, test_config);
}
1098
/// A failure while seeking forward must surface as a read error, and the prefetcher must
/// recover once the underlying requests succeed again.
#[test]
fn test_forward_seek_failure() {
    const PART_SIZE: usize = 8192;
    const OBJECT_SIZE: usize = 2 * PART_SIZE;

    let mock_client = MockClient::config()
        .bucket("test-bucket")
        .part_size(PART_SIZE)
        .enable_backpressure(true)
        .initial_read_window_size(OBJECT_SIZE)
        .build();
    let object = MockObject::ramp(0xaa, OBJECT_SIZE, ETag::for_tests());
    let etag = object.etag();
    mock_client.add_object("hello", object);

    // First request: fail at stream position 2. Second request: fail outright.
    let first_request_failure = GetObjectFailureMode::StreamPositionError(
        2,
        ObjectClientError::ClientError(MockClientError(
            "error in the second chunk of the first request".into(),
        )),
    );
    let second_request_failure = GetObjectFailureMode::OperationError(ObjectClientError::ClientError(
        MockClientError("error in second request".into()),
    ));
    let get_failures = HashMap::from([(1, first_request_failure), (2, second_request_failure)]);

    let failure_config = CountdownFailureConfig {
        get_failures,
        ..Default::default()
    };
    let client = Arc::new(countdown_failure_client(mock_client, failure_config));
    let prefetcher = build_prefetcher(client, PrefetcherType::Default, Default::default());

    block_on(async {
        let object_id = ObjectId::new("hello".to_owned(), etag.clone());
        let handle = HandleId::new(1);
        let mut request = prefetcher.prefetch("test-bucket".to_owned(), object_id, handle, OBJECT_SIZE as u64);

        // The first byte is delivered before the injected stream error.
        _ = request.read(0, 1).await.expect("first read should succeed");

        // Jump into the second part: the forward seek runs into the stream error.
        let target = PART_SIZE + 1;
        _ = request.read(target as u64, 1).await.expect_err("seek should fail");

        // The retry spawns the second request, which is configured to fail as well.
        _ = request
            .read(target as u64, 1)
            .await
            .expect_err("first retry after failure should fail");

        // No more failures are configured, so the next retry goes through.
        let byte = request
            .read(target as u64, 1)
            .await
            .expect("second retry should succeed");
        let expected = ramp_bytes(0xaa + target, 1);
        assert_eq!(byte.into_bytes().unwrap()[..], expected[..]);
    });
}
1166
1167 #[test_case(PrefetcherType::Default)]
1168 #[test_case(PrefetcherType::InMemoryCache(8192))]
1169 fn test_short_read_failure(prefetcher_type: PrefetcherType) {
1170 const PART_SIZE: usize = 8192;
1171 const OBJECT_SIZE: usize = 2 * PART_SIZE;
1172
1173 let client = MockClient::config()
1174 .bucket("test-bucket")
1175 .part_size(PART_SIZE)
1176 .enable_backpressure(true)
1177 .initial_read_window_size(PART_SIZE)
1178 .build();
1179 let object = MockObject::ramp(0xaa, OBJECT_SIZE, ETag::for_tests());
1180 let etag = object.etag();
1181 client.add_object("hello", object);
1182
1183 let mut get_failures = HashMap::new();
1184 get_failures.insert(1, GetObjectFailureMode::StreamShortCircuit(1));
1186 get_failures.insert(3, GetObjectFailureMode::StreamShortCircuit(1));
1189
1190 let client = Arc::new(countdown_failure_client(
1191 client,
1192 CountdownFailureConfig {
1193 get_failures,
1194 ..Default::default()
1195 },
1196 ));
1197 let prefetcher_config = PrefetcherConfig {
1198 initial_request_size: PART_SIZE,
1199 ..Default::default()
1200 };
1201 let prefetcher = build_prefetcher(client, prefetcher_type, prefetcher_config);
1202
1203 block_on(async {
1204 let object_id = ObjectId::new("hello".to_owned(), etag.clone());
1205 let fh = HandleId::new(1);
1206 let mut request = prefetcher.prefetch("test-bucket".to_owned(), object_id, fh, OBJECT_SIZE as u64);
1207
1208 assert!(matches!(
1210 request.read(0, 10).await.expect_err("read should fail"),
1211 PrefetchReadError::GetRequestTerminatedUnexpectedly,
1212 ));
1213
1214 let bytes = request.read(0, PART_SIZE).await.unwrap();
1216 let expected = ramp_bytes(0xaa, PART_SIZE);
1217 assert_eq!(bytes.into_bytes().unwrap()[..], expected[..]);
1218 _ = request
1219 .read(PART_SIZE as u64, PART_SIZE)
1220 .await
1221 .expect_err("read should fail");
1222
1223 let bytes = request.read(0, OBJECT_SIZE).await.unwrap();
1225 let expected = ramp_bytes(0xaa, OBJECT_SIZE);
1226 assert_eq!(bytes.into_bytes().unwrap()[..], expected[..]);
1227
1228 let bytes = request.read(PART_SIZE as u64, OBJECT_SIZE).await.unwrap();
1230 let expected = ramp_bytes(0xaa + PART_SIZE, PART_SIZE);
1231 assert_eq!(bytes.into_bytes().unwrap()[..], expected[..]);
1232 });
1233 }
1234
1235 #[test_case(0, 25; "no first read")]
1236 #[test_case(60, 25; "read beyond first part")]
1237 #[test_case(20, 25; "read in first part")]
1238 #[test_case(125, 110; "read in second request")]
1239 fn test_forward_seek(first_read_size: usize, part_size: usize) {
1240 const OBJECT_SIZE: usize = 200;
1241 const FIRST_REQUEST_SIZE: usize = 100;
1242
1243 let client = Arc::new(
1244 MockClient::config()
1245 .bucket("test-bucket")
1246 .part_size(part_size)
1247 .enable_backpressure(true)
1248 .initial_read_window_size(FIRST_REQUEST_SIZE)
1249 .build(),
1250 );
1251 let object = MockObject::ramp(0xaa, OBJECT_SIZE, ETag::for_tests());
1252 let etag = object.etag();
1253
1254 client.add_object("hello", object);
1255
1256 let prefetcher = build_prefetcher(client, PrefetcherType::Default, Default::default());
1257
1258 for offset in first_read_size + 1..OBJECT_SIZE {
1260 let object_id = ObjectId::new("hello".to_owned(), etag.clone());
1261 let fh = HandleId::new(1);
1262 let mut request = prefetcher.prefetch("test-bucket".to_owned(), object_id, fh, OBJECT_SIZE as u64);
1263 if first_read_size > 0 {
1264 let _first_read = block_on(request.read(0, first_read_size)).unwrap();
1265 }
1266
1267 let byte = block_on(request.read(offset as u64, 1)).unwrap();
1268 let expected = ramp_bytes(0xaa + offset, 1);
1269 assert_eq!(byte.into_bytes().unwrap()[..], expected[..]);
1270 }
1271 }
1272
1273 #[test_case(60, 25; "read beyond first part")]
1274 #[test_case(20, 25; "read in first part")]
1275 #[test_case(125, 110; "read in second request")]
1276 fn test_backward_seek(first_read_size: usize, part_size: usize) {
1277 const OBJECT_SIZE: usize = 200;
1278
1279 let client = Arc::new(
1280 MockClient::config()
1281 .bucket("test-bucket")
1282 .part_size(part_size)
1283 .enable_backpressure(true)
1284 .initial_read_window_size(part_size)
1285 .build(),
1286 );
1287 let object = MockObject::ramp(0xaa, OBJECT_SIZE, ETag::for_tests());
1288 let etag = object.etag();
1289
1290 client.add_object("hello", object);
1291
1292 let prefetcher = build_prefetcher(client, PrefetcherType::Default, Default::default());
1293
1294 for offset in 0..first_read_size {
1296 let object_id = ObjectId::new("hello".to_owned(), etag.clone());
1297 let fh = HandleId::new(1);
1298 let mut request = prefetcher.prefetch("test-bucket".to_owned(), object_id, fh, OBJECT_SIZE as u64);
1299 if first_read_size > 0 {
1300 let _first_read = block_on(request.read(0, first_read_size)).unwrap();
1301 }
1302
1303 let byte = block_on(request.read(offset as u64, 1)).unwrap();
1304 let expected = ramp_bytes(0xaa + offset, 1);
1305 assert_eq!(byte.into_bytes().unwrap()[..], expected[..]);
1306 }
1307 }
1308
1309 #[test_case(8 * 1024 * 1024, 1 * 1024 * 1024, 8 * 1024 * 1024; "8MiB part_size, 1MiB window")]
1310 #[test_case(1 * 1024 * 1024, 1 * 1024 * 1024, 1 * 1024 * 1024; "equal part_size and window")]
1311 #[test_case(250 * 1024, 1 * 1024 * 1024, 1250 * 1024; "window larger than part_size")]
1312 fn test_seek_window_reservation(part_size: usize, seek_window_size: usize, expected: u64) {
1313 let reservation = PrefetchGetObject::<MockClient>::seek_window_reservation(part_size, seek_window_size);
1314 assert_eq!(reservation, expected);
1315 }
1316
1317 #[test_case(8 * MB, 8 * MB, 1 * MB + 128 * KB; "default")]
1318 #[test_case(8 * MB, 8 * MB, 0; "no initial request")]
1319 #[test_case(1 * KB, 1 * MB, 10 * MB; "initial request larger than part size")]
1320 #[test_case(16 * MB, 8 * MB, 1 * MB + 128 * KB; "larger intial read window")]
1321 #[test_case(16 * MB, 8 * MB, 0; "larger intial read window w/o initial request")]
1322 #[test_case(1 * KB, 8 * MB, 1 * MB + 128 * KB; "smaller intial read window")]
1323 #[test_case(1 * KB, 8 * MB, 0; "smaller intial read window w/o initial request")]
1324 fn test_initial_reqeust_size(initial_read_window_size: usize, part_size: usize, initial_request_size: usize) {
1325 let object_size = (16 * MB) as u64;
1326
1327 let client = Arc::new(
1328 MockClient::config()
1329 .bucket("test-bucket")
1330 .part_size(part_size)
1331 .enable_backpressure(true)
1332 .initial_read_window_size(initial_read_window_size)
1333 .build(),
1334 );
1335
1336 let object = MockObject::ramp(0xaa, object_size as usize, ETag::for_tests());
1337 let etag = object.etag();
1338 client.add_object("test-object", object);
1339
1340 let prefetcher_config = PrefetcherConfig {
1341 initial_request_size,
1342 ..Default::default()
1343 };
1344
1345 let prefetcher = build_prefetcher(client.clone(), PrefetcherType::Default, prefetcher_config);
1346 let object_id = ObjectId::new("test-object".to_owned(), etag);
1347 let fh = HandleId::new(1);
1348 let mut request = prefetcher.prefetch("test-bucket".to_owned(), object_id, fh, object_size);
1349
1350 let mut next_offset = 0;
1352 while next_offset < object_size {
1353 let buf = block_on(request.read(next_offset, 256 * KB)).unwrap();
1354 if buf.is_empty() {
1355 break;
1356 }
1357 let buf = buf.into_bytes().unwrap();
1358 let expected = ramp_bytes((0xaa + next_offset) as usize, buf.len());
1359 assert_eq!(&buf[..], &expected[..]);
1360 next_offset += buf.len() as u64;
1361 }
1362 assert_eq!(next_offset, object_size);
1363 }
1364
/// Randomized stress tests for the prefetcher driven by shuttle; compiled only when the
/// `shuttle` feature is enabled.
#[cfg(feature = "shuttle")]
mod shuttle_tests {
    use super::*;
    use futures::task::{FutureObj, Spawn, SpawnError};
    use shuttle::{check_pct, check_random, future::block_on, rand::Rng};

    /// Spawner that forwards every future to `shuttle::future::spawn`.
    struct ShuttleRuntime;

    impl Spawn for ShuttleRuntime {
        fn spawn_obj(&self, task: FutureObj<'static, ()>) -> Result<(), SpawnError> {
            shuttle::future::spawn(task);
            Ok(())
        }
    }

    /// One randomized run: reads a ramp object front to back in randomly sized requests and
    /// checks every returned chunk, then checks that the whole object was covered.
    fn sequential_read_stress_helper() {
        let mut rng = shuttle::rand::thread_rng();

        // The order of the draws below is part of the test input; keep it stable.
        let object_size = rng.gen_range(1u64..1024 * 1024);
        let window_cap = rng.gen_range(16usize..1024 * 1024);
        let multiplier = rng.gen_range(2usize..16);
        let part_size = rng.gen_range(16usize..INITIAL_REQUEST_SIZE);
        let first_request = rng.gen_range(0..INITIAL_REQUEST_SIZE);
        let forward_wait = rng.gen_range(16u64..1024 * 1024 + 256 * 1024);
        let backward_seek = rng.gen_range(16u64..1024 * 1024 + 256 * 1024);

        let mock_client = MockClient::config()
            .bucket("test-bucket")
            .part_size(part_size)
            .enable_backpressure(true)
            .initial_read_window_size(part_size)
            .build();
        let client = Arc::new(mock_client);
        let pool = PagedPool::new_with_candidate_sizes([part_size]);
        let mem_limiter = Arc::new(MemoryLimiter::new(pool, MINIMUM_MEM_LIMIT));
        let ramp_object = MockObject::ramp(0xaa, object_size as usize, ETag::for_tests());
        let file_etag = ramp_object.etag();

        client.add_object("hello", ramp_object);

        let prefetcher_config = PrefetcherConfig {
            max_read_window_size: window_cap,
            sequential_prefetch_multiplier: multiplier,
            max_forward_seek_wait_distance: forward_wait,
            max_backward_seek_distance: backward_seek,
            initial_request_size: first_request,
        };

        let prefetcher =
            Prefetcher::default_builder(client).build(Runtime::new(ShuttleRuntime), mem_limiter, prefetcher_config);
        let object_id = ObjectId::new("hello".to_owned(), file_etag);
        let handle = HandleId::new(1);
        let mut request = prefetcher.prefetch("test-bucket".to_owned(), object_id, handle, object_size);

        let mut position = 0;
        loop {
            let read_size = rng.gen_range(1usize..1024 * 1024);
            let chunk = block_on(request.read(position, read_size)).unwrap();
            // An empty read ends the walk.
            if chunk.is_empty() {
                break;
            }
            let chunk = chunk.into_bytes().unwrap();
            let wanted = ramp_bytes((0xaa + position) as usize, chunk.len());
            assert_eq!(&chunk[..], &wanted[..chunk.len()]);
            position += chunk.len() as u64;
        }
        assert_eq!(position, object_size);
    }

    /// Runs the sequential helper under shuttle's random and PCT schedulers.
    #[test]
    fn sequential_read_stress() {
        check_random(sequential_read_stress_helper, 1000);
        check_pct(sequential_read_stress_helper, 1000, 3);
    }

    /// One randomized run: issues a random number of reads at random offsets and lengths
    /// within a small ramp object and checks the returned bytes of each one.
    fn random_read_stress_helper() {
        let mut rng = shuttle::rand::thread_rng();

        // The order of the draws below is part of the test input; keep it stable.
        let window_cap = rng.gen_range(16usize..32 * 1024);
        let multiplier = rng.gen_range(2usize..16);
        let part_size = rng.gen_range(16usize..128 * 1024);
        let first_request = rng.gen_range(16usize..128 * 1024);
        let forward_wait = rng.gen_range(16u64..192 * 1024);
        let backward_seek = rng.gen_range(16u64..192 * 1024);
        // The object size is bounded relative to the smaller of the two request sizes.
        let max_object_size = first_request.min(window_cap) * 20;
        let object_size = rng.gen_range(1u64..(64 * 1024).min(max_object_size) as u64);

        let mock_client = MockClient::config()
            .bucket("test-bucket")
            .part_size(part_size)
            .enable_backpressure(true)
            .initial_read_window_size(part_size)
            .build();
        let client = Arc::new(mock_client);
        let pool = PagedPool::new_with_candidate_sizes([part_size]);
        let mem_limiter = Arc::new(MemoryLimiter::new(pool, MINIMUM_MEM_LIMIT));
        let ramp_object = MockObject::ramp(0xaa, object_size as usize, ETag::for_tests());
        let file_etag = ramp_object.etag();

        client.add_object("hello", ramp_object);

        let prefetcher_config = PrefetcherConfig {
            max_read_window_size: window_cap,
            sequential_prefetch_multiplier: multiplier,
            max_forward_seek_wait_distance: forward_wait,
            max_backward_seek_distance: backward_seek,
            initial_request_size: first_request,
        };

        let prefetcher =
            Prefetcher::default_builder(client).build(Runtime::new(ShuttleRuntime), mem_limiter, prefetcher_config);
        let object_id = ObjectId::new("hello".to_owned(), file_etag);
        let handle = HandleId::new(1);
        let mut request = prefetcher.prefetch("test-bucket".to_owned(), object_id, handle, object_size);

        let num_reads = rng.gen_range(10usize..50);
        for _ in 0..num_reads {
            let offset = rng.gen_range(0u64..object_size);
            let length = rng.gen_range(1usize..(object_size - offset + 1) as usize);
            let wanted = ramp_bytes((0xaa + offset) as usize, length);
            let actual = block_on(request.read(offset, length)).unwrap().into_bytes().unwrap();
            assert_eq!(actual.len(), wanted.len());

            // Report only the first differing byte rather than dumping both buffers.
            let first_mismatch = actual.iter().zip(wanted.iter()).position(|(got, want)| got != want);
            if let Some(i) = first_mismatch {
                panic!(
                    "buffer mismatch at offset {}, saw {} expected {}",
                    i, actual[i], wanted[i]
                );
            }
        }
    }

    /// Runs the random-access helper under shuttle's random and PCT schedulers.
    #[test]
    fn random_read_stress() {
        check_random(random_read_stress_helper, 1000);
        check_pct(random_read_stress_helper, 1000, 3);
    }
}
1511}