1use bytes::Bytes;
5use futures::channel::oneshot;
6use futures::{FutureExt, TryFutureExt};
7use object_store::path::Path;
8use std::collections::BinaryHeap;
9use std::fmt::Debug;
10use std::future::Future;
11use std::num::NonZero;
12use std::ops::Range;
13use std::sync::atomic::{AtomicU64, Ordering};
14use std::sync::{Arc, Mutex};
15use std::time::Instant;
16use tokio::sync::Notify;
17
18use lance_core::utils::parse::str_is_truthy;
19use lance_core::{Error, Result};
20
21use crate::object_store::ObjectStore;
22use crate::traits::Reader;
23use crate::utils::CachedFileSize;
24
25mod lite;
26
// Don't warn about backpressure until the scheduler has been running at least
// this many seconds (ignores startup noise)
const BACKPRESSURE_MIN: u64 = 5;
// After the first warning, warn at most once per this many seconds
const BACKPRESSURE_DEBOUNCE: u64 = 60;

// Process-wide counters of read operations issued / bytes requested
static IOPS_COUNTER: AtomicU64 = AtomicU64::new(0);
static BYTES_READ_COUNTER: AtomicU64 = AtomicU64::new(0);
36
/// Returns the total number of read operations issued process-wide by all schedulers.
pub fn iops_counter() -> u64 {
    IOPS_COUNTER.load(Ordering::Acquire)
}
40
/// Returns the total number of bytes requested process-wide by all schedulers.
pub fn bytes_read_counter() -> u64 {
    BYTES_READ_COUNTER.load(Ordering::Acquire)
}
44
/// A sorted multiset of the priorities of requests currently in flight.
///
/// Backed by a sorted `Vec`; the scheduler uses the minimum to let requests
/// that are at least as urgent as anything already running skip the
/// backpressure throttle.
struct PrioritiesInFlight {
    /// Priorities of in-flight requests, maintained in ascending order
    /// (duplicates allowed)
    in_flight: Vec<u128>,
}

impl PrioritiesInFlight {
    /// Creates an empty set, pre-sized for roughly twice `capacity`
    /// concurrent requests.
    fn new(capacity: u32) -> Self {
        Self {
            in_flight: Vec::with_capacity(2 * capacity as usize),
        }
    }

    /// The most urgent (numerically smallest) in-flight priority, or
    /// `u128::MAX` when nothing is in flight.
    fn min_in_flight(&self) -> u128 {
        match self.in_flight.first() {
            Some(&prio) => prio,
            None => u128::MAX,
        }
    }

    /// Inserts `prio`, keeping the vector sorted.
    fn push(&mut self, prio: u128) {
        let pos = self
            .in_flight
            .binary_search(&prio)
            .unwrap_or_else(|insert_at| insert_at);
        self.in_flight.insert(pos, prio);
    }

    /// Removes one occurrence of `prio`, if present; unknown priorities are
    /// ignored.
    fn remove(&mut self, prio: u128) {
        let _ = self
            .in_flight
            .binary_search(&prio)
            .map(|pos| self.in_flight.remove(pos));
    }
}
82
/// Mutable scheduler state, guarded by the mutex in `IoQueue`.
struct IoQueueState {
    /// Number of I/O operations that may still be launched before hitting
    /// the parallelism limit
    iops_avail: u32,
    /// Remaining backpressure budget in bytes; may go negative when an
    /// urgent request is allowed to overshoot the throttle
    bytes_avail: i64,
    /// Tasks waiting to run, ordered by `IoTask`'s `Ord` impl (max-heap)
    pending_requests: BinaryHeap<IoTask>,
    /// Priorities of requests currently being executed
    priorities_in_flight: PrioritiesInFlight,
    /// Set when the scheduler is dropped so the I/O loop can terminate
    done_scheduling: bool,
    /// Creation time, used to timestamp/debounce backpressure warnings
    start: Instant,
    /// Seconds since `start` at which the last backpressure warning fired
    /// (0 = never); atomic so `warn_if_needed` can update it through `&self`
    last_warn: AtomicU64,
    /// True when the configured buffer size was 0, which disables byte-based
    /// throttling entirely
    no_backpressure: bool,
}
104
impl IoQueueState {
    /// Creates state with the full IOPs and byte budgets available.
    fn new(io_capacity: u32, io_buffer_size: u64) -> Self {
        Self {
            iops_avail: io_capacity,
            bytes_avail: io_buffer_size as i64,
            pending_requests: BinaryHeap::new(),
            priorities_in_flight: PrioritiesInFlight::new(io_capacity),
            done_scheduling: false,
            start: Instant::now(),
            last_warn: AtomicU64::from(0),
            // A zero-sized buffer means "no backpressure at all"
            no_backpressure: io_buffer_size == 0,
        }
    }

    /// Logs (debug level) that the backpressure throttle is saturated.
    ///
    /// The first warning is suppressed for the first `BACKPRESSURE_MIN`
    /// seconds of runtime; subsequent warnings are debounced to at most one
    /// per `BACKPRESSURE_DEBOUNCE` seconds.
    fn warn_if_needed(&self) {
        let seconds_elapsed = self.start.elapsed().as_secs();
        let last_warn = self.last_warn.load(Ordering::Acquire);
        let since_last_warn = seconds_elapsed - last_warn;
        if (last_warn == 0
            && seconds_elapsed > BACKPRESSURE_MIN
            && seconds_elapsed < BACKPRESSURE_DEBOUNCE)
            || since_last_warn > BACKPRESSURE_DEBOUNCE
        {
            tracing::event!(tracing::Level::DEBUG, "Backpressure throttle exceeded");
            log::debug!(
                "Backpressure throttle is full, I/O will pause until buffer is drained. Max I/O bandwidth will not be achieved because CPU is falling behind"
            );
            // `.max(1)` keeps a warning at second 0 from looking like "never warned"
            self.last_warn
                .store(seconds_elapsed.max(1), Ordering::Release);
        }
    }

    /// Returns true if `task` may be started now.
    ///
    /// An IOPs slot is always required.  Beyond that, a task runs immediately
    /// if backpressure is disabled, the task is exempt, or it is at least as
    /// urgent (priority values are lower-is-more-urgent) as everything
    /// already in flight; otherwise it must fit in the byte budget.
    fn can_deliver(&self, task: &IoTask) -> bool {
        if self.iops_avail == 0 {
            false
        } else if self.no_backpressure
            || task.bypass_backpressure
            || task.priority <= self.priorities_in_flight.min_in_flight()
        {
            true
        } else if task.num_bytes() as i64 > self.bytes_avail {
            self.warn_if_needed();
            false
        } else {
            true
        }
    }

    /// Pops the highest-priority pending task if it can start now, charging
    /// the IOPs budget and (unless exempt) the byte budget.
    fn next_task(&mut self) -> Option<IoTask> {
        let task = self.pending_requests.peek()?;
        if self.can_deliver(task) {
            let skip_bytes_accounting = self.no_backpressure || task.bypass_backpressure;
            self.priorities_in_flight.push(task.priority);
            self.iops_avail -= 1;
            if !skip_bytes_accounting {
                self.bytes_avail -= task.num_bytes() as i64;
                // Urgent tasks may legitimately push the budget negative
                if self.bytes_avail < 0 {
                    log::debug!(
                        "Backpressure throttle temporarily exceeded by {} bytes due to priority I/O",
                        -self.bytes_avail
                    );
                }
            }
            Some(self.pending_requests.pop().unwrap())
        } else {
            None
        }
    }
}
175
/// Priority queue of I/O tasks shared between request producers and the
/// background I/O loop.
struct IoQueue {
    /// Scheduler state; the lock is only ever held briefly
    state: Mutex<IoQueueState>,
    /// Signaled whenever the state changes in a way that may unblock `pop`
    notify: Notify,
}
186
impl IoQueue {
    /// Creates a queue with the given IOPs parallelism and byte budget.
    fn new(io_capacity: u32, io_buffer_size: u64) -> Self {
        Self {
            state: Mutex::new(IoQueueState::new(io_capacity, io_buffer_size)),
            notify: Notify::new(),
        }
    }

