1use bytes::Bytes;
5use futures::channel::oneshot;
6use futures::{FutureExt, TryFutureExt};
7use object_store::path::Path;
8use std::collections::BinaryHeap;
9use std::fmt::Debug;
10use std::future::Future;
11use std::num::NonZero;
12use std::ops::Range;
13use std::sync::atomic::{AtomicU64, Ordering};
14use std::sync::{Arc, Mutex};
15use std::time::Instant;
16use tokio::sync::Notify;
17
18use lance_core::utils::io_stats::IoStatsRecorder;
19use lance_core::utils::parse::str_is_truthy;
20use lance_core::{Error, Result};
21
22use crate::object_store::ObjectStore;
23use crate::traits::Reader;
24use crate::utils::CachedFileSize;
25
26mod lite;
27
/// Number of seconds that must have elapsed (measured from `IoQueueState::start`)
/// before the first backpressure warning may be logged.
const BACKPRESSURE_MIN: u64 = 5;
/// Number of seconds used to debounce backpressure warnings: the first warning is
/// only emitted before this many seconds have elapsed, and later warnings are
/// emitted only when more than this many seconds passed since the previous one.
const BACKPRESSURE_DEBOUNCE: u64 = 60;

/// Process-wide count of range reads issued by `IoTask::run`.
/// Zero-length reads are not counted.
static IOPS_COUNTER: AtomicU64 = AtomicU64::new(0);
/// Process-wide count of bytes requested by the reads issued from `IoTask::run`.
/// The amount is added when the read future resolves.
static BYTES_READ_COUNTER: AtomicU64 = AtomicU64::new(0);
37
38pub fn iops_counter() -> u64 {
39 IOPS_COUNTER.load(Ordering::Acquire)
40}
41
42pub fn bytes_read_counter() -> u64 {
43 BYTES_READ_COUNTER.load(Ordering::Acquire)
44}
45
/// Sorted multiset of the priorities of the requests that are currently in flight.
///
/// The backing vector is kept in ascending order, so the smallest priority value
/// is always the first element. Duplicate priorities are stored once per push.
struct PrioritiesInFlight {
    in_flight: Vec<u128>,
}

impl PrioritiesInFlight {
    /// Creates an empty set with room for `2 * capacity` entries pre-allocated.
    fn new(capacity: u32) -> Self {
        let in_flight = Vec::with_capacity(capacity as usize * 2);
        Self { in_flight }
    }

    /// Returns the smallest priority currently tracked, or `u128::MAX` when nothing
    /// is in flight.
    fn min_in_flight(&self) -> u128 {
        match self.in_flight.first() {
            Some(&lowest) => lowest,
            None => u128::MAX,
        }
    }

    /// Records one more in-flight request with priority `prio`, keeping the vector sorted.
    fn push(&mut self, prio: u128) {
        // Whether or not an equal value already exists, the index returned by the
        // binary search is a valid sorted insertion point.
        let insert_at = self
            .in_flight
            .binary_search(&prio)
            .unwrap_or_else(|slot| slot);
        self.in_flight.insert(insert_at, prio);
    }