    /// Enqueues a task and wakes the I/O loop.
    fn push(&self, task: IoTask) {
        log::trace!(
            "Inserting I/O request for {} bytes with priority ({},{}) into I/O queue",
            task.num_bytes(),
            // Upper 64 bits = file base priority, lower 64 = request priority
            task.priority >> 64,
            task.priority & 0xFFFFFFFFFFFFFFFF
        );
        let mut state = self.state.lock().unwrap();
        state.pending_requests.push(task);
        drop(state);

        self.notify.notify_one();
    }

    /// Waits for the next runnable task; returns `None` once the queue has
    /// been closed and nothing is deliverable.
    ///
    /// Intended for a single consumer (the I/O loop).  `Notify::notify_one`
    /// stores a permit when no one is waiting, so a notification arriving
    /// between releasing the lock and awaiting `notified()` is not lost.
    async fn pop(&self) -> Option<IoTask> {
        loop {
            {
                let mut state = self.state.lock().unwrap();
                if let Some(task) = state.next_task() {
                    return Some(task);
                }

                if state.done_scheduling {
                    return None;
                }
            }

            self.notify.notified().await;
        }
    }

    /// Returns an IOPs slot to the pool and wakes the I/O loop.
    fn on_iop_complete(&self) {
        let mut state = self.state.lock().unwrap();
        state.iops_avail += 1;
        drop(state);

        self.notify.notify_one();
    }

    /// Repays `bytes` to the backpressure budget and removes the finished
    /// batch's priority entries (one per request) from the in-flight set.
    fn on_bytes_consumed(&self, bytes: u64, priority: u128, num_reqs: usize) {
        let mut state = self.state.lock().unwrap();
        state.bytes_avail += bytes as i64;
        for _ in 0..num_reqs {
            state.priorities_in_flight.remove(priority);
        }
        drop(state);

        self.notify.notify_one();
    }

    /// Marks the queue closed and cancels every still-pending task.
    fn close(&self) {
        let mut state = self.state.lock().unwrap();
        state.done_scheduling = true;
        // Take the heap so the user-supplied callbacks run outside the lock
        let pending_requests = std::mem::take(&mut state.pending_requests);
        drop(state);
        for request in pending_requests {
            request.cancel();
        }

        self.notify.notify_one();
    }
}
257
/// Accumulates the results of a batch of reads.
///
/// Shared (via `Arc<Mutex<..>>`) between the per-range completion callbacks;
/// the batch completes in its `Drop` impl once the last callback has run and
/// released its clone.
struct MutableBatch<F: FnOnce(Response) + Send> {
    /// Callback fired with the assembled `Response` on drop
    when_done: Option<F>,
    /// One slot per (coalesced/split) range, filled as reads complete
    data_buffers: Vec<Bytes>,
    /// Total bytes delivered so far, used to repay the backpressure budget
    num_bytes: u64,
    /// Priority shared by every read in the batch
    priority: u128,
    /// Number of individual read operations in the batch
    num_reqs: usize,
    /// First error encountered, if any; reported in place of the data
    err: Option<Box<dyn std::error::Error + Send + Sync + 'static>>,
    /// Whether this batch was exempt from backpressure accounting
    bypass_backpressure: bool,
}
272
273impl<F: FnOnce(Response) + Send> MutableBatch<F> {
274 fn new(
275 when_done: F,
276 num_data_buffers: u32,
277 priority: u128,
278 num_reqs: usize,
279 bypass_backpressure: bool,
280 ) -> Self {
281 Self {
282 when_done: Some(when_done),
283 data_buffers: vec![Bytes::default(); num_data_buffers as usize],
284 num_bytes: 0,
285 priority,
286 num_reqs,
287 err: None,
288 bypass_backpressure,
289 }
290 }
291}
292
293impl<F: FnOnce(Response) + Send> Drop for MutableBatch<F> {
298 fn drop(&mut self) {
299 let result = if self.err.is_some() {
301 Err(Error::wrapped(self.err.take().unwrap()))
302 } else {
303 let mut data = Vec::new();
304 std::mem::swap(&mut data, &mut self.data_buffers);
305 Ok(data)
306 };
307 let response = Response {
310 data: result,
311 num_bytes: if self.bypass_backpressure {
313 0
314 } else {
315 self.num_bytes
316 },
317 priority: self.priority,
318 num_reqs: self.num_reqs,
319 };
320 (self.when_done.take().unwrap())(response);
321 }
322}
323
/// The result of one ranged read, tagged with its slot in the batch.
struct DataChunk {
    /// Index of the destination buffer in the batch
    task_idx: usize,
    /// Number of bytes this read was charged for
    num_bytes: u64,
    /// The bytes read, or the error that occurred
    data: Result<Bytes>,
}
329
/// Destination for completed read chunks.
trait DataSink: Send {
    /// Accepts the result of one read.
    fn deliver_data(&mut self, data: DataChunk);
}
333
impl<F: FnOnce(Response) + Send> DataSink for MutableBatch<F> {
    /// Stores a completed chunk in its slot, or records the first error.
    fn deliver_data(&mut self, data: DataChunk) {
        // Count the bytes even on error so the backpressure budget charged
        // for this read is fully repaid when the batch completes.
        self.num_bytes += data.num_bytes;
        match data.data {
            Ok(data_bytes) => {
                self.data_buffers[data.task_idx] = data_bytes;
            }
            Err(err) => {
                // Only the first error is kept; later ones are discarded
                self.err.get_or_insert(Box::new(err));
            }
        }
    }
}
349
/// A single ranged read against one file, plus its completion callback.
struct IoTask {
    reader: Arc<dyn Reader>,
    /// Byte range to read; an empty range completes without performing I/O
    to_read: Range<u64>,
    /// Invoked with the bytes read (or an error) when the task finishes
    when_done: Box<dyn FnOnce(Result<Bytes>) + Send>,
    /// Lower values are more urgent; upper 64 bits carry the file's base
    /// priority, lower 64 bits the per-request priority
    priority: u128,
    /// Exempt from the backpressure byte budget when true
    bypass_backpressure: bool,
}
357
// Equality/ordering only consider scheduling keys; tasks live in a max-heap
// (`BinaryHeap`), so "greater" means "popped first".
impl Eq for IoTask {}

impl PartialEq for IoTask {
    fn eq(&self, other: &Self) -> bool {
        self.bypass_backpressure == other.bypass_backpressure && self.priority == other.priority
    }
}

impl PartialOrd for IoTask {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for IoTask {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        // Bypass tasks sort above normal tasks (`true > false`), so they are
        // popped first.  Within each group the priority comparison is
        // reversed, so numerically smaller (more urgent) priorities pop first.
        self.bypass_backpressure
            .cmp(&other.bypass_backpressure)
            .then(other.priority.cmp(&self.priority))
    }
}
381
impl IoTask {
    /// Length of the requested range in bytes.
    fn num_bytes(&self) -> u64 {
        self.to_read.end - self.to_read.start
    }

    /// Fails the task without performing any I/O (used when the scheduler is
    /// dropped with work still queued).
    fn cancel(self) {
        (self.when_done)(Err(Error::internal(
            "Scheduler closed before I/O was completed".to_string(),
        )));
    }

    /// Performs the read, updates the process-wide IOPs/bytes counters, and
    /// invokes the completion callback with the result.
    async fn run(self) {
        let file_path = self.reader.path().as_ref();
        let num_bytes = self.num_bytes();
        // Empty ranges short-circuit so they don't count as an IOP
        let bytes = if self.to_read.start == self.to_read.end {
            Ok(Bytes::new())
        } else {
            let bytes_fut = self
                .reader
                .get_range(self.to_read.start as usize..self.to_read.end as usize);
            IOPS_COUNTER.fetch_add(1, Ordering::Release);
            let num_bytes = self.num_bytes();
            bytes_fut
                .inspect(move |_| {
                    // Counted when the read future completes (success or not)
                    BYTES_READ_COUNTER.fetch_add(num_bytes, Ordering::Release);
                })
                .await
                .map_err(Error::from)
        };
        tracing::trace!(
            file = file_path,
            bytes_read = num_bytes,
            requests = 1,
            range_start = self.to_read.start,
            range_end = self.to_read.end,
            "File I/O completed"
        );
        (self.when_done)(bytes);
    }
}
422
423async fn run_io_loop(tasks: Arc<IoQueue>) {
426 loop {
429 let next_task = tasks.pop().await;
430 match next_task {
431 Some(task) => {
432 tokio::spawn(task.run());
433 }
434 None => {
435 return;
437 }
438 }
439 }
440}
441
/// Per-scheduler I/O statistics, updated atomically as requests are recorded.
#[derive(Debug)]
struct StatsCollector {
    /// Number of individual I/O operations issued (one per range, after
    /// coalescing and splitting)
    iops: AtomicU64,
    /// Number of logical requests submitted by callers
    requests: AtomicU64,
    /// Total number of bytes requested
    bytes_read: AtomicU64,
}

impl StatsCollector {
    /// Creates a collector with every counter zeroed.
    fn new() -> Self {
        Self {
            iops: 0.into(),
            requests: 0.into(),
            bytes_read: 0.into(),
        }
    }

    /// Number of I/O operations issued so far.
    fn iops(&self) -> u64 {
        self.iops.load(Ordering::Relaxed)
    }

    /// Total bytes requested so far.
    fn bytes_read(&self) -> u64 {
        self.bytes_read.load(Ordering::Relaxed)
    }

    /// Number of logical requests recorded so far.
    fn requests(&self) -> u64 {
        self.requests.load(Ordering::Relaxed)
    }

    /// Records one logical request made up of the given ranges.
    fn record_request(&self, request: &[Range<u64>]) {
        let total_bytes: u64 = request.iter().map(|r| r.end - r.start).sum();
        self.requests.fetch_add(1, Ordering::Relaxed);
        self.iops.fetch_add(request.len() as u64, Ordering::Relaxed);
        self.bytes_read.fetch_add(total_bytes, Ordering::Relaxed);
    }
}
479
/// A point-in-time snapshot of a scheduler's I/O statistics.
pub struct ScanStats {
    /// Number of individual I/O operations issued
    pub iops: u64,
    /// Number of logical requests submitted (one request may span many IOPs)
    pub requests: u64,
    /// Total number of bytes requested
    pub bytes_read: u64,
}