    /// Removes a single occurrence of `prio`; does nothing if it is not present.
    fn remove(&mut self, prio: u128) {
        let Ok(found) = self.in_flight.binary_search(&prio) else {
            return;
        };
        self.in_flight.remove(found);
    }
}
83
84struct IoQueueState {
85 iops_avail: u32,
87 bytes_avail: i64,
91 pending_requests: BinaryHeap<IoTask>,
93 priorities_in_flight: PrioritiesInFlight,
95 done_scheduling: bool,
98 start: Instant,
100 last_warn: AtomicU64,
102 no_backpressure: bool,
104}
105
106impl IoQueueState {
107 fn new(io_capacity: u32, io_buffer_size: u64) -> Self {
108 Self {
109 iops_avail: io_capacity,
110 bytes_avail: io_buffer_size as i64,
111 pending_requests: BinaryHeap::new(),
112 priorities_in_flight: PrioritiesInFlight::new(io_capacity),
113 done_scheduling: false,
114 start: Instant::now(),
115 last_warn: AtomicU64::from(0),
116 no_backpressure: io_buffer_size == 0,
117 }
118 }
119
120 fn warn_if_needed(&self) {
121 let seconds_elapsed = self.start.elapsed().as_secs();
122 let last_warn = self.last_warn.load(Ordering::Acquire);
123 let since_last_warn = seconds_elapsed - last_warn;
124 if (last_warn == 0
125 && seconds_elapsed > BACKPRESSURE_MIN
126 && seconds_elapsed < BACKPRESSURE_DEBOUNCE)
127 || since_last_warn > BACKPRESSURE_DEBOUNCE
128 {
129 tracing::event!(tracing::Level::DEBUG, "Backpressure throttle exceeded");
130 log::debug!(
131 "Backpressure throttle is full, I/O will pause until buffer is drained. Max I/O bandwidth will not be achieved because CPU is falling behind"
132 );
133 self.last_warn
134 .store(seconds_elapsed.max(1), Ordering::Release);
135 }
136 }
137
138 fn can_deliver(&self, task: &IoTask) -> bool {
139 if self.iops_avail == 0 {
140 false
141 } else if self.no_backpressure
142 || task.bypass_backpressure
143 || task.priority <= self.priorities_in_flight.min_in_flight()
144 {
145 true
146 } else if task.num_bytes() as i64 > self.bytes_avail {
147 self.warn_if_needed();
148 false
149 } else {
150 true
151 }
152 }
153
154 fn next_task(&mut self) -> Option<IoTask> {
155 let task = self.pending_requests.peek()?;
156 if self.can_deliver(task) {
157 let skip_bytes_accounting = self.no_backpressure || task.bypass_backpressure;
158 self.priorities_in_flight.push(task.priority);
159 self.iops_avail -= 1;
160 if !skip_bytes_accounting {
161 self.bytes_avail -= task.num_bytes() as i64;
162 if self.bytes_avail < 0 {
163 log::debug!(
165 "Backpressure throttle temporarily exceeded by {} bytes due to priority I/O",
166 -self.bytes_avail
167 );
168 }
169 }
170 Some(self.pending_requests.pop().unwrap())
171 } else {
172 None
173 }
174 }
175}
176
177struct IoQueue {
182 state: Mutex<IoQueueState>,
184 notify: Notify,
186}
187
188impl IoQueue {
189 fn new(io_capacity: u32, io_buffer_size: u64) -> Self {
190 Self {
191 state: Mutex::new(IoQueueState::new(io_capacity, io_buffer_size)),
192 notify: Notify::new(),
193 }
194 }
195
196 fn push(&self, task: IoTask) {
197 log::trace!(
198 "Inserting I/O request for {} bytes with priority ({},{}) into I/O queue",
199 task.num_bytes(),
200 task.priority >> 64,
201 task.priority & 0xFFFFFFFFFFFFFFFF
202 );
203 let mut state = self.state.lock().unwrap();
204 state.pending_requests.push(task);
205 drop(state);
206
207 self.notify.notify_one();
208 }
209
210 async fn pop(&self) -> Option<IoTask> {
211 loop {
212 {
213 let mut state = self.state.lock().unwrap();
214 if let Some(task) = state.next_task() {
215 return Some(task);
216 }
217
218 if state.done_scheduling {
219 return None;
220 }
221 }
222
223 self.notify.notified().await;
224 }
225 }
226
227 fn on_iop_complete(&self) {
228 let mut state = self.state.lock().unwrap();
229 state.iops_avail += 1;
230 drop(state);
231
232 self.notify.notify_one();
233 }
234
235 fn on_bytes_consumed(&self, bytes: u64, priority: u128, num_reqs: usize) {
236 let mut state = self.state.lock().unwrap();
237 state.bytes_avail += bytes as i64;
238 for _ in 0..num_reqs {
239 state.priorities_in_flight.remove(priority);
240 }
241 drop(state);
242
243 self.notify.notify_one();
244 }
245
246 fn close(&self) {
247 let mut state = self.state.lock().unwrap();
248 state.done_scheduling = true;
249 let pending_requests = std::mem::take(&mut state.pending_requests);
250 drop(state);
251 for request in pending_requests {
252 request.cancel();
253 }
254
255 self.notify.notify_one();
256 }
257}
258
259struct MutableBatch<F: FnOnce(Response) + Send> {
264 when_done: Option<F>,
265 data_buffers: Vec<Bytes>,
266 num_bytes: u64,
267 priority: u128,
268 num_reqs: usize,
269 err: Option<Box<dyn std::error::Error + Send + Sync + 'static>>,
270 bypass_backpressure: bool,
272}
273
274impl<F: FnOnce(Response) + Send> MutableBatch<F> {
275 fn new(
276 when_done: F,
277 num_data_buffers: u32,
278 priority: u128,
279 num_reqs: usize,
280 bypass_backpressure: bool,
281 ) -> Self {
282 Self {
283 when_done: Some(when_done),
284 data_buffers: vec![Bytes::default(); num_data_buffers as usize],
285 num_bytes: 0,
286 priority,
287 num_reqs,
288 err: None,
289 bypass_backpressure,
290 }
291 }
292}
293
294impl<F: FnOnce(Response) + Send> Drop for MutableBatch<F> {
299 fn drop(&mut self) {
300 let result = if self.err.is_some() {
302 Err(Error::wrapped(self.err.take().unwrap()))
303 } else {
304 let mut data = Vec::new();
305 std::mem::swap(&mut data, &mut self.data_buffers);
306 Ok(data)
307 };
308 let response = Response {
311 data: result,
312 num_bytes: if self.bypass_backpressure {
314 0
315 } else {
316 self.num_bytes
317 },
318 priority: self.priority,
319 num_reqs: self.num_reqs,
320 };
321 (self.when_done.take().unwrap())(response);
322 }
323}
324
/// Result of a single IOP, handed to a `DataSink`.
struct DataChunk {
    /// Index of the IOP within its request; selects the destination buffer slot.
    task_idx: usize,
    /// Size in bytes of the range that was requested for this IOP.
    num_bytes: u64,
    /// The bytes that were read, or the error that occurred.
    data: Result<Bytes>,
}
330
/// Destination that receives the result of each IOP of a request.
trait DataSink: Send {
    /// Accepts the outcome of one IOP.
    fn deliver_data(&mut self, data: DataChunk);
}
334
335impl<F: FnOnce(Response) + Send> DataSink for MutableBatch<F> {
336 fn deliver_data(&mut self, data: DataChunk) {
338 self.num_bytes += data.num_bytes;
339 match data.data {
340 Ok(data_bytes) => {
341 self.data_buffers[data.task_idx] = data_bytes;
342 }
343 Err(err) => {
344 self.err.get_or_insert(Box::new(err));
346 }
347 }
348 }
349}
350
/// A single range read waiting in, or popped from, the standard I/O queue.
struct IoTask {
    /// Reader used to perform the range read.
    reader: Arc<dyn Reader>,
    /// Byte range to read.
    to_read: Range<u64>,
    /// Callback invoked with the read result (or a cancellation error).
    when_done: Box<dyn FnOnce(Result<Bytes>) + Send>,
    /// Scheduling priority; tasks with a smaller value are popped first.
    priority: u128,
    /// Whether the task is exempt from the byte budget; such tasks are popped
    /// before all tasks that are not exempt.
    bypass_backpressure: bool,
}
358
359impl Eq for IoTask {}
360
361impl PartialEq for IoTask {
362 fn eq(&self, other: &Self) -> bool {
363 self.bypass_backpressure == other.bypass_backpressure && self.priority == other.priority
364 }
365}
366
367impl PartialOrd for IoTask {
368 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
369 Some(self.cmp(other))
370 }
371}
372
373impl Ord for IoTask {
374 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
375 self.bypass_backpressure
378 .cmp(&other.bypass_backpressure)
379 .then(other.priority.cmp(&self.priority))
380 }
381}
382
383impl IoTask {
384 fn num_bytes(&self) -> u64 {
385 self.to_read.end - self.to_read.start
386 }
387 fn cancel(self) {
388 (self.when_done)(Err(Error::internal(
389 "Scheduler closed before I/O was completed".to_string(),
390 )));
391 }
392
393 async fn run(self) {
394 let file_path = self.reader.path().as_ref();
395 let num_bytes = self.num_bytes();
396 let bytes = if self.to_read.start == self.to_read.end {
397 Ok(Bytes::new())
398 } else {
399 let bytes_fut = self
400 .reader
401 .get_range(self.to_read.start as usize..self.to_read.end as usize);
402 IOPS_COUNTER.fetch_add(1, Ordering::Release);
403 let num_bytes = self.num_bytes();
404 bytes_fut
405 .inspect(move |_| {
406 BYTES_READ_COUNTER.fetch_add(num_bytes, Ordering::Release);
407 })
408 .await
409 .map_err(Error::from)
410 };
411 tracing::trace!(
413 file = file_path,
414 bytes_read = num_bytes,
415 requests = 1,
416 range_start = self.to_read.start,
417 range_end = self.to_read.end,
418 "File I/O completed"
419 );
420 (self.when_done)(bytes);
421 }
422}
423
424async fn run_io_loop(tasks: Arc<IoQueue>) {
427 loop {
430 let next_task = tasks.pop().await;
431 match next_task {
432 Some(task) => {
433 tokio::spawn(task.run());
434 }
435 None => {
436 return;
438 }
439 }
440 }
441}
442
/// Thread-safe accumulator for I/O statistics. All counters use relaxed atomics.
#[derive(Debug)]
struct StatsCollector {
    /// Number of individual range reads recorded.
    iops: AtomicU64,
    /// Number of `record_request` calls plus any requests merged in via `add`.
    requests: AtomicU64,
    /// Total number of bytes covered by the recorded ranges.
    bytes_read: AtomicU64,
}

impl StatsCollector {
    /// Creates a collector with all counters at zero.
    fn new() -> Self {
        Self {
            iops: AtomicU64::default(),
            requests: AtomicU64::default(),
            bytes_read: AtomicU64::default(),
        }
    }

    /// Current number of recorded IOPs.
    fn iops(&self) -> u64 {
        AtomicU64::load(&self.iops, Ordering::Relaxed)
    }

    /// Current number of recorded bytes.
    fn bytes_read(&self) -> u64 {
        AtomicU64::load(&self.bytes_read, Ordering::Relaxed)
    }

    /// Current number of recorded requests.
    fn requests(&self) -> u64 {
        AtomicU64::load(&self.requests, Ordering::Relaxed)
    }

    /// Records one request made up of the given ranges: one request, one IOP per
    /// range, and the summed length of the ranges as bytes.
    fn record_request(&self, request: &[Range<u64>]) {
        self.requests.fetch_add(1, Ordering::Relaxed);
        self.iops.fetch_add(request.len() as u64, Ordering::Relaxed);
        let total_bytes: u64 = request.iter().map(|range| range.end - range.start).sum();
        self.bytes_read.fetch_add(total_bytes, Ordering::Relaxed);
    }