impl ScanStats {
    /// Captures the current counter values from `stats`.
    fn new(stats: &StatsCollector) -> Self {
        Self {
            iops: stats.iops(),
            requests: stats.requests(),
            bytes_read: stats.bytes_read(),
        }
    }
}
495
/// The two queue back-ends: the standard priority/backpressure queue, or the
/// simpler "lite" queue.
enum IoQueueType {
    Standard(Arc<IoQueue>),
    Lite(Arc<lite::IoQueue>),
}
500
/// Schedules and rate-limits I/O against an object store, prioritizing
/// requests and applying backpressure when consumers fall behind.
pub struct ScanScheduler {
    /// The store all reads are issued against
    object_store: Arc<ObjectStore>,
    /// The active queue implementation (standard or lite)
    io_queue: IoQueueType,
    /// Statistics for requests submitted through this scheduler
    stats: Arc<StatsCollector>,
}
514
impl Debug for ScanScheduler {
    // Manual impl: the queue and stats internals aren't useful in debug output
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ScanScheduler")
            .field("object_store", &self.object_store)
            .finish()
    }
}
522
/// The assembled result of a batch of reads, sent back over a oneshot channel.
struct Response {
    /// The bytes for each range, or the first error encountered
    data: Result<Vec<Bytes>>,
    /// Priority shared by the batch (needed to clear in-flight tracking)
    priority: u128,
    /// Number of read operations in the batch
    num_reqs: usize,
    /// Bytes to return to the backpressure budget (0 for bypass batches)
    num_bytes: u64,
}
529
/// Tuning knobs for a [`ScanScheduler`].
#[derive(Debug, Clone, Copy)]
pub struct SchedulerConfig {
    /// Soft cap on bytes buffered while awaiting consumption; 0 disables
    /// backpressure entirely
    pub io_buffer_size_bytes: u64,
    /// Force the lite scheduler on/off; `None` defers to the object store's
    /// preference
    pub use_lite_scheduler: Option<bool>,
}
543
544impl SchedulerConfig {
545 pub fn new(io_buffer_size_bytes: u64) -> Self {
546 Self {
547 io_buffer_size_bytes,
548 use_lite_scheduler: std::env::var("LANCE_USE_LITE_SCHEDULER")
549 .ok()
550 .map(|v| str_is_truthy(v.trim())),
551 }
552 }
553
554 pub fn default_for_testing() -> Self {
556 Self {
557 io_buffer_size_bytes: 256 * 1024 * 1024,
558 use_lite_scheduler: None,
559 }
560 }
561
562 pub fn max_bandwidth(store: &ObjectStore) -> Self {
565 Self::new(32 * 1024 * 1024 * store.io_parallelism() as u64)
566 }
567
568 pub fn with_lite_scheduler(self) -> Self {
569 Self {
570 use_lite_scheduler: Some(true),
571 ..self
572 }
573 }
574}
575
576impl ScanScheduler {
577 pub fn new(object_store: Arc<ObjectStore>, config: SchedulerConfig) -> Arc<Self> {
584 let io_capacity = object_store.io_parallelism();
585 let use_lite = config
586 .use_lite_scheduler
587 .unwrap_or_else(|| object_store.prefers_lite_scheduler());
588 let io_queue = if use_lite {
589 let io_queue = Arc::new(lite::IoQueue::new(
590 io_capacity as u64,
591 config.io_buffer_size_bytes,
592 ));
593 IoQueueType::Lite(io_queue)
594 } else {
595 let io_queue = Arc::new(IoQueue::new(
596 io_capacity as u32,
597 config.io_buffer_size_bytes,
598 ));
599 let io_queue_clone = io_queue.clone();
600 tokio::task::spawn(async move { run_io_loop(io_queue_clone).await });
604 IoQueueType::Standard(io_queue)
605 };
606 Arc::new(Self {
607 object_store,
608 io_queue,
609 stats: Arc::new(StatsCollector::new()),
610 })
611 }
612
613 pub async fn open_file_with_priority(
622 self: &Arc<Self>,
623 path: &Path,
624 base_priority: u64,
625 file_size_bytes: &CachedFileSize,
626 ) -> Result<FileScheduler> {
627 let file_size_bytes = if let Some(size) = file_size_bytes.get() {
628 u64::from(size)
629 } else {
630 let size = self.object_store.size(path).await?;
631 if let Some(size) = NonZero::new(size) {
632 file_size_bytes.set(size);
633 }
634 size
635 };
636 let reader = self
637 .object_store
638 .open_with_size(path, file_size_bytes as usize)
639 .await?;
640 let block_size = self.object_store.block_size() as u64;
641 let max_iop_size = self.object_store.max_iop_size();
642 Ok(FileScheduler {
643 reader: reader.into(),
644 block_size,
645 root: self.clone(),
646 base_priority,
647 max_iop_size,
648 bypass_backpressure: false,
649 })
650 }
651
652 pub async fn open_file(
656 self: &Arc<Self>,
657 path: &Path,
658 file_size_bytes: &CachedFileSize,
659 ) -> Result<FileScheduler> {
660 self.open_file_with_priority(path, 0, file_size_bytes).await
661 }
662
663 fn do_submit_request(
664 &self,
665 reader: Arc<dyn Reader>,
666 request: Vec<Range<u64>>,
667 tx: oneshot::Sender<Response>,
668 priority: u128,
669 io_queue: &Arc<IoQueue>,
670 bypass_backpressure: bool,
671 ) {
672 let num_iops = request.len() as u32;
673
674 let when_all_io_done = move |bytes_and_permits| {
675 let _ = tx.send(bytes_and_permits);
677 };
678
679 let dest = Arc::new(Mutex::new(Box::new(MutableBatch::new(
680 when_all_io_done,
681 num_iops,
682 priority,
683 request.len(),
684 bypass_backpressure,
685 ))));
686
687 for (task_idx, iop) in request.into_iter().enumerate() {
688 let dest = dest.clone();
689 let io_queue_clone = io_queue.clone();
690 let num_bytes = iop.end - iop.start;
691 let task = IoTask {
692 reader: reader.clone(),
693 to_read: iop,
694 priority,
695 bypass_backpressure,
696 when_done: Box::new(move |data| {
697 io_queue_clone.on_iop_complete();
698 let mut dest = dest.lock().unwrap();
699 let chunk = DataChunk {
700 data,
701 task_idx,
702 num_bytes,
703 };
704 dest.deliver_data(chunk);
705 }),
706 };
707 io_queue.push(task);
708 }
709 }
710
711 fn submit_request_standard(
712 &self,
713 reader: Arc<dyn Reader>,
714 request: Vec<Range<u64>>,
715 priority: u128,
716 io_queue: &Arc<IoQueue>,
717 bypass_backpressure: bool,
718 ) -> impl Future<Output = Result<Vec<Bytes>>> + Send + use<> {
719 let (tx, rx) = oneshot::channel::<Response>();
720
721 self.do_submit_request(reader, request, tx, priority, io_queue, bypass_backpressure);
722
723 let io_queue_clone = io_queue.clone();
724
725 rx.map(move |wrapped_rsp| {
726 let rsp = wrapped_rsp.unwrap();
729 io_queue_clone.on_bytes_consumed(rsp.num_bytes, rsp.priority, rsp.num_reqs);
730 rsp.data
731 })
732 }
733
734 fn submit_request_lite(
735 &self,
736 reader: Arc<dyn Reader>,
737 request: Vec<Range<u64>>,
738 priority: u128,
739 io_queue: &Arc<lite::IoQueue>,
740 bypass_backpressure: bool,
741 ) -> impl Future<Output = Result<Vec<Bytes>>> + Send + use<> {
742 let maybe_tasks = request
744 .into_iter()
745 .map(|task| {
746 let reader = reader.clone();
747 let queue = io_queue.clone();
748 let run_fn = Box::new(move || {
749 reader
750 .get_range(task.start as usize..task.end as usize)
751 .map_err(Error::from)
752 .boxed()
753 });
754 queue.submit(task, priority, run_fn, bypass_backpressure)
755 })
756 .collect::<Result<Vec<_>>>();
757 match maybe_tasks {
758 Ok(tasks) => async move {
759 let mut results = Vec::with_capacity(tasks.len());
760 for task in tasks {
761 results.push(task.await?);
762 }
763 Ok(results)
764 }
765 .boxed(),
766 Err(e) => async move { Err(e) }.boxed(),
767 }
768 }
769
770 pub fn submit_request(
771 &self,
772 reader: Arc<dyn Reader>,
773 request: Vec<Range<u64>>,
774 priority: u128,
775 bypass_backpressure: bool,
776 ) -> impl Future<Output = Result<Vec<Bytes>>> + Send + use<> {
777 match &self.io_queue {
778 IoQueueType::Standard(io_queue) => {
779 futures::future::Either::Left(self.submit_request_standard(
780 reader,
781 request,
782 priority,
783 io_queue,
784 bypass_backpressure,
785 ))
786 }
787 IoQueueType::Lite(io_queue) => futures::future::Either::Right(
788 self.submit_request_lite(reader, request, priority, io_queue, bypass_backpressure),
789 ),
790 }
791 }
792
793 pub fn stats(&self) -> ScanStats {
794 ScanStats::new(self.stats.as_ref())
795 }
796
797 #[cfg(test)]
798 fn uses_lite_scheduler(&self) -> bool {
799 matches!(self.io_queue, IoQueueType::Lite(_))
800 }
801}
802
impl Drop for ScanScheduler {
    // Closing the queue cancels any still-pending tasks (their callers get an
    // error) and lets the background I/O loop terminate.
    fn drop(&mut self) {
        match &self.io_queue {
            IoQueueType::Standard(io_queue) => io_queue.close(),
            IoQueueType::Lite(io_queue) => io_queue.close(),
        }
    }
}
822
/// A handle for scheduling reads against one particular file.
#[derive(Clone, Debug)]
pub struct FileScheduler {
    /// Reader for the file all requests target
    reader: Arc<dyn Reader>,
    /// The scheduler the requests are routed through
    root: Arc<ScanScheduler>,
    /// Ranges within this many bytes of each other are coalesced into one read
    block_size: u64,
    /// Occupies the upper 64 bits of every request priority from this handle
    base_priority: u64,
    /// Reads larger than this are split into multiple operations
    max_iop_size: u64,
    /// When true, requests skip the backpressure byte budget
    bypass_backpressure: bool,
}
833
/// True if `range2` starts within `block_size` bytes of the end of `range1`
/// (assumes the ranges are ordered by start offset).
///
/// Uses `saturating_sub` so an overlapping `range2` yields a gap of zero and,
/// unlike the previous `range1.end + block_size` formulation, a `range1.end`
/// near `u64::MAX` cannot overflow (which would panic in debug builds).
fn is_close_together(range1: &Range<u64>, range2: &Range<u64>, block_size: u64) -> bool {
    range2.start.saturating_sub(range1.end) <= block_size
}
838
/// True if the two ranges share at least one byte (empty ranges never
/// overlap anything).
fn is_overlapping(range1: &Range<u64>, range2: &Range<u64>) -> bool {
    // Equivalent, by De Morgan, to "neither range ends before the other starts"
    !(range1.end <= range2.start || range2.end <= range1.start)
}
842
impl FileScheduler {
    /// Submits a batch of ranged reads against the file.
    ///
    /// Ranges within `block_size` of each other are coalesced into a single
    /// read and reads larger than `max_iop_size` are split; the returned
    /// buffers always correspond 1:1 with the caller's original ranges.
    ///
    /// NOTE(review): the coalescing pass assumes `request` is sorted by start
    /// offset — confirm with callers.
    pub fn submit_request(
        &self,
        request: Vec<Range<u64>>,
        priority: u64,
    ) -> impl Future<Output = Result<Vec<Bytes>>> + Send + use<> {
        // Combined priority: file base priority (upper 64 bits) + request
        // priority (lower 64 bits)
        let priority = ((self.base_priority as u128) << 64) + priority as u128;

        let mut merged_requests = Vec::with_capacity(request.len());

        // Pass 1: coalesce adjacent/nearby ranges into single intervals
        if !request.is_empty() {
            let mut curr_interval = request[0].clone();

            for req in request.iter().skip(1) {
                if is_close_together(&curr_interval, req, self.block_size) {
                    curr_interval.end = curr_interval.end.max(req.end);
                } else {
                    merged_requests.push(curr_interval);
                    curr_interval = req.clone();
                }
            }

            merged_requests.push(curr_interval);
        }

        // Pass 2: split intervals larger than max_iop_size into roughly
        // equal-sized chunks
        let mut updated_requests = Vec::with_capacity(merged_requests.len());
        for req in merged_requests {
            if req.is_empty() {
                updated_requests.push(req);
            } else {
                let num_requests = (req.end - req.start).div_ceil(self.max_iop_size);
                let bytes_per_request = (req.end - req.start) / num_requests;
                for i in 0..num_requests {
                    let start = req.start + i * bytes_per_request;
                    // The final chunk absorbs any rounding remainder
                    let end = if i == num_requests - 1 {
                        req.end
                    } else {
                        start + bytes_per_request
                    };
                    updated_requests.push(start..end);
                }
            }
        }

        self.root.stats.record_request(&updated_requests);

        let bytes_vec_fut = self.root.submit_request(
            self.reader.clone(),
            updated_requests.clone(),
            priority,
            self.bypass_backpressure,
        );

        let mut updated_index = 0;
        let mut final_bytes = Vec::with_capacity(request.len());

        async move {
            let bytes_vec = bytes_vec_fut.await?;

            // Walk the original and coalesced/split ranges in lockstep,
            // slicing (and, where a range was split, re-stitching) the
            // results back into one buffer per original range.
            let mut orig_index = 0;
            while (updated_index < updated_requests.len()) && (orig_index < request.len()) {
                let updated_range = &updated_requests[updated_index];
                let orig_range = &request[orig_index];
                let byte_offset = updated_range.start as usize;

                if is_overlapping(updated_range, orig_range) {
                    // The original range begins inside the current chunk
                    let start = orig_range.start as usize - byte_offset;
                    if orig_range.end <= updated_range.end {
                        // Fully contained: a zero-copy slice suffices
                        let end = orig_range.end as usize - byte_offset;
                        final_bytes.push(bytes_vec[updated_index].slice(start..end));
                    } else {
                        // Spans multiple chunks: copy the pieces into one buffer
                        let orig_size = orig_range.end - orig_range.start;
                        let mut merged_bytes = Vec::with_capacity(orig_size as usize);
                        merged_bytes.extend_from_slice(&bytes_vec[updated_index].slice(start..));
                        let mut copy_offset = merged_bytes.len() as u64;
                        while copy_offset < orig_size {
                            updated_index += 1;
                            let next_range = &updated_requests[updated_index];
                            let bytes_to_take =
                                (orig_size - copy_offset).min(next_range.end - next_range.start);
                            merged_bytes.extend_from_slice(
                                &bytes_vec[updated_index].slice(0..bytes_to_take as usize),
                            );
                            copy_offset += bytes_to_take;
                        }
                        final_bytes.push(Bytes::from(merged_bytes));
                    }
                    orig_index += 1;
                } else {
                    updated_index += 1;
                }
            }

            Ok(final_bytes)
        }
    }