    /// Adds previously aggregated counts to this collector.
    fn add(&self, iops: u64, requests: u64, bytes_read: u64) {
        let increments = [
            (&self.iops, iops),
            (&self.requests, requests),
            (&self.bytes_read, bytes_read),
        ];
        for (counter, amount) in increments {
            counter.fetch_add(amount, Ordering::Relaxed);
        }
    }
}
488
489impl IoStatsRecorder for StatsCollector {
490 fn record_request(&self, request: &[Range<u64>]) {
491 Self::record_request(self, request)
494 }
495}
496
497#[derive(Debug, Clone, Copy, Default)]
498pub struct ScanStats {
499 pub iops: u64,
500 pub requests: u64,
501 pub bytes_read: u64,
502}
503
504impl ScanStats {
505 fn new(stats: &StatsCollector) -> Self {
506 Self {
507 iops: stats.iops(),
508 requests: stats.requests(),
509 bytes_read: stats.bytes_read(),
510 }
511 }
512}
513
514#[derive(Debug, Clone)]
525pub struct IoStats(Arc<StatsCollector>);
526
527impl IoStats {
528 pub fn new() -> Self {
529 Self(Arc::new(StatsCollector::new()))
530 }
531
532 pub fn record_request(&self, request: &[Range<u64>]) {
536 self.0.record_request(request);
537 }
538
539 pub fn snapshot(&self) -> ScanStats {
541 ScanStats::new(self.0.as_ref())
542 }
543
544 pub fn recorder(&self) -> Arc<dyn IoStatsRecorder> {
548 self.0.clone()
549 }
550
551 pub fn add_scan_stats(&self, stats: &ScanStats) {
555 self.0.add(stats.iops, stats.requests, stats.bytes_read);
556 }
557}
558
559impl Default for IoStats {
560 fn default() -> Self {
561 Self::new()
562 }
563}
564
/// The queue implementation backing a `ScanScheduler`.
enum IoQueueType {
    /// The `IoQueue` defined in this file, drained by a spawned `run_io_loop` task.
    Standard(Arc<IoQueue>),
    /// The queue from the `lite` submodule.
    Lite(Arc<lite::IoQueue>),
}
569
/// Schedules range reads against an object store through a shared I/O queue.
///
/// Dropping the scheduler closes its queue.
pub struct ScanScheduler {
    /// Store used to size and open files.
    object_store: Arc<ObjectStore>,
    /// Queue that every submitted request goes through.
    io_queue: IoQueueType,
    /// Statistics for all requests submitted through file schedulers of this scheduler.
    stats: IoStats,
}
583
584impl Debug for ScanScheduler {
585 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
586 f.debug_struct("ScanScheduler")
587 .field("object_store", &self.object_store)
588 .finish()
589 }
590}
591
/// Outcome of a whole request, produced when its `MutableBatch` is dropped.
struct Response {
    /// One buffer per IOP, or the first error that was delivered.
    data: Result<Vec<Bytes>>,
    /// Priority the request was submitted with.
    priority: u128,
    /// Number of IOPs the request consisted of.
    num_reqs: usize,
    /// Bytes to hand back to the backpressure budget (0 for bypass requests).
    num_bytes: u64,
}
598
599#[derive(Debug, Clone, Copy)]
600pub struct SchedulerConfig {
601 pub io_buffer_size_bytes: u64,
605 pub use_lite_scheduler: Option<bool>,
611}
612
613impl SchedulerConfig {
614 pub fn new(io_buffer_size_bytes: u64) -> Self {
615 Self {
616 io_buffer_size_bytes,
617 use_lite_scheduler: std::env::var("LANCE_USE_LITE_SCHEDULER")
618 .ok()
619 .map(|v| str_is_truthy(v.trim())),
620 }
621 }
622
623 pub fn default_for_testing() -> Self {
625 Self {
626 io_buffer_size_bytes: 256 * 1024 * 1024,
627 use_lite_scheduler: None,
628 }
629 }
630
631 pub fn max_bandwidth(store: &ObjectStore) -> Self {
634 Self::new(32 * 1024 * 1024 * store.io_parallelism() as u64)
635 }
636
637 pub fn with_lite_scheduler(self) -> Self {
638 Self {
639 use_lite_scheduler: Some(true),
640 ..self
641 }
642 }
643}
644
645impl ScanScheduler {
646 pub fn new(object_store: Arc<ObjectStore>, config: SchedulerConfig) -> Arc<Self> {
653 let io_capacity = object_store.io_parallelism();
654 let use_lite = config
655 .use_lite_scheduler
656 .unwrap_or_else(|| object_store.prefers_lite_scheduler());
657 let io_queue = if use_lite {
658 let io_queue = Arc::new(lite::IoQueue::new(
659 io_capacity as u64,
660 config.io_buffer_size_bytes,
661 ));
662 IoQueueType::Lite(io_queue)
663 } else {
664 let io_queue = Arc::new(IoQueue::new(
665 io_capacity as u32,
666 config.io_buffer_size_bytes,
667 ));
668 let io_queue_clone = io_queue.clone();
669 tokio::task::spawn(async move { run_io_loop(io_queue_clone).await });
673 IoQueueType::Standard(io_queue)
674 };
675 Arc::new(Self {
676 object_store,
677 io_queue,
678 stats: IoStats::new(),
679 })
680 }
681
682 pub async fn open_file_with_priority(
691 self: &Arc<Self>,
692 path: &Path,
693 base_priority: u64,
694 file_size_bytes: &CachedFileSize,
695 ) -> Result<FileScheduler> {
696 let file_size_bytes = if let Some(size) = file_size_bytes.get() {
697 u64::from(size)
698 } else {
699 let size = self.object_store.size(path).await?;
700 if let Some(size) = NonZero::new(size) {
701 file_size_bytes.set(size);
702 }
703 size
704 };
705 let reader = self
706 .object_store
707 .open_with_size(path, file_size_bytes as usize)
708 .await?;
709 let block_size = self.object_store.block_size() as u64;
710 let max_iop_size = self.object_store.max_iop_size();
711 Ok(FileScheduler {
712 reader: reader.into(),
713 block_size,
714 root: self.clone(),
715 base_priority,
716 max_iop_size,
717 bypass_backpressure: false,
718 extra_stats: None,
719 })
720 }
721
722 pub async fn open_file(
726 self: &Arc<Self>,
727 path: &Path,
728 file_size_bytes: &CachedFileSize,
729 ) -> Result<FileScheduler> {
730 self.open_file_with_priority(path, 0, file_size_bytes).await
731 }
732
733 fn do_submit_request(
734 &self,
735 reader: Arc<dyn Reader>,
736 request: Vec<Range<u64>>,
737 tx: oneshot::Sender<Response>,
738 priority: u128,
739 io_queue: &Arc<IoQueue>,
740 bypass_backpressure: bool,
741 ) {
742 let num_iops = request.len() as u32;
743
744 let when_all_io_done = move |bytes_and_permits| {
745 let _ = tx.send(bytes_and_permits);
747 };
748
749 let dest = Arc::new(Mutex::new(Box::new(MutableBatch::new(
750 when_all_io_done,
751 num_iops,
752 priority,
753 request.len(),
754 bypass_backpressure,
755 ))));
756
757 for (task_idx, iop) in request.into_iter().enumerate() {
758 let dest = dest.clone();
759 let io_queue_clone = io_queue.clone();
760 let num_bytes = iop.end - iop.start;
761 let task = IoTask {
762 reader: reader.clone(),
763 to_read: iop,
764 priority,
765 bypass_backpressure,
766 when_done: Box::new(move |data| {
767 io_queue_clone.on_iop_complete();
768 let mut dest = dest.lock().unwrap();
769 let chunk = DataChunk {
770 data,
771 task_idx,
772 num_bytes,
773 };
774 dest.deliver_data(chunk);
775 }),
776 };
777 io_queue.push(task);
778 }
779 }
780
781 fn submit_request_standard(
782 &self,
783 reader: Arc<dyn Reader>,
784 request: Vec<Range<u64>>,
785 priority: u128,
786 io_queue: &Arc<IoQueue>,
787 bypass_backpressure: bool,
788 ) -> impl Future<Output = Result<Vec<Bytes>>> + Send + use<> {
789 let (tx, rx) = oneshot::channel::<Response>();
790
791 self.do_submit_request(reader, request, tx, priority, io_queue, bypass_backpressure);
792
793 let io_queue_clone = io_queue.clone();
794
795 rx.map(move |wrapped_rsp| {
796 let rsp = wrapped_rsp.unwrap();
799 io_queue_clone.on_bytes_consumed(rsp.num_bytes, rsp.priority, rsp.num_reqs);
800 rsp.data
801 })
802 }
803
804 fn submit_request_lite(
805 &self,
806 reader: Arc<dyn Reader>,
807 request: Vec<Range<u64>>,
808 priority: u128,
809 io_queue: &Arc<lite::IoQueue>,
810 bypass_backpressure: bool,
811 ) -> impl Future<Output = Result<Vec<Bytes>>> + Send + use<> {
812 let maybe_tasks = request
814 .into_iter()
815 .map(|task| {
816 let reader = reader.clone();
817 let queue = io_queue.clone();
818 let run_fn = Box::new(move || {
819 reader
820 .get_range(task.start as usize..task.end as usize)
821 .map_err(Error::from)
822 .boxed()
823 });
824 queue.submit(task, priority, run_fn, bypass_backpressure)
825 })
826 .collect::<Result<Vec<_>>>();
827 match maybe_tasks {
828 Ok(tasks) => async move {
829 let mut results = Vec::with_capacity(tasks.len());
830 for task in tasks {
831 results.push(task.await?);
832 }
833 Ok(results)
834 }
835 .boxed(),
836 Err(e) => async move { Err(e) }.boxed(),
837 }
838 }
839
840 pub fn submit_request(
841 &self,
842 reader: Arc<dyn Reader>,
843 request: Vec<Range<u64>>,
844 priority: u128,
845 bypass_backpressure: bool,
846 ) -> impl Future<Output = Result<Vec<Bytes>>> + Send + use<> {
847 match &self.io_queue {
848 IoQueueType::Standard(io_queue) => {
849 futures::future::Either::Left(self.submit_request_standard(
850 reader,
851 request,
852 priority,
853 io_queue,
854 bypass_backpressure,
855 ))
856 }
857 IoQueueType::Lite(io_queue) => futures::future::Either::Right(
858 self.submit_request_lite(reader, request, priority, io_queue, bypass_backpressure),
859 ),
860 }
861 }
862
863 pub fn stats(&self) -> ScanStats {
864 self.stats.snapshot()
865 }
866
867 #[cfg(test)]
868 fn uses_lite_scheduler(&self) -> bool {
869 matches!(self.io_queue, IoQueueType::Lite(_))
870 }
871}
872
873impl Drop for ScanScheduler {
874 fn drop(&mut self) {
875 match &self.io_queue {
887 IoQueueType::Standard(io_queue) => io_queue.close(),
888 IoQueueType::Lite(io_queue) => io_queue.close(),
889 }
890 }
891}
892
/// Handle for submitting reads against one open file through a [`ScanScheduler`].
#[derive(Clone, Debug)]
pub struct FileScheduler {
    /// Reader for the open file.
    reader: Arc<dyn Reader>,
    /// Scheduler that owns the I/O queue and the global statistics.
    root: Arc<ScanScheduler>,
    /// Ranges whose gap is at most this many bytes are coalesced into one read.
    block_size: u64,
    /// Upper 64 bits of the priority of every request submitted through this handle.
    base_priority: u64,
    /// Coalesced reads longer than this many bytes are split into several reads.
    max_iop_size: u64,
    /// Whether requests from this handle are submitted with the bypass flag set.
    bypass_backpressure: bool,
    /// Optional extra recorder that is notified of every submitted request.
    extra_stats: Option<Arc<dyn IoStatsRecorder>>,
}
907
/// Returns true when `range2` starts no more than `block_size` bytes after the end
/// of `range1`, i.e. when the two ranges are close enough to be read together.
///
/// Only `range2.start` is examined; a `range2` that starts before `range1` ends
/// also counts as close.
fn is_close_together(range1: &Range<u64>, range2: &Range<u64>, block_size: u64) -> bool {
    // Saturate so that a range ending near `u64::MAX` cannot overflow, which would
    // panic in debug builds and wrap to a wrong answer in release builds.
    range2.start <= range1.end.saturating_add(block_size)
}
912
/// Returns true when each range starts strictly before the other one ends.
fn is_overlapping(range1: &Range<u64>, range2: &Range<u64>) -> bool {
    let first_starts_after_second = range1.start >= range2.end;
    let second_starts_after_first = range2.start >= range1.end;
    !(first_starts_after_second || second_starts_after_first)
}
916
917impl FileScheduler {
918 pub fn submit_request(
931 &self,
932 request: Vec<Range<u64>>,
933 priority: u64,
934 ) -> impl Future<Output = Result<Vec<Bytes>>> + Send + use<> {
935 let priority = ((self.base_priority as u128) << 64) + priority as u128;
937
938 let mut merged_requests = Vec::with_capacity(request.len());
939
940 if !request.is_empty() {
941 let mut curr_interval = request[0].clone();
942
943 for req in request.iter().skip(1) {
944 if is_close_together(&curr_interval, req, self.block_size) {
945 curr_interval.end = curr_interval.end.max(req.end);
946 } else {
947 merged_requests.push(curr_interval);
948 curr_interval = req.clone();
949 }
950 }
951
952 merged_requests.push(curr_interval);
953 }
954
955 let mut updated_requests = Vec::with_capacity(merged_requests.len());
956 for req in merged_requests {
957 if req.is_empty() {
958 updated_requests.push(req);
959 } else {
960 let num_requests = (req.end - req.start).div_ceil(self.max_iop_size);
961 let bytes_per_request = (req.end - req.start) / num_requests;
962 for i in 0..num_requests {
963 let start = req.start + i * bytes_per_request;
964 let end = if i == num_requests - 1 {
965 req.end
967 } else {
968 start + bytes_per_request
969 };
970 updated_requests.push(start..end);
971 }
972 }
973 }
974
975 self.root.stats.record_request(&updated_requests);
976 if let Some(extra_stats) = &self.extra_stats {
977 extra_stats.record_request(&updated_requests);
978 }
979
980 let bytes_vec_fut = self.root.submit_request(
981 self.reader.clone(),
982 updated_requests.clone(),
983 priority,
984 self.bypass_backpressure,
985 );
986
987 let mut updated_index = 0;
988 let mut final_bytes = Vec::with_capacity(request.len());
989
990 async move {
991 let bytes_vec = bytes_vec_fut.await?;
992
993 let mut orig_index = 0;
994 while (updated_index < updated_requests.len()) && (orig_index < request.len()) {
995 let updated_range = &updated_requests[updated_index];
996 let orig_range = &request[orig_index];
997 let byte_offset = updated_range.start as usize;
998
999 if is_overlapping(updated_range, orig_range) {
1000 let start = orig_range.start as usize - byte_offset;
1002 if orig_range.end <= updated_range.end {
1003 let end = orig_range.end as usize - byte_offset;
1006 final_bytes.push(bytes_vec[updated_index].slice(start..end));
1007 } else {
1008 let orig_size = orig_range.end - orig_range.start;
1011 let mut merged_bytes = Vec::with_capacity(orig_size as usize);
1012 merged_bytes.extend_from_slice(&bytes_vec[updated_index].slice(start..));
1013 let mut copy_offset = merged_bytes.len() as u64;
1014 while copy_offset < orig_size {
1015 updated_index += 1;
1016 let next_range = &updated_requests[updated_index];
1017 let bytes_to_take =
1018 (orig_size - copy_offset).min(next_range.end - next_range.start);
1019 merged_bytes.extend_from_slice(
1020 &bytes_vec[updated_index].slice(0..bytes_to_take as usize),
1021 );
1022 copy_offset += bytes_to_take;
1023 }
1024 final_bytes.push(Bytes::from(merged_bytes));
1025 }
1026 orig_index += 1;
1027 } else {
1028 updated_index += 1;
1029 }
1030 }
1031
1032 Ok(final_bytes)
1033 }
1034 }
1035
1036 pub fn with_priority(&self, priority: u64) -> Self {
1037 Self {
1038 reader: self.reader.clone(),
1039 root: self.root.clone(),
1040 block_size: self.block_size,
1041 max_iop_size: self.max_iop_size,
1042 base_priority: priority,
1043 bypass_backpressure: self.bypass_backpressure,
1044 extra_stats: self.extra_stats.clone(),
1045 }
1046 }
1047
1048 pub fn with_io_stats(&self, stats: Arc<dyn IoStatsRecorder>) -> Self {
1058 Self {
1059 extra_stats: Some(stats),
1060 ..self.clone()
1061 }
1062 }
1063
1064 pub fn with_bypass_backpressure(&self) -> Self {
1069 Self {
1070 bypass_backpressure: true,
1071 ..self.clone()
1072 }
1073 }
1074
1075 pub fn submit_single(
1082 &self,
1083 range: Range<u64>,
1084 priority: u64,
1085 ) -> impl Future<Output = Result<Bytes>> + Send {
1086 self.submit_request(vec![range], priority)
1087 .map_ok(|vec_bytes| vec_bytes.into_iter().next().unwrap())
1088 }
1089
1090 pub fn reader(&self) -> &Arc<dyn Reader> {
1096 &self.reader
1097 }
1098}
1099
1100#[cfg(test)]
1101mod tests {
1102 use std::{collections::VecDeque, time::Duration};
1103
1104 use futures::poll;
1105 use lance_core::utils::tempfile::TempObjFile;
1106 use rand::RngCore;
1107
1108 use object_store::{GetRange, ObjectStore as OSObjectStore, ObjectStoreExt, memory::InMemory};
1109 use tokio::{runtime::Handle, time::timeout};
1110 use url::Url;
1111
1112 use crate::{
1113 object_store::{DEFAULT_DOWNLOAD_RETRY_COUNT, DEFAULT_MAX_IOP_SIZE},
1114 testing::MockObjectStore,
1115 };
1116
1117 use super::*;
1118
1119 fn make_task(priority: u128, bypass_backpressure: bool) -> IoTask {
1120 IoTask {
1121 reader: Arc::new(TrackingReader {
1122 get_range_count: Arc::new(AtomicU64::new(0)),
1123 path: Path::parse("test").unwrap(),
1124 }),
1125 to_read: 0..1,
1126 when_done: Box::new(|_| {}),
1127 priority,
1128 bypass_backpressure,
1129 }
1130 }
1131
1132 #[test]
1133 fn test_iotask_ordering() {
1134 let mut heap = BinaryHeap::new();
1137 heap.push(make_task(10, false)); heap.push(make_task(1, false)); heap.push(make_task(20, true)); heap.push(make_task(5, true)); let order: Vec<(u128, bool)> = std::iter::from_fn(|| heap.pop())
1143 .map(|t| (t.priority, t.bypass_backpressure))
1144 .collect();
1145
1146 assert_eq!(order, vec![(5, true), (20, true), (1, false), (10, false)]);
1147 }
1148
    /// Writes 1 MiB of random data, reads it back sequentially in 4 KiB requests and
    /// checks every chunk against the source data.
    #[tokio::test]
    async fn test_full_seq_read() {
        let tmp_file = TempObjFile::default();

        let obj_store = Arc::new(ObjectStore::local());

        const DATA_SIZE: u64 = 1024 * 1024;
        // Write some random data to the file.
        let mut some_data = vec![0; DATA_SIZE as usize];
        rand::rng().fill_bytes(&mut some_data);
        obj_store.put(&tmp_file, &some_data).await.unwrap();

        let config = SchedulerConfig::default_for_testing();

        let scheduler = ScanScheduler::new(obj_store, config);

        let file_scheduler = scheduler
            .open_file(&tmp_file, &CachedFileSize::unknown())
            .await
            .unwrap();

        const READ_SIZE: u64 = 4 * 1024;
        // Each request is awaited before the next one is submitted.
        let mut reqs = VecDeque::new();
        let mut offset = 0;
        while offset < DATA_SIZE {
            reqs.push_back(
                #[allow(clippy::single_range_in_vec_init)]
                file_scheduler
                    .submit_request(vec![offset..offset + READ_SIZE], 0)
                    .await
                    .unwrap(),
            );
            offset += READ_SIZE;
        }

        // Compare the chunks with the source data, in submission order.
        offset = 0;
        while offset < DATA_SIZE {
            let data = reqs.pop_front().unwrap();
            let actual = &data[0];
            let expected = &some_data[offset as usize..(offset + READ_SIZE) as usize];
            assert_eq!(expected, actual);
            offset += READ_SIZE;
        }
    }
1195
    /// Checks that nearby ranges are coalesced and oversized ranges are split, by
    /// watching the scheduler's IOP counter after each request.
    #[tokio::test]
    async fn test_split_coalesce() {
        let tmp_file = TempObjFile::default();

        let obj_store = Arc::new(ObjectStore::local());

        const DATA_SIZE: u64 = 75 * 1024 * 1024;
        // Write some random data to the file.
        let mut some_data = vec![0; DATA_SIZE as usize];
        rand::rng().fill_bytes(&mut some_data);
        obj_store.put(&tmp_file, &some_data).await.unwrap();

        let config = SchedulerConfig::default_for_testing();

        let scheduler = ScanScheduler::new(obj_store, config);

        let file_scheduler = scheduler
            .open_file(&tmp_file, &CachedFileSize::unknown())
            .await
            .unwrap();

        // Three small ranges close to each other are expected to become one IOP.
        let req =
            file_scheduler.submit_request(vec![50_000..51_000, 52_000..53_000, 54_000..55_000], 0);

        let bytes = req.await.unwrap();

        assert_eq!(bytes[0], &some_data[50_000..51_000]);
        assert_eq!(bytes[1], &some_data[52_000..53_000]);
        assert_eq!(bytes[2], &some_data[54_000..55_000]);

        assert_eq!(1, scheduler.stats().iops);

        // Reading the whole file is expected to be split into 5 IOPs (1 + 5 = 6).
        let req = file_scheduler.submit_request(vec![0..DATA_SIZE], 0);
        let bytes = req.await.unwrap();
        assert!(bytes[0] == some_data, "data is not the same");

        assert_eq!(6, scheduler.stats().iops);

        // Ranges that coalesce into one interval, which is then split again. The
        // counter is expected to advance by 2 (6 + 2 = 8).
        let chunk_size = *DEFAULT_MAX_IOP_SIZE;
        let req = file_scheduler.submit_request(
            vec![
                10..chunk_size,
                chunk_size + 10..(chunk_size * 2) - 20,
                chunk_size * 2..(chunk_size * 2) + 10,
            ],
            0,
        );

        let bytes = req.await.unwrap();
        let chunk_size = chunk_size as usize;
        assert!(
            bytes[0] == some_data[10..chunk_size],
            "data is not the same"
        );
        assert!(
            bytes[1] == some_data[chunk_size + 10..(chunk_size * 2) - 20],
            "data is not the same"
        );
        assert!(
            bytes[2] == some_data[chunk_size * 2..(chunk_size * 2) + 10],
            "data is not the same"
        );
        assert_eq!(8, scheduler.stats().iops);

        // 44 adjacent 1 MB ranges. The counter is expected to advance by 3
        // (8 + 3 = 11), and every range must still come back with its own data.
        let reads = (0..44)
            .map(|i| i * 1_000_000..(i + 1) * 1_000_000)
            .collect::<Vec<_>>();
        let req = file_scheduler.submit_request(reads, 0);
        let bytes = req.await.unwrap();
        for (i, bytes) in bytes.iter().enumerate() {
            assert!(
                bytes == &some_data[i * 1_000_000..(i + 1) * 1_000_000],
                "data is not the same"
            );
        }
        assert_eq!(11, scheduler.stats().iops);
    }
1279
    /// Checks that a recorder attached with `with_io_stats` sees exactly the requests
    /// submitted through that file scheduler, while the scheduler-wide statistics
    /// see all requests.
    #[tokio::test]
    async fn test_io_stats_sink() {
        let tmp_file = TempObjFile::default();
        let obj_store = Arc::new(ObjectStore::local());

        const DATA_SIZE: u64 = 1024 * 1024;
        let mut some_data = vec![0; DATA_SIZE as usize];
        rand::rng().fill_bytes(&mut some_data);
        obj_store.put(&tmp_file, &some_data).await.unwrap();

        let scheduler = ScanScheduler::new(obj_store, SchedulerConfig::default_for_testing());

        let sink = IoStats::new();
        // File scheduler that reports to `sink` in addition to the global stats.
        let file_scheduler = scheduler
            .open_file(&tmp_file, &CachedFileSize::unknown())
            .await
            .unwrap()
            .with_io_stats(sink.recorder());

        file_scheduler
            .submit_request(vec![50_000..51_000, 52_000..53_000, 54_000..55_000], 0)
            .await
            .unwrap();

        let global = scheduler.stats();
        let scoped = sink.snapshot();
        // The three ranges are recorded as one coalesced read of 50_000..55_000.
        assert_eq!(1, scoped.iops);
        assert_eq!(1, scoped.requests);
        assert_eq!(5000, scoped.bytes_read);
        // So far the global and scoped statistics must agree.
        assert_eq!(global.iops, scoped.iops);
        assert_eq!(global.requests, scoped.requests);
        assert_eq!(global.bytes_read, scoped.bytes_read);

        // A second file scheduler without the extra recorder.
        let other = scheduler
            .open_file(&tmp_file, &CachedFileSize::unknown())
            .await
            .unwrap();
        other.submit_request(vec![0..1000], 0).await.unwrap();

        // Only the global statistics advance; the scoped ones are unchanged.
        let global_after = scheduler.stats();
        let scoped_after = sink.snapshot();
        assert_eq!(global.bytes_read + 1000, global_after.bytes_read);
        assert_eq!(scoped.bytes_read, scoped_after.bytes_read);
        assert_eq!(scoped.iops, scoped_after.iops);
    }
1332
    /// Checks that queued requests are served in priority order: a request with a
    /// smaller priority value completes before one with a larger value, even if it
    /// was submitted later.
    #[tokio::test]
    async fn test_priority() {
        let some_path = Path::parse("foo").unwrap();
        let base_store = Arc::new(InMemory::new());
        base_store
            .put(&some_path, vec![0; 1000].into())
            .await
            .unwrap();

        // Every `get_opts` call on the mock store waits for (and consumes) one
        // permit, so the test controls when each read is allowed to finish.
        let semaphore = Arc::new(tokio::sync::Semaphore::new(0));
        let mut obj_store = MockObjectStore::default();
        let semaphore_copy = semaphore.clone();
        obj_store
            .expect_get_opts()
            .returning(move |location, options| {
                let semaphore = semaphore.clone();
                let base_store = base_store.clone();
                let location = location.clone();
                async move {
                    semaphore.acquire().await.unwrap().forget();
                    base_store.get_opts(&location, options).await
                }
                .boxed()
            });
        let obj_store = Arc::new(ObjectStore::new(
            Arc::new(obj_store),
            Url::parse("mem://").unwrap(),
            Some(500),
            None,
            false,
            false,
            1,
            DEFAULT_DOWNLOAD_RETRY_COUNT,
            None,
        ));

        let config = SchedulerConfig {
            io_buffer_size_bytes: 1024 * 1024,
            use_lite_scheduler: None,
        };

        let scan_scheduler = ScanScheduler::new(obj_store, config);

        let file_scheduler = scan_scheduler
            .open_file(&Path::parse("foo").unwrap(), &CachedFileSize::new(1000))
            .await
            .unwrap();

        // First request, priority 0.
        let first_fut = timeout(
            Duration::from_secs(10),
            file_scheduler.submit_single(0..10, 0),
        )
        .boxed();

        // Second request, priority 100.
        let mut second_fut = timeout(
            Duration::from_secs(10),
            file_scheduler.submit_single(0..20, 100),
        )
        .boxed();

        // Third request, priority 0. It is submitted after the second request but
        // is expected to complete before it.
        let mut third_fut = timeout(
            Duration::from_secs(10),
            file_scheduler.submit_single(0..30, 0),
        )
        .boxed();

        // Let one read through: only the first request completes.
        semaphore_copy.add_permits(1);
        assert!(first_fut.await.unwrap().unwrap().len() == 10);
        assert!(poll!(&mut second_fut).is_pending());
        assert!(poll!(&mut third_fut).is_pending());

        // Let another read through: the third request completes before the second.
        semaphore_copy.add_permits(1);
        assert!(third_fut.await.unwrap().unwrap().len() == 30);
        assert!(poll!(&mut second_fut).is_pending());

        // The last permit releases the second request.
        semaphore_copy.add_permits(1);
        assert!(second_fut.await.unwrap().unwrap().len() == 20);
    }
1421
1422 #[tokio::test(flavor = "multi_thread")]
1423 async fn test_backpressure() {
1424 let some_path = Path::parse("foo").unwrap();
1425 let base_store = Arc::new(InMemory::new());
1426 base_store
1427 .put(&some_path, vec![0; 100000].into())
1428 .await
1429 .unwrap();
1430
1431 let bytes_read = Arc::new(AtomicU64::from(0));
1432 let mut obj_store = MockObjectStore::default();
1433 let bytes_read_copy = bytes_read.clone();
1434 obj_store
1436 .expect_get_opts()
1437 .returning(move |location, options| {
1438 let range = options.range.as_ref().unwrap();
1439 let num_bytes = match range {
1440 GetRange::Bounded(bounded) => bounded.end - bounded.start,
1441 _ => panic!(),
1442 };
1443 bytes_read_copy.fetch_add(num_bytes, Ordering::Release);
1444 let location = location.clone();
1445 let base_store = base_store.clone();
1446 async move { base_store.get_opts(&location, options).await }.boxed()
1447 });
1448 let obj_store = Arc::new(ObjectStore::new(
1449 Arc::new(obj_store),
1450 Url::parse("mem://").unwrap(),
1451 Some(500),
1452 None,
1453 false,
1454 false,
1455 1,
1456 DEFAULT_DOWNLOAD_RETRY_COUNT,
1457 None,
1458 ));
1459
1460 let config = SchedulerConfig {
1461 io_buffer_size_bytes: 10,
1462 use_lite_scheduler: None,
1463 };
1464
1465 let scan_scheduler = ScanScheduler::new(obj_store.clone(), config);
1466
1467 let file_scheduler = scan_scheduler
1468 .open_file(&Path::parse("foo").unwrap(), &CachedFileSize::new(100000))
1469 .await
1470 .unwrap();
1471
1472 let wait_for_idle = || async move {
1473 let handle = Handle::current();
1474 while handle.metrics().num_alive_tasks() != 1 {
1475 tokio::time::sleep(Duration::from_millis(10)).await;
1476 }
1477 };
1478 let wait_for_bytes_read_and_idle = |target_bytes: u64| {
1479 let bytes_read = &bytes_read;
1481 async move {
1482 let bytes_read_copy = bytes_read.clone();
1483 while bytes_read_copy.load(Ordering::Acquire) < target_bytes {
1484 tokio::time::sleep(Duration::from_millis(10)).await;
1485 }
1486 wait_for_idle().await;
1487 }
1488 };
1489
1490 let first_fut = file_scheduler.submit_single(0..5, 0);
1492 let second_fut = file_scheduler.submit_single(0..5, 0);
1494 let third_fut = file_scheduler.submit_single(0..3, 0);
1496 wait_for_bytes_read_and_idle(10).await;
1498
1499 assert_eq!(first_fut.await.unwrap().len(), 5);
1500 wait_for_bytes_read_and_idle(13).await;
1502
1503 let fourth_fut = file_scheduler.submit_single(0..5, 0);
1505 wait_for_bytes_read_and_idle(13).await;
1506
1507 assert_eq!(third_fut.await.unwrap().len(), 3);
1509 wait_for_bytes_read_and_idle(18).await;
1510
1511 assert_eq!(second_fut.await.unwrap().len(), 5);
1512 let fifth_fut = file_scheduler.submit_request(vec![0..3, 90000..90007], 0);
1519 wait_for_bytes_read_and_idle(21).await;
1520
1521 let fifth_bytes = tokio::time::timeout(Duration::from_secs(10), fifth_fut)
1523 .await
1524 .unwrap();
1525 assert_eq!(
1526 fifth_bytes.unwrap().iter().map(|b| b.len()).sum::<usize>(),
1527 10
1528 );
1529
1530 assert_eq!(fourth_fut.await.unwrap().len(), 5);
1532 wait_for_bytes_read_and_idle(28).await;
1533
1534 let config = SchedulerConfig {
1536 io_buffer_size_bytes: 10,
1537 use_lite_scheduler: None,
1538 };
1539
1540 let scan_scheduler = ScanScheduler::new(obj_store, config);
1541 let file_scheduler = scan_scheduler
1542 .open_file(&Path::parse("foo").unwrap(), &CachedFileSize::new(100000))
1543 .await
1544 .unwrap();
1545
1546 let first_fut = file_scheduler.submit_single(0..10, 0);
1547 let second_fut = file_scheduler.submit_single(0..10, 0);
1548
1549 std::thread::sleep(Duration::from_millis(100));
1550 assert_eq!(first_fut.await.unwrap().len(), 10);
1551 assert_eq!(second_fut.await.unwrap().len(), 10);
1552 }
1553
1554 #[derive(Debug)]
1556 struct TrackingReader {
1557 get_range_count: Arc<AtomicU64>,
1558 path: Path,
1559 }
1560
1561 impl lance_core::deepsize::DeepSizeOf for TrackingReader {
1562 fn deep_size_of_children(&self, _context: &mut lance_core::deepsize::Context) -> usize {
1563 0
1564 }
1565 }
1566
1567 impl Reader for TrackingReader {
1568 fn path(&self) -> &Path {
1569 &self.path
1570 }
1571
1572 fn block_size(&self) -> usize {
1573 4096
1574 }
1575
1576 fn io_parallelism(&self) -> usize {
1577 1
1578 }
1579
1580 fn size(&self) -> futures::future::BoxFuture<'_, object_store::Result<usize>> {
1581 Box::pin(async { Ok(1_000_000) })
1582 }
1583
1584 fn get_range(
1585 &self,
1586 range: Range<usize>,
1587 ) -> futures::future::BoxFuture<'static, object_store::Result<Bytes>> {
1588 self.get_range_count.fetch_add(1, Ordering::Release);
1589 let num_bytes = range.end - range.start;
1590 Box::pin(async move { Ok(Bytes::from(vec![0u8; num_bytes])) })
1591 }
1592
1593 fn get_all(&self) -> futures::future::BoxFuture<'_, object_store::Result<Bytes>> {
1594 Box::pin(async { Ok(Bytes::from(vec![0u8; 1_000_000])) })
1595 }
1596 }
1597
1598 #[tokio::test]
1599 async fn test_lite_scheduler_submits_eagerly() {
1600 let obj_store = Arc::new(ObjectStore::memory());
1601 let config = SchedulerConfig::default_for_testing().with_lite_scheduler();
1602 let scheduler = ScanScheduler::new(obj_store, config);
1603
1604 let get_range_count = Arc::new(AtomicU64::new(0));
1605 let reader: Arc<dyn Reader> = Arc::new(TrackingReader {
1606 get_range_count: get_range_count.clone(),
1607 path: Path::parse("test").unwrap(),
1608 });
1609
1610 let fut1 = scheduler.submit_request(reader.clone(), vec![0..100], 0, false);
1613 let fut2 = scheduler.submit_request(reader.clone(), vec![100..200], 10, false);
1614 let fut3 = scheduler.submit_request(reader.clone(), vec![200..300], 20, false);
1615
1616 assert_eq!(get_range_count.load(Ordering::Acquire), 3);
1618
1619 assert_eq!(fut1.await.unwrap()[0].len(), 100);
1621 assert_eq!(fut2.await.unwrap()[0].len(), 100);
1622 assert_eq!(fut3.await.unwrap()[0].len(), 100);
1623 }
1624
1625 #[tokio::test]
1626 async fn test_object_store_selects_scheduler() {
1627 let memory_store = Arc::new(ObjectStore::memory());
1629 assert!(!memory_store.prefers_lite_scheduler());
1630 let config = SchedulerConfig {
1631 io_buffer_size_bytes: 256 * 1024 * 1024,
1632 use_lite_scheduler: None,
1633 };
1634 let scheduler = ScanScheduler::new(memory_store.clone(), config);
1635 assert!(!scheduler.uses_lite_scheduler());
1636
1637 let uring_store = Arc::new(ObjectStore::new(
1639 Arc::new(InMemory::new()),
1640 Url::parse("file+uring:///tmp").unwrap(),
1641 None,
1642 None,
1643 false,
1644 false,
1645 8,
1646 DEFAULT_DOWNLOAD_RETRY_COUNT,
1647 None,
1648 ));
1649 assert!(uring_store.prefers_lite_scheduler());
1650 let config = SchedulerConfig {
1651 io_buffer_size_bytes: 256 * 1024 * 1024,
1652 use_lite_scheduler: None,
1653 };
1654 let scheduler = ScanScheduler::new(uring_store.clone(), config);
1655 assert!(scheduler.uses_lite_scheduler());
1656
1657 let config = SchedulerConfig {
1659 io_buffer_size_bytes: 256 * 1024 * 1024,
1660 use_lite_scheduler: Some(false),
1661 };
1662 let scheduler = ScanScheduler::new(uring_store, config);
1663 assert!(!scheduler.uses_lite_scheduler());
1664
1665 let config = SchedulerConfig {
1667 io_buffer_size_bytes: 256 * 1024 * 1024,
1668 use_lite_scheduler: Some(true),
1669 };
1670 let scheduler = ScanScheduler::new(memory_store, config);
1671 assert!(scheduler.uses_lite_scheduler());
1672 }
1673
1674 #[test_log::test(tokio::test(flavor = "multi_thread"))]
1675 async fn stress_backpressure() {
1676 let some_path = Path::parse("foo").unwrap();
1680 let obj_store = Arc::new(ObjectStore::memory());
1681 obj_store
1682 .put(&some_path, vec![0; 100000].as_slice())
1683 .await
1684 .unwrap();
1685
1686 let config = SchedulerConfig {
1688 io_buffer_size_bytes: 1,
1689 use_lite_scheduler: None,
1690 };
1691 let scan_scheduler = ScanScheduler::new(obj_store.clone(), config);
1692 let file_scheduler = scan_scheduler
1693 .open_file(&some_path, &CachedFileSize::unknown())
1694 .await
1695 .unwrap();
1696
1697 let mut futs = Vec::with_capacity(10000);
1698 for idx in 0..10000 {
1699 futs.push(file_scheduler.submit_single(idx..idx + 1, idx));
1700 }
1701
1702 for fut in futs {
1703 fut.await.unwrap();
1704 }
1705 }
1706
1707 #[tokio::test(flavor = "multi_thread")]
1708 async fn test_zero_buffer_size_no_backpressure() {
1709 let obj_store = Arc::new(ObjectStore::memory());
1712 let config = SchedulerConfig {
1713 io_buffer_size_bytes: 0,
1714 use_lite_scheduler: Some(false),
1715 };
1716 let scheduler = ScanScheduler::new(obj_store, config);
1717
1718 let get_range_count = Arc::new(AtomicU64::new(0));
1719 let reader: Arc<dyn Reader> = Arc::new(TrackingReader {
1720 get_range_count: get_range_count.clone(),
1721 path: Path::parse("test").unwrap(),
1722 });
1723
1724 let fut1 = scheduler.submit_request(reader.clone(), vec![0..1000], 0, false);
1727 let fut2 = scheduler.submit_request(reader.clone(), vec![1000..2000], 1, false);
1728 let fut3 = scheduler.submit_request(reader.clone(), vec![2000..3000], 2, false);
1729
1730 let bytes1 = timeout(Duration::from_secs(5), fut1)
1731 .await
1732 .unwrap()
1733 .unwrap();
1734 let bytes2 = timeout(Duration::from_secs(5), fut2)
1735 .await
1736 .unwrap()
1737 .unwrap();
1738 let bytes3 = timeout(Duration::from_secs(5), fut3)
1739 .await
1740 .unwrap()
1741 .unwrap();
1742 assert_eq!(bytes1[0].len(), 1000);
1743 assert_eq!(bytes2[0].len(), 1000);
1744 assert_eq!(bytes3[0].len(), 1000);
1745 assert_eq!(get_range_count.load(Ordering::Acquire), 3);
1746 }
1747
1748 #[tokio::test(flavor = "multi_thread")]
1749 async fn test_file_scheduler_bypass_backpressure() {
1750 let some_path = Path::parse("foo").unwrap();
1753 let base_store = Arc::new(InMemory::new());
1754 base_store
1755 .put(&some_path, vec![0u8; 1000].into())
1756 .await
1757 .unwrap();
1758
1759 let bytes_dispatched = Arc::new(AtomicU64::from(0));
1760 let mut obj_store = MockObjectStore::default();
1761 let bytes_dispatched_copy = bytes_dispatched.clone();
1762 obj_store
1763 .expect_get_opts()
1764 .returning(move |location, options| {
1765 let range = options.range.as_ref().unwrap();
1766 let num_bytes = match range {
1767 GetRange::Bounded(bounded) => bounded.end - bounded.start,
1768 _ => panic!(),
1769 };
1770 bytes_dispatched_copy.fetch_add(num_bytes, Ordering::Release);
1771 let location = location.clone();
1772 let base_store = base_store.clone();
1773 async move { base_store.get_opts(&location, options).await }.boxed()
1774 });
1775 let obj_store = Arc::new(ObjectStore::new(
1776 Arc::new(obj_store),
1777 Url::parse("mem://").unwrap(),
1778 Some(500),
1779 None,
1780 false,
1781 false,
1782 1,
1783 DEFAULT_DOWNLOAD_RETRY_COUNT,
1784 None,
1785 ));
1786
1787 let config = SchedulerConfig {
1789 io_buffer_size_bytes: 10,
1790 use_lite_scheduler: Some(false),
1791 };
1792 let scan_scheduler = ScanScheduler::new(obj_store, config);
1793 let file_scheduler = scan_scheduler
1794 .open_file(&Path::parse("foo").unwrap(), &CachedFileSize::new(1000))
1795 .await
1796 .unwrap();
1797 let bypass_scheduler = file_scheduler.with_bypass_backpressure();
1798
1799 let blocker_fut = file_scheduler.submit_single(0..10, 0);
1801 while bytes_dispatched.load(Ordering::Acquire) < 10 {
1802 tokio::time::sleep(Duration::from_millis(1)).await;
1803 }
1804
1805 let normal_fut = file_scheduler.submit_single(0..10, 2);
1808 let bypass_fut = bypass_scheduler.submit_single(0..10, 1);
1809
1810 while bytes_dispatched.load(Ordering::Acquire) < 20 {
1812 tokio::time::sleep(Duration::from_millis(1)).await;
1813 }
1814 tokio::time::sleep(Duration::from_millis(20)).await;
1815 assert_eq!(
1816 bytes_dispatched.load(Ordering::Acquire),
1817 20,
1818 "normal read should still be blocked while budget is exhausted"
1819 );
1820
1821 timeout(Duration::from_secs(5), blocker_fut)
1823 .await
1824 .unwrap()
1825 .unwrap();
1826 timeout(Duration::from_secs(5), bypass_fut)
1827 .await
1828 .unwrap()
1829 .unwrap();
1830 timeout(Duration::from_secs(5), normal_fut)
1831 .await
1832 .unwrap()
1833 .unwrap();
1834 assert_eq!(bytes_dispatched.load(Ordering::Acquire), 30);
1835 }
1836}