    /// Returns a copy of this handle with a different base priority.
    pub fn with_priority(&self, priority: u64) -> Self {
        Self {
            reader: self.reader.clone(),
            root: self.root.clone(),
            block_size: self.block_size,
            max_iop_size: self.max_iop_size,
            base_priority: priority,
            bypass_backpressure: self.bypass_backpressure,
        }
    }

    /// Returns a copy of this handle whose requests skip the backpressure
    /// byte budget (they still count against the IOPs limit).
    pub fn with_bypass_backpressure(&self) -> Self {
        Self {
            bypass_backpressure: true,
            ..self.clone()
        }
    }

    /// Submits a single ranged read and returns its bytes.
    pub fn submit_single(
        &self,
        range: Range<u64>,
        priority: u64,
    ) -> impl Future<Output = Result<Bytes>> + Send {
        self.submit_request(vec![range], priority)
            .map_ok(|vec_bytes| vec_bytes.into_iter().next().unwrap())
    }

    /// The underlying reader.
    pub fn reader(&self) -> &Arc<dyn Reader> {
        &self.reader
    }
}
1005
1006#[cfg(test)]
1007mod tests {
1008 use std::{collections::VecDeque, time::Duration};
1009
1010 use futures::poll;
1011 use lance_core::utils::tempfile::TempObjFile;
1012 use rand::RngCore;
1013
1014 use object_store::{GetRange, ObjectStore as OSObjectStore, memory::InMemory};
1015 use tokio::{runtime::Handle, time::timeout};
1016 use url::Url;
1017
1018 use crate::{
1019 object_store::{DEFAULT_DOWNLOAD_RETRY_COUNT, DEFAULT_MAX_IOP_SIZE},
1020 testing::MockObjectStore,
1021 };
1022
1023 use super::*;
1024
    /// Builds a throwaway `IoTask` (inert reader and callback) for heap
    /// ordering tests.
    fn make_task(priority: u128, bypass_backpressure: bool) -> IoTask {
        IoTask {
            reader: Arc::new(TrackingReader {
                get_range_count: Arc::new(AtomicU64::new(0)),
                path: Path::parse("test").unwrap(),
            }),
            to_read: 0..1,
            when_done: Box::new(|_| {}),
            priority,
            bypass_backpressure,
        }
    }
1037
1038 #[test]
1039 fn test_iotask_ordering() {
1040 let mut heap = BinaryHeap::new();
1043 heap.push(make_task(10, false)); heap.push(make_task(1, false)); heap.push(make_task(20, true)); heap.push(make_task(5, true)); let order: Vec<(u128, bool)> = std::iter::from_fn(|| heap.pop())
1049 .map(|t| (t.priority, t.bypass_backpressure))
1050 .collect();
1051
1052 assert_eq!(order, vec![(5, true), (20, true), (1, false), (10, false)]);
1053 }
1054
    /// Reads a whole file back through the scheduler as sequential 4 KiB
    /// requests and checks each chunk round-trips byte-for-byte.
    #[tokio::test]
    async fn test_full_seq_read() {
        let tmp_file = TempObjFile::default();

        let obj_store = Arc::new(ObjectStore::local());

        // Write 1 MiB of random data
        const DATA_SIZE: u64 = 1024 * 1024;
        let mut some_data = vec![0; DATA_SIZE as usize];
        rand::rng().fill_bytes(&mut some_data);
        obj_store.put(&tmp_file, &some_data).await.unwrap();

        let config = SchedulerConfig::default_for_testing();

        let scheduler = ScanScheduler::new(obj_store, config);

        let file_scheduler = scheduler
            .open_file(&tmp_file, &CachedFileSize::unknown())
            .await
            .unwrap();

        // Issue all 4 KiB reads up front so they can run in parallel
        const READ_SIZE: u64 = 4 * 1024;
        let mut reqs = VecDeque::new();
        let mut offset = 0;
        while offset < DATA_SIZE {
            reqs.push_back(
                #[allow(clippy::single_range_in_vec_init)]
                file_scheduler
                    .submit_request(vec![offset..offset + READ_SIZE], 0)
                    .await
                    .unwrap(),
            );
            offset += READ_SIZE;
        }

        // Consume serially and compare against the source data
        offset = 0;
        while offset < DATA_SIZE {
            let data = reqs.pop_front().unwrap();
            let actual = &data[0];
            let expected = &some_data[offset as usize..(offset + READ_SIZE) as usize];
            assert_eq!(expected, actual);
            offset += READ_SIZE;
        }
    }
1101
    /// Exercises range coalescing (nearby ranges become one IOP) and
    /// splitting (reads larger than the max IOP size become several),
    /// checking both the returned bytes and the IOPs counter.
    #[tokio::test]
    async fn test_split_coalesce() {
        let tmp_file = TempObjFile::default();

        let obj_store = Arc::new(ObjectStore::local());

        // 75 MiB so that a full-file read must be split into multiple IOPs
        const DATA_SIZE: u64 = 75 * 1024 * 1024;
        let mut some_data = vec![0; DATA_SIZE as usize];
        rand::rng().fill_bytes(&mut some_data);
        obj_store.put(&tmp_file, &some_data).await.unwrap();

        let config = SchedulerConfig::default_for_testing();

        let scheduler = ScanScheduler::new(obj_store, config);

        let file_scheduler = scheduler
            .open_file(&tmp_file, &CachedFileSize::unknown())
            .await
            .unwrap();

        // Three nearby ranges should coalesce into a single IOP
        let req =
            file_scheduler.submit_request(vec![50_000..51_000, 52_000..53_000, 54_000..55_000], 0);

        let bytes = req.await.unwrap();

        assert_eq!(bytes[0], &some_data[50_000..51_000]);
        assert_eq!(bytes[1], &some_data[52_000..53_000]);
        assert_eq!(bytes[2], &some_data[54_000..55_000]);

        assert_eq!(1, scheduler.stats().iops);

        // A full-file read is split into ceil(75 MiB / max_iop_size) = 5 IOPs
        let req = file_scheduler.submit_request(vec![0..DATA_SIZE], 0);
        let bytes = req.await.unwrap();
        assert!(bytes[0] == some_data, "data is not the same");

        assert_eq!(6, scheduler.stats().iops);

        // Ranges that straddle chunk boundaries: coalesced, split, and then
        // re-stitched back to the original ranges
        let chunk_size = *DEFAULT_MAX_IOP_SIZE;
        let req = file_scheduler.submit_request(
            vec![
                10..chunk_size,
                chunk_size + 10..(chunk_size * 2) - 20,
                chunk_size * 2..(chunk_size * 2) + 10,
            ],
            0,
        );

        let bytes = req.await.unwrap();
        let chunk_size = chunk_size as usize;
        assert!(
            bytes[0] == some_data[10..chunk_size],
            "data is not the same"
        );
        assert!(
            bytes[1] == some_data[chunk_size + 10..(chunk_size * 2) - 20],
            "data is not the same"
        );
        assert!(
            bytes[2] == some_data[chunk_size * 2..(chunk_size * 2) + 10],
            "data is not the same"
        );
        assert_eq!(8, scheduler.stats().iops);

        // Many adjacent 1 MB reads coalesce and then split into 3 more IOPs
        let reads = (0..44)
            .map(|i| i * 1_000_000..(i + 1) * 1_000_000)
            .collect::<Vec<_>>();
        let req = file_scheduler.submit_request(reads, 0);
        let bytes = req.await.unwrap();
        for (i, bytes) in bytes.iter().enumerate() {
            assert!(
                bytes == &some_data[i * 1_000_000..(i + 1) * 1_000_000],
                "data is not the same"
            );
        }
        assert_eq!(11, scheduler.stats().iops);
    }
1185
    /// With I/O parallelism 1 and a gated mock store, checks that queued
    /// requests are released in priority order (lower value first) rather
    /// than submission order.
    #[tokio::test]
    async fn test_priority() {
        let some_path = Path::parse("foo").unwrap();
        let base_store = Arc::new(InMemory::new());
        base_store
            .put(&some_path, vec![0; 1000].into())
            .await
            .unwrap();

        // Each read blocks until the test hands out a semaphore permit
        let semaphore = Arc::new(tokio::sync::Semaphore::new(0));
        let mut obj_store = MockObjectStore::default();
        let semaphore_copy = semaphore.clone();
        obj_store
            .expect_get_opts()
            .returning(move |location, options| {
                let semaphore = semaphore.clone();
                let base_store = base_store.clone();
                let location = location.clone();
                async move {
                    semaphore.acquire().await.unwrap().forget();
                    base_store.get_opts(&location, options).await
                }
                .boxed()
            });
        let obj_store = Arc::new(ObjectStore::new(
            Arc::new(obj_store),
            Url::parse("mem://").unwrap(),
            Some(500),
            None,
            false,
            false,
            // io_parallelism = 1 so only one request runs at a time
            1,
            DEFAULT_DOWNLOAD_RETRY_COUNT,
            None,
        ));

        let config = SchedulerConfig {
            io_buffer_size_bytes: 1024 * 1024,
            use_lite_scheduler: None,
        };

        let scan_scheduler = ScanScheduler::new(obj_store, config);

        let file_scheduler = scan_scheduler
            .open_file(&Path::parse("foo").unwrap(), &CachedFileSize::new(1000))
            .await
            .unwrap();

        // Submitted first, priority 0: should run first
        let first_fut = timeout(
            Duration::from_secs(10),
            file_scheduler.submit_single(0..10, 0),
        )
        .boxed();

        // Priority 100: should run last despite being submitted second
        let mut second_fut = timeout(
            Duration::from_secs(10),
            file_scheduler.submit_single(0..20, 100),
        )
        .boxed();

        // Priority 0: should jump ahead of the priority-100 request
        let mut third_fut = timeout(
            Duration::from_secs(10),
            file_scheduler.submit_single(0..30, 0),
        )
        .boxed();

        semaphore_copy.add_permits(1);
        assert!(first_fut.await.unwrap().unwrap().len() == 10);
        assert!(poll!(&mut second_fut).is_pending());
        assert!(poll!(&mut third_fut).is_pending());

        semaphore_copy.add_permits(1);
        assert!(third_fut.await.unwrap().unwrap().len() == 30);
        assert!(poll!(&mut second_fut).is_pending());

        semaphore_copy.add_permits(1);
        assert!(second_fut.await.unwrap().unwrap().len() == 20);
    }
1274
    /// With a tiny (10 byte) backpressure buffer, checks that reads only
    /// proceed as earlier results are consumed, that a request at the head of
    /// the line may overshoot the buffer, and that two requests that each
    /// fill the buffer still both complete (no deadlock).
    #[tokio::test(flavor = "multi_thread")]
    async fn test_backpressure() {
        let some_path = Path::parse("foo").unwrap();
        let base_store = Arc::new(InMemory::new());
        base_store
            .put(&some_path, vec![0; 100000].into())
            .await
            .unwrap();

        // Tracks how many bytes the store has actually been asked for
        let bytes_read = Arc::new(AtomicU64::from(0));
        let mut obj_store = MockObjectStore::default();
        let bytes_read_copy = bytes_read.clone();
        obj_store
            .expect_get_opts()
            .returning(move |location, options| {
                let range = options.range.as_ref().unwrap();
                let num_bytes = match range {
                    GetRange::Bounded(bounded) => bounded.end - bounded.start,
                    _ => panic!(),
                };
                bytes_read_copy.fetch_add(num_bytes, Ordering::Release);
                let location = location.clone();
                let base_store = base_store.clone();
                async move { base_store.get_opts(&location, options).await }.boxed()
            });
        let obj_store = Arc::new(ObjectStore::new(
            Arc::new(obj_store),
            Url::parse("mem://").unwrap(),
            Some(500),
            None,
            false,
            false,
            1,
            DEFAULT_DOWNLOAD_RETRY_COUNT,
            None,
        ));

        // Only 10 bytes may be buffered before backpressure kicks in
        let config = SchedulerConfig {
            io_buffer_size_bytes: 10,
            use_lite_scheduler: None,
        };

        let scan_scheduler = ScanScheduler::new(obj_store.clone(), config);

        let file_scheduler = scan_scheduler
            .open_file(&Path::parse("foo").unwrap(), &CachedFileSize::new(100000))
            .await
            .unwrap();

        // "Idle" = only the test task itself is alive on the runtime
        let wait_for_idle = || async move {
            let handle = Handle::current();
            while handle.metrics().num_alive_tasks() != 1 {
                tokio::time::sleep(Duration::from_millis(10)).await;
            }
        };
        let wait_for_bytes_read_and_idle = |target_bytes: u64| {
            let bytes_read = &bytes_read;
            async move {
                let bytes_read_copy = bytes_read.clone();
                while bytes_read_copy.load(Ordering::Acquire) < target_bytes {
                    tokio::time::sleep(Duration::from_millis(10)).await;
                }
                wait_for_idle().await;
            }
        };

        // 5 + 5 bytes fill the buffer; the 3-byte read must wait
        let first_fut = file_scheduler.submit_single(0..5, 0);
        let second_fut = file_scheduler.submit_single(0..5, 0);
        let third_fut = file_scheduler.submit_single(0..3, 0);
        wait_for_bytes_read_and_idle(10).await;

        // Consuming the first 5 bytes releases the third (3 byte) read
        assert_eq!(first_fut.await.unwrap().len(), 5);
        wait_for_bytes_read_and_idle(13).await;

        // A new 5-byte read doesn't fit (8/10 used); no extra bytes read
        let fourth_fut = file_scheduler.submit_single(0..5, 0);
        wait_for_bytes_read_and_idle(13).await;

        assert_eq!(third_fut.await.unwrap().len(), 3);
        wait_for_bytes_read_and_idle(18).await;

        assert_eq!(second_fut.await.unwrap().len(), 5);
        // Two coalesced/split reads totalling 10 bytes; as the oldest
        // request it may run even while the buffer is partly occupied
        let fifth_fut = file_scheduler.submit_request(vec![0..3, 90000..90007], 0);
        wait_for_bytes_read_and_idle(21).await;

        let fifth_bytes = tokio::time::timeout(Duration::from_secs(10), fifth_fut)
            .await
            .unwrap();
        assert_eq!(
            fifth_bytes.unwrap().iter().map(|b| b.len()).sum::<usize>(),
            10
        );

        assert_eq!(fourth_fut.await.unwrap().len(), 5);
        wait_for_bytes_read_and_idle(28).await;

        // Fresh scheduler: two reads that each exactly fill the buffer must
        // both complete even if nobody consumes in between (no deadlock)
        let config = SchedulerConfig {
            io_buffer_size_bytes: 10,
            use_lite_scheduler: None,
        };

        let scan_scheduler = ScanScheduler::new(obj_store, config);
        let file_scheduler = scan_scheduler
            .open_file(&Path::parse("foo").unwrap(), &CachedFileSize::new(100000))
            .await
            .unwrap();

        let first_fut = file_scheduler.submit_single(0..10, 0);
        let second_fut = file_scheduler.submit_single(0..10, 0);

        std::thread::sleep(Duration::from_millis(100));
        assert_eq!(first_fut.await.unwrap().len(), 10);
        assert_eq!(second_fut.await.unwrap().len(), 10);
    }
1406
    /// Test double for [`Reader`] that counts `get_range` calls and serves
    /// zeroed bytes without touching a real object store.
    #[derive(Debug)]
    struct TrackingReader {
        // Shared with the test body so assertions can observe the call count.
        get_range_count: Arc<AtomicU64>,
        // Path reported back through `Reader::path`.
        path: Path,
    }
1413
    impl deepsize::DeepSizeOf for TrackingReader {
        // The reader owns no meaningful heap data worth accounting for in
        // tests, so report zero for children.
        fn deep_size_of_children(&self, _context: &mut deepsize::Context) -> usize {
            0
        }
    }
1419
1420 impl Reader for TrackingReader {
1421 fn path(&self) -> &Path {
1422 &self.path
1423 }
1424
1425 fn block_size(&self) -> usize {
1426 4096
1427 }
1428
1429 fn io_parallelism(&self) -> usize {
1430 1
1431 }
1432
1433 fn size(&self) -> futures::future::BoxFuture<'_, object_store::Result<usize>> {
1434 Box::pin(async { Ok(1_000_000) })
1435 }
1436
1437 fn get_range(
1438 &self,
1439 range: Range<usize>,
1440 ) -> futures::future::BoxFuture<'static, object_store::Result<Bytes>> {
1441 self.get_range_count.fetch_add(1, Ordering::Release);
1442 let num_bytes = range.end - range.start;
1443 Box::pin(async move { Ok(Bytes::from(vec![0u8; num_bytes])) })
1444 }
1445
1446 fn get_all(&self) -> futures::future::BoxFuture<'_, object_store::Result<Bytes>> {
1447 Box::pin(async { Ok(Bytes::from(vec![0u8; 1_000_000])) })
1448 }
1449 }
1450
1451 #[tokio::test]
1452 async fn test_lite_scheduler_submits_eagerly() {
1453 let obj_store = Arc::new(ObjectStore::memory());
1454 let config = SchedulerConfig::default_for_testing().with_lite_scheduler();
1455 let scheduler = ScanScheduler::new(obj_store, config);
1456
1457 let get_range_count = Arc::new(AtomicU64::new(0));
1458 let reader: Arc<dyn Reader> = Arc::new(TrackingReader {
1459 get_range_count: get_range_count.clone(),
1460 path: Path::parse("test").unwrap(),
1461 });
1462
1463 let fut1 = scheduler.submit_request(reader.clone(), vec![0..100], 0, false);
1466 let fut2 = scheduler.submit_request(reader.clone(), vec![100..200], 10, false);
1467 let fut3 = scheduler.submit_request(reader.clone(), vec![200..300], 20, false);
1468
1469 assert_eq!(get_range_count.load(Ordering::Acquire), 3);
1471
1472 assert_eq!(fut1.await.unwrap()[0].len(), 100);
1474 assert_eq!(fut2.await.unwrap()[0].len(), 100);
1475 assert_eq!(fut3.await.unwrap()[0].len(), 100);
1476 }
1477
1478 #[tokio::test]
1479 async fn test_object_store_selects_scheduler() {
1480 let memory_store = Arc::new(ObjectStore::memory());
1482 assert!(!memory_store.prefers_lite_scheduler());
1483 let config = SchedulerConfig {
1484 io_buffer_size_bytes: 256 * 1024 * 1024,
1485 use_lite_scheduler: None,
1486 };
1487 let scheduler = ScanScheduler::new(memory_store.clone(), config);
1488 assert!(!scheduler.uses_lite_scheduler());
1489
1490 let uring_store = Arc::new(ObjectStore::new(
1492 Arc::new(InMemory::new()),
1493 Url::parse("file+uring:///tmp").unwrap(),
1494 None,
1495 None,
1496 false,
1497 false,
1498 8,
1499 DEFAULT_DOWNLOAD_RETRY_COUNT,
1500 None,
1501 ));
1502 assert!(uring_store.prefers_lite_scheduler());
1503 let config = SchedulerConfig {
1504 io_buffer_size_bytes: 256 * 1024 * 1024,
1505 use_lite_scheduler: None,
1506 };
1507 let scheduler = ScanScheduler::new(uring_store.clone(), config);
1508 assert!(scheduler.uses_lite_scheduler());
1509
1510 let config = SchedulerConfig {
1512 io_buffer_size_bytes: 256 * 1024 * 1024,
1513 use_lite_scheduler: Some(false),
1514 };
1515 let scheduler = ScanScheduler::new(uring_store, config);
1516 assert!(!scheduler.uses_lite_scheduler());
1517
1518 let config = SchedulerConfig {
1520 io_buffer_size_bytes: 256 * 1024 * 1024,
1521 use_lite_scheduler: Some(true),
1522 };
1523 let scheduler = ScanScheduler::new(memory_store, config);
1524 assert!(scheduler.uses_lite_scheduler());
1525 }
1526
1527 #[test_log::test(tokio::test(flavor = "multi_thread"))]
1528 async fn stress_backpressure() {
1529 let some_path = Path::parse("foo").unwrap();
1533 let obj_store = Arc::new(ObjectStore::memory());
1534 obj_store
1535 .put(&some_path, vec![0; 100000].as_slice())
1536 .await
1537 .unwrap();
1538
1539 let config = SchedulerConfig {
1541 io_buffer_size_bytes: 1,
1542 use_lite_scheduler: None,
1543 };
1544 let scan_scheduler = ScanScheduler::new(obj_store.clone(), config);
1545 let file_scheduler = scan_scheduler
1546 .open_file(&some_path, &CachedFileSize::unknown())
1547 .await
1548 .unwrap();
1549
1550 let mut futs = Vec::with_capacity(10000);
1551 for idx in 0..10000 {
1552 futs.push(file_scheduler.submit_single(idx..idx + 1, idx));
1553 }
1554
1555 for fut in futs {
1556 fut.await.unwrap();
1557 }
1558 }
1559
1560 #[tokio::test(flavor = "multi_thread")]
1561 async fn test_zero_buffer_size_no_backpressure() {
1562 let obj_store = Arc::new(ObjectStore::memory());
1565 let config = SchedulerConfig {
1566 io_buffer_size_bytes: 0,
1567 use_lite_scheduler: Some(false),
1568 };
1569 let scheduler = ScanScheduler::new(obj_store, config);
1570
1571 let get_range_count = Arc::new(AtomicU64::new(0));
1572 let reader: Arc<dyn Reader> = Arc::new(TrackingReader {
1573 get_range_count: get_range_count.clone(),
1574 path: Path::parse("test").unwrap(),
1575 });
1576
1577 let fut1 = scheduler.submit_request(reader.clone(), vec![0..1000], 0, false);
1580 let fut2 = scheduler.submit_request(reader.clone(), vec![1000..2000], 1, false);
1581 let fut3 = scheduler.submit_request(reader.clone(), vec![2000..3000], 2, false);
1582
1583 let bytes1 = timeout(Duration::from_secs(5), fut1)
1584 .await
1585 .unwrap()
1586 .unwrap();
1587 let bytes2 = timeout(Duration::from_secs(5), fut2)
1588 .await
1589 .unwrap()
1590 .unwrap();
1591 let bytes3 = timeout(Duration::from_secs(5), fut3)
1592 .await
1593 .unwrap()
1594 .unwrap();
1595 assert_eq!(bytes1[0].len(), 1000);
1596 assert_eq!(bytes2[0].len(), 1000);
1597 assert_eq!(bytes3[0].len(), 1000);
1598 assert_eq!(get_range_count.load(Ordering::Acquire), 3);
1599 }
1600
    #[tokio::test(flavor = "multi_thread")]
    async fn test_file_scheduler_bypass_backpressure() {
        // Verifies that a scheduler created via `with_bypass_backpressure`
        // dispatches reads even when the I/O budget is exhausted, while a
        // normal scheduler on the same file stays blocked.
        let some_path = Path::parse("foo").unwrap();
        let base_store = Arc::new(InMemory::new());
        base_store
            .put(&some_path, vec![0u8; 1000].into())
            .await
            .unwrap();

        // Wrap the in-memory store in a mock so we can count exactly how
        // many bytes have been dispatched to the store at any moment.
        let bytes_dispatched = Arc::new(AtomicU64::from(0));
        let mut obj_store = MockObjectStore::default();
        let bytes_dispatched_copy = bytes_dispatched.clone();
        obj_store
            .expect_get_opts()
            .returning(move |location, options| {
                let range = options.range.as_ref().unwrap();
                let num_bytes = match range {
                    GetRange::Bounded(bounded) => bounded.end - bounded.start,
                    // The scheduler always issues bounded range reads here.
                    _ => panic!(),
                };
                bytes_dispatched_copy.fetch_add(num_bytes, Ordering::Release);
                let location = location.clone();
                let base_store = base_store.clone();
                async move { base_store.get_opts(&location, options).await }.boxed()
            });
        let obj_store = Arc::new(ObjectStore::new(
            Arc::new(obj_store),
            Url::parse("mem://").unwrap(),
            Some(500),
            None,
            false,
            false,
            1,
            DEFAULT_DOWNLOAD_RETRY_COUNT,
            None,
        ));

        // A 10-byte budget means one 10-byte read consumes it entirely.
        let config = SchedulerConfig {
            io_buffer_size_bytes: 10,
            use_lite_scheduler: Some(false),
        };
        let scan_scheduler = ScanScheduler::new(obj_store, config);
        let file_scheduler = scan_scheduler
            .open_file(&Path::parse("foo").unwrap(), &CachedFileSize::new(1000))
            .await
            .unwrap();
        let bypass_scheduler = file_scheduler.with_bypass_backpressure();

        // Phase 1: exhaust the budget with a read we never await, and wait
        // until it has actually been dispatched to the store.
        let blocker_fut = file_scheduler.submit_single(0..10, 0);
        while bytes_dispatched.load(Ordering::Acquire) < 10 {
            tokio::time::sleep(Duration::from_millis(1)).await;
        }

        // Phase 2: submit one normal and one bypass read. Only the bypass
        // read should be dispatched while the budget is exhausted.
        let normal_fut = file_scheduler.submit_single(0..10, 2);
        let bypass_fut = bypass_scheduler.submit_single(0..10, 1);

        while bytes_dispatched.load(Ordering::Acquire) < 20 {
            tokio::time::sleep(Duration::from_millis(1)).await;
        }
        // Grace period: give the normal read a chance to (incorrectly)
        // dispatch before asserting it is still held back.
        tokio::time::sleep(Duration::from_millis(20)).await;
        assert_eq!(
            bytes_dispatched.load(Ordering::Acquire),
            20,
            "normal read should still be blocked while budget is exhausted"
        );

        // Phase 3: awaiting the blocker releases its budget, which unblocks
        // the normal read; everything must now complete (30 bytes total).
        timeout(Duration::from_secs(5), blocker_fut)
            .await
            .unwrap()
            .unwrap();
        timeout(Duration::from_secs(5), bypass_fut)
            .await
            .unwrap()
            .unwrap();
        timeout(Duration::from_secs(5), normal_fut)
            .await
            .unwrap()
            .unwrap();
        assert_eq!(bytes_dispatched.load(Ordering::Acquire), 30);
    }
1689}