
1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use bytes::Bytes;
5use futures::channel::oneshot;
6use futures::{FutureExt, TryFutureExt};
7use object_store::path::Path;
8use std::collections::BinaryHeap;
9use std::fmt::Debug;
10use std::future::Future;
11use std::num::NonZero;
12use std::ops::Range;
13use std::sync::atomic::{AtomicU64, Ordering};
14use std::sync::{Arc, Mutex};
15use std::time::Instant;
16use tokio::sync::{Notify, Semaphore, SemaphorePermit};
17
18use lance_core::{Error, Result};
19
20use crate::object_store::ObjectStore;
21use crate::traits::Reader;
22use crate::utils::CachedFileSize;
23
24mod lite;
25
// Don't log backpressure warnings until at least this many seconds have passed
const BACKPRESSURE_MIN: u64 = 5;
// Don't log backpressure warnings more than once / minute
const BACKPRESSURE_DEBOUNCE: u64 = 60;

// Global counter of how many IOPS we have issued
static IOPS_COUNTER: AtomicU64 = AtomicU64::new(0);
// Global counter of how many bytes were read by the scheduler
static BYTES_READ_COUNTER: AtomicU64 = AtomicU64::new(0);
// By default, we limit the number of IOPS across the entire process to 128
//
// In theory this is enough for ~10GBps on S3 following the guidelines to issue
// 1 IOP per 80MBps.  In practice, I have noticed slightly better performance going
// up to 256.
//
// However, non-S3 stores (e.g. GCS, Azure) can suffer significantly from too many
// concurrent IOPS.  For safety, we set the default to 128 and let the user override
// this if needed.
//
// Note: this only limits things that run through the scheduler.  It does not limit
// IOPS from other sources like writing or commits.
//
// A plain `const` (rather than a non-mut `static`) is the idiomatic choice for an
// immutable compile-time value.
const DEFAULT_PROCESS_IOPS_LIMIT: i32 = 128;
48
/// Returns the process-wide count of I/O operations issued through the
/// scheduler since process start (monotonically increasing).
pub fn iops_counter() -> u64 {
    IOPS_COUNTER.load(Ordering::Acquire)
}
52
/// Returns the process-wide total of bytes read by the scheduler since
/// process start (monotonically increasing).
pub fn bytes_read_counter() -> u64 {
    BYTES_READ_COUNTER.load(Ordering::Acquire)
}
56
57// There are two structures that control the I/O scheduler concurrency.  First,
58// we have a hard limit on the number of IOPS that can be issued concurrently.
59// This limit is process-wide.
60//
61// Second, we try and limit how many I/O requests can be buffered in memory without
62// being consumed by a decoder of some kind.  This limit is per-scheduler.  We cannot
63// make this limit process wide without introducing deadlock (because the decoder for
64// file 0 might be waiting on IOPS blocked by a queue filled with requests for file 1)
65// and vice-versa.
66//
67// There is also a per-scan limit on the number of IOPS that can be issued concurrently.
68//
69// The process-wide limit exists when users need a hard limit on the number of parallel
70// IOPS, e.g. due to port availability limits or to prevent multiple scans from saturating
71// the network.  (Note: a process-wide limit of X will not necessarily limit the number of
72// open TCP connections to exactly X.  The underlying object store may open more connections
73// anyways)
74//
75// However, it can be too tough in some cases, e.g. when some scans are reading from
76// cloud storage and other scans are reading from local disk.  In these cases users don't
77// need to set a process-limit and can rely on the per-scan limits.
78
79// The IopsQuota enforces the first of the above limits, it is the per-process hard cap
80// on the number of IOPS that can be issued concurrently.
81//
82// The per-scan limits are enforced by IoQueue
struct IopsQuota {
    // An Option is used here to avoid mutex overhead if no limit is set
    // (None means the process-wide limit is disabled; see `IopsQuota::new`)
    iops_avail: Option<Semaphore>,
}
87
88/// A reservation on the global IOPS quota
89///
90/// When the reservation is dropped, the IOPS quota is released unless
91/// [`Self::forget`] is called.
struct IopsReservation<'a> {
    // None when no process-wide limit is configured (a permit-less reservation)
    value: Option<SemaphorePermit<'a>>,
}
95
96impl IopsReservation<'_> {
97    // Forget the reservation, so it won't be released on drop
98    fn forget(&mut self) {
99        if let Some(value) = self.value.take() {
100            value.forget();
101        }
102    }
103}
104
105impl IopsQuota {
106    // By default, we throttle the number of scan IOPS across the entire process
107    //
108    // However, the user can disable this by setting the environment variable
109    // LANCE_PROCESS_IO_THREADS_LIMIT to zero (or a negative integer).
110    fn new() -> Self {
111        let initial_capacity = std::env::var("LANCE_PROCESS_IO_THREADS_LIMIT")
112            .map(|s| {
113                s.parse::<i32>().unwrap_or_else(|_| {
114                    log::warn!("Ignoring invalid LANCE_PROCESS_IO_THREADS_LIMIT: {}", s);
115                    DEFAULT_PROCESS_IOPS_LIMIT
116                })
117            })
118            .unwrap_or(DEFAULT_PROCESS_IOPS_LIMIT);
119        let iops_avail = if initial_capacity <= 0 {
120            None
121        } else {
122            Some(Semaphore::new(initial_capacity as usize))
123        };
124        Self { iops_avail }
125    }
126
127    // Return a reservation on the global IOPS quota
128    fn release(&self) {
129        if let Some(iops_avail) = self.iops_avail.as_ref() {
130            iops_avail.add_permits(1);
131        }
132    }
133
134    // Acquire a reservation on the global IOPS quota
135    async fn acquire(&self) -> IopsReservation<'_> {
136        if let Some(iops_avail) = self.iops_avail.as_ref() {
137            IopsReservation {
138                value: Some(iops_avail.acquire().await.unwrap()),
139            }
140        } else {
141            IopsReservation { value: None }
142        }
143    }
144}
145
146static IOPS_QUOTA: std::sync::LazyLock<IopsQuota> = std::sync::LazyLock::new(IopsQuota::new);
147
148// We want to allow requests that have a lower priority than any
149// currently in-flight request.  This helps avoid potential deadlocks
150// related to backpressure.  Unfortunately, it is quite expensive to
151// keep track of which priorities are in-flight.
152//
153// TODO: At some point it would be nice if we can optimize this away but
154// in_flight should remain relatively small (generally less than 256 items)
155// and has not shown itself to be a bottleneck yet.
struct PrioritiesInFlight {
    // Priorities of currently-running requests, kept sorted ascending so the
    // minimum is always at index 0.  Duplicates are allowed (a multiset).
    in_flight: Vec<u128>,
}

impl PrioritiesInFlight {
    // `capacity` is the scheduler's IOP capacity; twice that gives some slack
    // before the vector needs to reallocate.
    fn new(capacity: u32) -> Self {
        Self {
            in_flight: Vec::with_capacity(capacity as usize * 2),
        }
    }

    // Smallest in-flight priority, or u128::MAX when nothing is running.
    fn min_in_flight(&self) -> u128 {
        match self.in_flight.first() {
            Some(&lowest) => lowest,
            None => u128::MAX,
        }
    }

    // Insert `prio`, preserving sort order.
    fn push(&mut self, prio: u128) {
        // binary_search returns Err(pos) with the insertion point when the
        // value is absent; either way `pos` keeps the vector sorted.
        let pos = self.in_flight.binary_search(&prio).unwrap_or_else(|p| p);
        self.in_flight.insert(pos, prio);
    }

    // Remove a single occurrence of `prio`, if present.
    fn remove(&mut self, prio: u128) {
        if let Ok(pos) = self.in_flight.binary_search(&prio) {
            self.in_flight.remove(pos);
        }
    }
}
185
// Mutable state for a single scheduler's I/O queue, guarded by `IoQueue::state`.
struct IoQueueState {
    // Number of IOPS we can issue concurrently before pausing I/O
    iops_avail: u32,
    // Number of bytes we are allowed to buffer in memory before pausing I/O
    //
    // This can dip below 0 due to I/O prioritization (high-priority requests
    // are admitted even when the budget is exhausted; see `can_deliver`)
    bytes_avail: i64,
    // Pending I/O requests.  IoTask's Ord is inverted, so this max-heap pops
    // the numerically smallest (most urgent) priority first.
    pending_requests: BinaryHeap<IoTask>,
    // Priorities of in-flight requests
    priorities_in_flight: PrioritiesInFlight,
    // Set when the scheduler is finished to notify the I/O loop to shut down
    // once all outstanding requests have been completed.
    done_scheduling: bool,
    // Time when the scheduler started
    start: Instant,
    // Last time we warned about backpressure, in seconds since `start`
    // (0 means "never warned yet")
    last_warn: AtomicU64,
}
205
206impl IoQueueState {
207    fn new(io_capacity: u32, io_buffer_size: u64) -> Self {
208        Self {
209            iops_avail: io_capacity,
210            bytes_avail: io_buffer_size as i64,
211            pending_requests: BinaryHeap::new(),
212            priorities_in_flight: PrioritiesInFlight::new(io_capacity),
213            done_scheduling: false,
214            start: Instant::now(),
215            last_warn: AtomicU64::from(0),
216        }
217    }
218
219    fn warn_if_needed(&self) {
220        let seconds_elapsed = self.start.elapsed().as_secs();
221        let last_warn = self.last_warn.load(Ordering::Acquire);
222        let since_last_warn = seconds_elapsed - last_warn;
223        if (last_warn == 0
224            && seconds_elapsed > BACKPRESSURE_MIN
225            && seconds_elapsed < BACKPRESSURE_DEBOUNCE)
226            || since_last_warn > BACKPRESSURE_DEBOUNCE
227        {
228            tracing::event!(tracing::Level::DEBUG, "Backpressure throttle exceeded");
229            log::debug!(
230                "Backpressure throttle is full, I/O will pause until buffer is drained.  Max I/O bandwidth will not be achieved because CPU is falling behind"
231            );
232            self.last_warn
233                .store(seconds_elapsed.max(1), Ordering::Release);
234        }
235    }
236
237    fn can_deliver(&self, task: &IoTask) -> bool {
238        if self.iops_avail == 0 {
239            false
240        } else if task.priority <= self.priorities_in_flight.min_in_flight() {
241            true
242        } else if task.num_bytes() as i64 > self.bytes_avail {
243            self.warn_if_needed();
244            false
245        } else {
246            true
247        }
248    }
249
250    fn next_task(&mut self) -> Option<IoTask> {
251        let task = self.pending_requests.peek()?;
252        if self.can_deliver(task) {
253            self.priorities_in_flight.push(task.priority);
254            self.iops_avail -= 1;
255            self.bytes_avail -= task.num_bytes() as i64;
256            if self.bytes_avail < 0 {
257                // This can happen when we admit special priority requests
258                log::debug!(
259                    "Backpressure throttle temporarily exceeded by {} bytes due to priority I/O",
260                    -self.bytes_avail
261                );
262            }
263            Some(self.pending_requests.pop().unwrap())
264        } else {
265            None
266        }
267    }
268}
269
270// This is modeled after the MPSC queue described here: https://docs.rs/tokio/latest/tokio/sync/struct.Notify.html
271//
272// However, it only needs to be SPSC since there is only one "scheduler thread"
273// and one I/O loop.
struct IoQueue {
    // Queue state
    state: Mutex<IoQueueState>,
    // Used to signal new I/O requests have arrived that might potentially be
    // runnable (also signalled on task completion, bytes consumed, and close)
    notify: Notify,
}
280
281impl IoQueue {
282    fn new(io_capacity: u32, io_buffer_size: u64) -> Self {
283        Self {
284            state: Mutex::new(IoQueueState::new(io_capacity, io_buffer_size)),
285            notify: Notify::new(),
286        }
287    }
288
289    fn push(&self, task: IoTask) {
290        log::trace!(
291            "Inserting I/O request for {} bytes with priority ({},{}) into I/O queue",
292            task.num_bytes(),
293            task.priority >> 64,
294            task.priority & 0xFFFFFFFFFFFFFFFF
295        );
296        let mut state = self.state.lock().unwrap();
297        state.pending_requests.push(task);
298        drop(state);
299
300        self.notify.notify_one();
301    }
302
303    async fn pop(&self) -> Option<IoTask> {
304        loop {
305            {
306                // First, grab a reservation on the global IOPS quota
307                // If we then get a task to run, transfer the reservation
308                // to the task.  Otherwise, the reservation will be released
309                // when iop_res is dropped.
310                let mut iop_res = IOPS_QUOTA.acquire().await;
311                // Next, try and grab a reservation from the queue
312                let mut state = self.state.lock().unwrap();
313                if let Some(task) = state.next_task() {
314                    // Reservation successfully acquired, we will release the global
315                    // global reservation after task has run.
316                    iop_res.forget();
317                    return Some(task);
318                }
319
320                if state.done_scheduling {
321                    return None;
322                }
323            }
324
325            self.notify.notified().await;
326        }
327    }
328
329    fn on_iop_complete(&self) {
330        let mut state = self.state.lock().unwrap();
331        state.iops_avail += 1;
332        drop(state);
333
334        self.notify.notify_one();
335    }
336
337    fn on_bytes_consumed(&self, bytes: u64, priority: u128, num_reqs: usize) {
338        let mut state = self.state.lock().unwrap();
339        state.bytes_avail += bytes as i64;
340        for _ in 0..num_reqs {
341            state.priorities_in_flight.remove(priority);
342        }
343        drop(state);
344
345        self.notify.notify_one();
346    }
347
348    fn close(&self) {
349        let mut state = self.state.lock().unwrap();
350        state.done_scheduling = true;
351        let pending_requests = std::mem::take(&mut state.pending_requests);
352        drop(state);
353        for request in pending_requests {
354            request.cancel();
355        }
356
357        self.notify.notify_one();
358    }
359}
360
361// There is one instance of MutableBatch shared by all the I/O operations
362// that make up a single request.  When all the I/O operations complete
363// then the MutableBatch goes out of scope and the batch request is considered
364// complete
struct MutableBatch<F: FnOnce(Response) + Send> {
    // Completion callback; taken (and invoked) exactly once, in Drop
    when_done: Option<F>,
    // One slot per I/O request, filled in as each read completes
    data_buffers: Vec<Bytes>,
    // Total bytes delivered so far (counted even for failed chunks,
    // to keep backpressure accounting in balance)
    num_bytes: u64,
    // Priority shared by every request in this batch
    priority: u128,
    // Number of individual read requests that make up this batch
    num_reqs: usize,
    // First error observed, if any; reported instead of the data
    err: Option<Box<dyn std::error::Error + Send + Sync + 'static>>,
}

impl<F: FnOnce(Response) + Send> MutableBatch<F> {
    // Build an empty batch expecting `num_data_buffers` chunks of data.
    fn new(when_done: F, num_data_buffers: u32, priority: u128, num_reqs: usize) -> Self {
        Self {
            when_done: Some(when_done),
            data_buffers: vec![Bytes::default(); num_data_buffers as usize],
            num_bytes: 0,
            priority,
            num_reqs,
            err: None,
        }
    }
}
386
387// Rather than keep track of when all the I/O requests are finished so that we
388// can deliver the batch of data we let Rust do that for us.  When all I/O's are
389// done then the MutableBatch will go out of scope and we know we have all the
390// data.
391impl<F: FnOnce(Response) + Send> Drop for MutableBatch<F> {
392    fn drop(&mut self) {
393        // If we have an error, return that.  Otherwise return the data
394        let result = if self.err.is_some() {
395            Err(Error::wrapped(self.err.take().unwrap()))
396        } else {
397            let mut data = Vec::new();
398            std::mem::swap(&mut data, &mut self.data_buffers);
399            Ok(data)
400        };
401        // We don't really care if no one is around to receive it, just let
402        // the result go out of scope and get cleaned up
403        let response = Response {
404            data: result,
405            num_bytes: self.num_bytes,
406            priority: self.priority,
407            num_reqs: self.num_reqs,
408        };
409        (self.when_done.take().unwrap())(response);
410    }
411}
412
// A single completed (or failed) read, routed back to its slot in the batch.
struct DataChunk {
    // Index of this request within the original batch (slot in data_buffers)
    task_idx: usize,
    // Number of bytes that were requested for this chunk
    num_bytes: u64,
    // The bytes read, or the error that occurred
    data: Result<Bytes>,
}
418
// Destination for completed read chunks (implemented by MutableBatch).
trait DataSink: Send {
    // Deliver one chunk of data (or an error) to the sink.
    fn deliver_data(&mut self, data: DataChunk);
}
422
423impl<F: FnOnce(Response) + Send> DataSink for MutableBatch<F> {
424    // Called by worker tasks to add data to the MutableBatch
425    fn deliver_data(&mut self, data: DataChunk) {
426        self.num_bytes += data.num_bytes;
427        match data.data {
428            Ok(data_bytes) => {
429                self.data_buffers[data.task_idx] = data_bytes;
430            }
431            Err(err) => {
432                // This keeps the original error, if present
433                self.err.get_or_insert(Box::new(err));
434            }
435        }
436    }
437}
438
// A single read of `to_read` from `reader`; `when_done` is invoked with the
// bytes (or an error) when the read completes or is cancelled.
struct IoTask {
    reader: Arc<dyn Reader>,
    // Byte range to read (end-exclusive)
    to_read: Range<u64>,
    // Completion callback, invoked exactly once
    when_done: Box<dyn FnOnce(Result<Bytes>) + Send>,
    // Upper 64 bits come from the file's base priority, lower 64 bits from
    // the individual request (see FileScheduler::submit_request)
    priority: u128,
}
445
446impl Eq for IoTask {}
447
448impl PartialEq for IoTask {
449    fn eq(&self, other: &Self) -> bool {
450        self.priority == other.priority
451    }
452}
453
454impl PartialOrd for IoTask {
455    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
456        Some(self.cmp(other))
457    }
458}
459
460impl Ord for IoTask {
461    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
462        // This is intentionally inverted.  We want a min-heap
463        other.priority.cmp(&self.priority)
464    }
465}
466
467impl IoTask {
468    fn num_bytes(&self) -> u64 {
469        self.to_read.end - self.to_read.start
470    }
471    fn cancel(self) {
472        (self.when_done)(Err(Error::internal(
473            "Scheduler closed before I/O was completed".to_string(),
474        )));
475    }
476
477    async fn run(self) {
478        let file_path = self.reader.path().as_ref();
479        let num_bytes = self.num_bytes();
480        let bytes = if self.to_read.start == self.to_read.end {
481            Ok(Bytes::new())
482        } else {
483            let bytes_fut = self
484                .reader
485                .get_range(self.to_read.start as usize..self.to_read.end as usize);
486            IOPS_COUNTER.fetch_add(1, Ordering::Release);
487            let num_bytes = self.num_bytes();
488            bytes_fut
489                .inspect(move |_| {
490                    BYTES_READ_COUNTER.fetch_add(num_bytes, Ordering::Release);
491                })
492                .await
493                .map_err(Error::from)
494        };
495        // Emit per-file I/O trace event only when tracing is enabled
496        tracing::trace!(
497            file = file_path,
498            bytes_read = num_bytes,
499            requests = 1,
500            range_start = self.to_read.start,
501            range_end = self.to_read.end,
502            "File I/O completed"
503        );
504        IOPS_QUOTA.release();
505        (self.when_done)(bytes);
506    }
507}
508
509// Every time a scheduler starts up it launches a task to run the I/O loop.  This loop
510// repeats endlessly until the scheduler is destroyed.
511async fn run_io_loop(tasks: Arc<IoQueue>) {
512    // Pop the first finished task off the queue and submit another until
513    // we are done
514    loop {
515        let next_task = tasks.pop().await;
516        match next_task {
517            Some(task) => {
518                tokio::spawn(task.run());
519            }
520            None => {
521                // The sender has been dropped, we are done
522                return;
523            }
524        }
525    }
526}
527
// Per-scheduler I/O statistics, updated with relaxed atomics (counters only;
// no cross-field consistency is promised).
#[derive(Debug)]
struct StatsCollector {
    // Number of individual read operations recorded
    iops: AtomicU64,
    // Number of batched requests recorded
    requests: AtomicU64,
    // Total bytes covered by all recorded ranges
    bytes_read: AtomicU64,
}

impl StatsCollector {
    // All counters start at zero.
    fn new() -> Self {
        Self {
            iops: AtomicU64::new(0),
            requests: AtomicU64::new(0),
            bytes_read: AtomicU64::new(0),
        }
    }

    // Reads number of I/O operations recorded so far.
    fn iops(&self) -> u64 {
        self.iops.load(Ordering::Relaxed)
    }

    // Reads total bytes recorded so far.
    fn bytes_read(&self) -> u64 {
        self.bytes_read.load(Ordering::Relaxed)
    }

    // Reads number of batched requests recorded so far.
    fn requests(&self) -> u64 {
        self.requests.load(Ordering::Relaxed)
    }

    // Record one batched request: one request, `request.len()` IOPs, and the
    // sum of all range lengths as bytes read.
    fn record_request(&self, request: &[Range<u64>]) {
        let total_bytes: u64 = request.iter().map(|r| r.end - r.start).sum();
        self.requests.fetch_add(1, Ordering::Relaxed);
        self.iops.fetch_add(request.len() as u64, Ordering::Relaxed);
        self.bytes_read.fetch_add(total_bytes, Ordering::Relaxed);
    }
}
565
// Publicly visible snapshot of a scheduler's I/O statistics.
pub struct ScanStats {
    // Number of individual read operations issued
    pub iops: u64,
    // Number of batched requests recorded
    pub requests: u64,
    // Total bytes recorded as read
    pub bytes_read: u64,
}

impl ScanStats {
    // Capture the collector's counters at a point in time (each counter is
    // read independently; no cross-counter atomicity).
    fn new(stats: &StatsCollector) -> Self {
        Self {
            iops: stats.iops(),
            requests: stats.requests(),
            bytes_read: stats.bytes_read(),
        }
    }
}
581
// Which queue implementation a ScanScheduler is backed by.
enum IoQueueType {
    // Heap-based queue serviced by `run_io_loop`
    Standard(Arc<IoQueue>),
    // Alternative "lite" queue (enabled via SchedulerConfig::use_lite_scheduler)
    Lite(Arc<lite::IoQueue>),
}
586
587/// An I/O scheduler which wraps an ObjectStore and throttles the amount of
588/// parallel I/O that can be run.
589///
590/// The ScanScheduler will cancel any outstanding I/O requests when it is dropped.
591/// For this reason it should be kept alive until all I/O has finished.
592///
593/// Note: The 2.X file readers already do this so this is only a concern if you are
594/// using the ScanScheduler directly.
pub struct ScanScheduler {
    // The store all opened files are read from
    object_store: Arc<ObjectStore>,
    // Queue implementation chosen at construction time from the config
    io_queue: IoQueueType,
    // Per-scheduler I/O statistics (exposed via `stats()`)
    stats: Arc<StatsCollector>,
}
600
impl Debug for ScanScheduler {
    // Only the object store is shown; queue state and stats are omitted.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ScanScheduler")
            .field("object_store", &self.object_store)
            .finish()
    }
}
608
// The collected result of one batched request, sent back over the oneshot
// channel when the batch's MutableBatch is dropped.
struct Response {
    // One buffer per requested range, or the first error encountered
    data: Result<Vec<Bytes>>,
    // Priority the batch was submitted with
    priority: u128,
    // Number of individual read requests in the batch
    num_reqs: usize,
    // Total bytes delivered; used to credit the backpressure budget
    num_bytes: u64,
}
615
#[derive(Debug, Clone, Copy)]
pub struct SchedulerConfig {
    /// the # of bytes that can be buffered but not yet requested.
    /// This controls back pressure.  If data is not processed quickly enough then this
    /// buffer will fill up and the I/O loop will pause until the buffer is drained.
    pub io_buffer_size_bytes: u64,
    /// Whether to use the new lite scheduler (also enabled by setting the
    /// LANCE_USE_LITE_SCHEDULER environment variable; see `Self::new`)
    pub use_lite_scheduler: bool,
}
625
626impl SchedulerConfig {
627    pub fn new(io_buffer_size_bytes: u64) -> Self {
628        Self {
629            io_buffer_size_bytes,
630            use_lite_scheduler: std::env::var("LANCE_USE_LITE_SCHEDULER").is_ok(),
631        }
632    }
633
634    /// Big enough for unit testing
635    pub fn default_for_testing() -> Self {
636        Self {
637            io_buffer_size_bytes: 256 * 1024 * 1024,
638            use_lite_scheduler: false,
639        }
640    }
641
642    /// Configuration that should generally maximize bandwidth (not trying to save RAM
643    /// at all).  We assume a max page size of 32MiB and then allow 32MiB per I/O thread
644    pub fn max_bandwidth(store: &ObjectStore) -> Self {
645        Self::new(32 * 1024 * 1024 * store.io_parallelism() as u64)
646    }
647
648    pub fn with_lite_scheduler(self) -> Self {
649        Self {
650            use_lite_scheduler: true,
651            ..self
652        }
653    }
654}
655
impl ScanScheduler {
    /// Create a new scheduler with the given I/O capacity
    ///
    /// # Arguments
    ///
    /// * object_store - the store to wrap
    /// * config - configuration settings for the scheduler
    pub fn new(object_store: Arc<ObjectStore>, config: SchedulerConfig) -> Arc<Self> {
        let io_capacity = object_store.io_parallelism();
        let io_queue = if config.use_lite_scheduler {
            let io_queue = Arc::new(lite::IoQueue::new(
                io_capacity as u64,
                config.io_buffer_size_bytes,
            ));
            IoQueueType::Lite(io_queue)
        } else {
            let io_queue = Arc::new(IoQueue::new(
                io_capacity as u32,
                config.io_buffer_size_bytes,
            ));
            let io_queue_clone = io_queue.clone();
            // Best we can do here is fire and forget.  If the I/O loop is still running when the scheduler is
            // dropped we can't wait for it to finish or we'd block a tokio thread.  We could spawn a blocking task
            // to wait for it to finish but that doesn't seem helpful.
            tokio::task::spawn(async move { run_io_loop(io_queue_clone).await });
            IoQueueType::Standard(io_queue)
        };
        Arc::new(Self {
            object_store,
            io_queue,
            stats: Arc::new(StatsCollector::new()),
        })
    }

    /// Open a file for reading
    ///
    /// # Arguments
    ///
    /// * path - the path to the file to open
    /// * base_priority - the base priority for I/O requests submitted to this file scheduler
    ///   this will determine the upper 64 bits of priority (the lower 64 bits
    ///   come from `submit_request` and `submit_single`)
    pub async fn open_file_with_priority(
        self: &Arc<Self>,
        path: &Path,
        base_priority: u64,
        file_size_bytes: &CachedFileSize,
    ) -> Result<FileScheduler> {
        // Use the cached size when available to avoid an extra size lookup
        let file_size_bytes = if let Some(size) = file_size_bytes.get() {
            u64::from(size)
        } else {
            let size = self.object_store.size(path).await?;
            // Zero-byte sizes cannot be cached (CachedFileSize stores NonZero)
            if let Some(size) = NonZero::new(size) {
                file_size_bytes.set(size);
            }
            size
        };
        let reader = self
            .object_store
            .open_with_size(path, file_size_bytes as usize)
            .await?;
        let block_size = self.object_store.block_size() as u64;
        let max_iop_size = self.object_store.max_iop_size();
        Ok(FileScheduler {
            reader: reader.into(),
            block_size,
            root: self.clone(),
            base_priority,
            max_iop_size,
        })
    }

    /// Open a file with a default priority of 0
    ///
    /// See [`Self::open_file_with_priority`] for more information on the priority
    pub async fn open_file(
        self: &Arc<Self>,
        path: &Path,
        file_size_bytes: &CachedFileSize,
    ) -> Result<FileScheduler> {
        self.open_file_with_priority(path, 0, file_size_bytes).await
    }

    // Fan a multi-range request out into one IoTask per range.  All tasks
    // share a single MutableBatch, which fires `tx` (from its Drop impl)
    // once every task has delivered its chunk.
    fn do_submit_request(
        &self,
        reader: Arc<dyn Reader>,
        request: Vec<Range<u64>>,
        tx: oneshot::Sender<Response>,
        priority: u128,
        io_queue: &Arc<IoQueue>,
    ) {
        let num_iops = request.len() as u32;

        let when_all_io_done = move |bytes_and_permits| {
            // We don't care if the receiver has given up so discard the result
            let _ = tx.send(bytes_and_permits);
        };

        let dest = Arc::new(Mutex::new(Box::new(MutableBatch::new(
            when_all_io_done,
            num_iops,
            priority,
            request.len(),
        ))));

        for (task_idx, iop) in request.into_iter().enumerate() {
            let dest = dest.clone();
            let io_queue_clone = io_queue.clone();
            let num_bytes = iop.end - iop.start;
            let task = IoTask {
                reader: reader.clone(),
                to_read: iop,
                priority,
                when_done: Box::new(move |data| {
                    // Return IOP capacity before delivering so the loop can
                    // start another read while we copy the chunk in
                    io_queue_clone.on_iop_complete();
                    let mut dest = dest.lock().unwrap();
                    let chunk = DataChunk {
                        data,
                        task_idx,
                        num_bytes,
                    };
                    dest.deliver_data(chunk);
                }),
            };
            io_queue.push(task);
        }
    }

    // Submit a request through the standard (heap-based) queue and return a
    // future that resolves to the collected buffers.
    fn submit_request_standard(
        &self,
        reader: Arc<dyn Reader>,
        request: Vec<Range<u64>>,
        priority: u128,
        io_queue: &Arc<IoQueue>,
    ) -> impl Future<Output = Result<Vec<Bytes>>> + Send + use<> {
        let (tx, rx) = oneshot::channel::<Response>();

        self.do_submit_request(reader, request, tx, priority, io_queue);

        let io_queue_clone = io_queue.clone();

        rx.map(move |wrapped_rsp| {
            // Right now, it isn't possible for I/O to be cancelled so a cancel error should
            // not occur
            let rsp = wrapped_rsp.unwrap();
            // Awaiting the result is what "consumes" the bytes and unblocks
            // the backpressure throttle
            io_queue_clone.on_bytes_consumed(rsp.num_bytes, rsp.priority, rsp.num_reqs);
            rsp.data
        })
    }

    // Submit a request through the lite queue.
    fn submit_request_lite(
        &self,
        reader: Arc<dyn Reader>,
        request: Vec<Range<u64>>,
        priority: u128,
        io_queue: &Arc<lite::IoQueue>,
    ) -> impl Future<Output = Result<Vec<Bytes>>> + Send + use<> {
        // It's important that we submit all requests _before_ we await anything
        let maybe_tasks = request
            .into_iter()
            .map(|task| {
                let reader = reader.clone();
                let queue = io_queue.clone();
                let run_fn = Box::new(move || {
                    reader
                        .get_range(task.start as usize..task.end as usize)
                        .map_err(Error::from)
                        .boxed()
                });
                queue.submit(task, priority, run_fn)
            })
            .collect::<Result<Vec<_>>>();
        match maybe_tasks {
            Ok(tasks) => async move {
                let mut results = Vec::with_capacity(tasks.len());
                for task in tasks {
                    results.push(task.await?);
                }
                Ok(results)
            }
            .boxed(),
            Err(e) => async move { Err(e) }.boxed(),
        }
    }

    /// Submit a batch of byte ranges to read from `reader`; the returned
    /// future resolves to one buffer per requested range (in order).
    pub fn submit_request(
        &self,
        reader: Arc<dyn Reader>,
        request: Vec<Range<u64>>,
        priority: u128,
    ) -> impl Future<Output = Result<Vec<Bytes>>> + Send + use<> {
        match &self.io_queue {
            IoQueueType::Standard(io_queue) => futures::future::Either::Left(
                self.submit_request_standard(reader, request, priority, io_queue),
            ),
            IoQueueType::Lite(io_queue) => futures::future::Either::Right(
                self.submit_request_lite(reader, request, priority, io_queue),
            ),
        }
    }

    /// Snapshot of this scheduler's I/O statistics.
    pub fn stats(&self) -> ScanStats {
        ScanStats::new(self.stats.as_ref())
    }
}
861
impl Drop for ScanScheduler {
    fn drop(&mut self) {
        // If the user is dropping the ScanScheduler then they _should_ be done with I/O.  This can happen
        // even when I/O is in progress if, for example, the user is dropping a scan mid-read because they found
        // the data they wanted (limit after filter or some other example).
        //
        // Closing the I/O queue will cancel any requests that have not yet been sent to the I/O loop.  However,
        // it will not terminate the I/O loop itself.  This is to help prevent deadlock and ensure that all I/O
        // requests that are submitted will terminate.
        //
        // In theory, this isn't strictly necessary, as callers should drop any task expecting I/O before they
        // drop the scheduler.  In practice, this can be difficult to do, and it is better to spend a little bit
        // of time letting the I/O loop drain so that we can avoid any potential deadlocks.
        //
        // Both queue flavors expose the same `close` semantics.
        match &self.io_queue {
            IoQueueType::Standard(io_queue) => io_queue.close(),
            IoQueueType::Lite(io_queue) => io_queue.close(),
        }
    }
}
881
882/// A throttled file reader
#[derive(Clone, Debug)]
pub struct FileScheduler {
    reader: Arc<dyn Reader>,
    // The owning ScanScheduler, kept alive so queued I/O can complete
    root: Arc<ScanScheduler>,
    // Ranges within this many bytes of each other are coalesced into one read
    // (see `is_close_together` / `submit_request`)
    block_size: u64,
    // Upper 64 bits of every priority submitted through this file
    base_priority: u64,
    // Maximum bytes for a single IOP — presumably caps merged read size;
    // TODO(review): confirm against the (not shown) submit_request body
    max_iop_size: u64,
}
891
/// Returns true if `range2` begins within `block_size` bytes of the end of
/// `range1`, i.e. the two reads are close enough to be coalesced into one IOP.
///
/// Note that range1.end <= range2.start is possible (e.g. when decoding string arrays)
fn is_close_together(range1: &Range<u64>, range2: &Range<u64>, block_size: u64) -> bool {
    // Saturating add so a range ending near u64::MAX cannot overflow (which
    // would panic in debug builds and silently wrap in release builds)
    range2.start <= range1.end.saturating_add(block_size)
}
896
/// Returns true if the two ranges share at least one byte offset.
fn is_overlapping(range1: &Range<u64>, range2: &Range<u64>) -> bool {
    // Equivalent to `range1.start < range2.end && range2.start < range1.end`,
    // phrased as "neither range ends at or before the other begins".
    !(range2.end <= range1.start || range1.end <= range2.start)
}
900
impl FileScheduler {
    /// Submit a batch of I/O requests to the reader
    ///
    /// The requests will be queued in a FIFO manner and, when all requests
    /// have been fulfilled, the returned future will be completed.
    ///
    /// Each request has a given priority.  If the I/O loop is full then requests
    /// will be buffered and requests with the *lowest* priority will be released
    /// from the buffer first.
    ///
    /// Each request has a backpressure ID which controls which backpressure throttle
    /// is applied to the request.  Requests made to the same backpressure throttle
    /// will be throttled together.
    ///
    /// The returned vector contains one `Bytes` per input range, in input order.
    /// Input ranges are assumed to be ordered by starting offset (the coalescing
    /// step below relies on this).
    pub fn submit_request(
        &self,
        request: Vec<Range<u64>>,
        priority: u64,
    ) -> impl Future<Output = Result<Vec<Bytes>>> + Send + use<> {
        // The final priority is a combination of the row offset and the file number
        let priority = ((self.base_priority as u128) << 64) + priority as u128;

        // Step 1: coalesce ranges that start within `block_size` bytes of the end
        // of the running interval into a single larger read
        let mut merged_requests = Vec::with_capacity(request.len());

        if !request.is_empty() {
            let mut curr_interval = request[0].clone();

            for req in request.iter().skip(1) {
                if is_close_together(&curr_interval, req, self.block_size) {
                    curr_interval.end = curr_interval.end.max(req.end);
                } else {
                    merged_requests.push(curr_interval);
                    curr_interval = req.clone();
                }
            }

            merged_requests.push(curr_interval);
        }

        // Step 2: split any coalesced range larger than `max_iop_size` into
        // `num_requests` roughly equal pieces so no single IOP is too large
        let mut updated_requests = Vec::with_capacity(merged_requests.len());
        for req in merged_requests {
            if req.is_empty() {
                // NOTE(review): an empty range is forwarded as-is, but it can never
                // satisfy `is_overlapping` below and so contributes no entry to
                // `final_bytes` -- confirm callers never submit empty ranges
                // expecting a (zero-length) result.
                updated_requests.push(req);
            } else {
                let num_requests = (req.end - req.start).div_ceil(self.max_iop_size);
                let bytes_per_request = (req.end - req.start) / num_requests;
                for i in 0..num_requests {
                    let start = req.start + i * bytes_per_request;
                    let end = if i == num_requests - 1 {
                        // Last request is a bit bigger due to rounding
                        req.end
                    } else {
                        start + bytes_per_request
                    };
                    updated_requests.push(start..end);
                }
            }
        }

        self.root.stats.record_request(&updated_requests);

        // Hand the transformed (coalesced / split) ranges to the parent scheduler
        let bytes_vec_fut =
            self.root
                .submit_request(self.reader.clone(), updated_requests.clone(), priority);

        let mut updated_index = 0;
        let mut final_bytes = Vec::with_capacity(request.len());

        async move {
            let bytes_vec = bytes_vec_fut.await?;

            // Step 3: map the bytes of the transformed reads back onto the caller's
            // original ranges.  Both lists are ordered by offset, so we walk them
            // together with two cursors.
            let mut orig_index = 0;
            while (updated_index < updated_requests.len()) && (orig_index < request.len()) {
                let updated_range = &updated_requests[updated_index];
                let orig_range = &request[orig_index];
                // Absolute file offset where this transformed read begins
                let byte_offset = updated_range.start as usize;

                if is_overlapping(updated_range, orig_range) {
                    // We need to undo the coalescing and splitting done earlier
                    let start = orig_range.start as usize - byte_offset;
                    if orig_range.end <= updated_range.end {
                        // The original range is fully contained in the updated range, can do
                        // zero-copy slice
                        let end = orig_range.end as usize - byte_offset;
                        final_bytes.push(bytes_vec[updated_index].slice(start..end));
                    } else {
                        // The original read was split into multiple requests, need to copy
                        // back into a single buffer
                        let orig_size = orig_range.end - orig_range.start;
                        let mut merged_bytes = Vec::with_capacity(orig_size as usize);
                        merged_bytes.extend_from_slice(&bytes_vec[updated_index].slice(start..));
                        let mut copy_offset = merged_bytes.len() as u64;
                        // Keep consuming subsequent transformed reads until the
                        // original range has been fully reassembled
                        while copy_offset < orig_size {
                            updated_index += 1;
                            let next_range = &updated_requests[updated_index];
                            let bytes_to_take =
                                (orig_size - copy_offset).min(next_range.end - next_range.start);
                            merged_bytes.extend_from_slice(
                                &bytes_vec[updated_index].slice(0..bytes_to_take as usize),
                            );
                            copy_offset += bytes_to_take;
                        }
                        final_bytes.push(Bytes::from(merged_bytes));
                    }
                    orig_index += 1;
                } else {
                    // No overlap: this transformed read belongs to a later original
                    // range, advance the transformed cursor
                    updated_index += 1;
                }
            }

            Ok(final_bytes)
        }
    }

    /// Returns a copy of this scheduler that uses `priority` as the base
    /// (file-level) priority for all subsequent requests
    pub fn with_priority(&self, priority: u64) -> Self {
        Self {
            reader: self.reader.clone(),
            root: self.root.clone(),
            block_size: self.block_size,
            max_iop_size: self.max_iop_size,
            base_priority: priority,
        }
    }

    /// Submit a single IOP to the reader
    ///
    /// If you have multiple IOPS to perform then [`Self::submit_request`] is going
    /// to be more efficient.
    ///
    /// See [`Self::submit_request`] for more information on the priority and backpressure.
    pub fn submit_single(
        &self,
        range: Range<u64>,
        priority: u64,
    ) -> impl Future<Output = Result<Bytes>> + Send {
        self.submit_request(vec![range], priority)
            .map_ok(|vec_bytes| vec_bytes.into_iter().next().unwrap())
    }

    /// Provides access to the underlying reader
    ///
    /// Do not use this for reading data as it will bypass any I/O scheduling!
    /// This is mainly exposed to allow metadata operations (e.g size, block_size,)
    /// which either aren't IOPS or we don't throttle
    pub fn reader(&self) -> &Arc<dyn Reader> {
        &self.reader
    }
}
1048
1049#[cfg(test)]
1050mod tests {
1051    use std::{collections::VecDeque, time::Duration};
1052
1053    use futures::poll;
1054    use lance_core::utils::tempfile::TempObjFile;
1055    use rand::RngCore;
1056
1057    use object_store::{GetRange, ObjectStore as OSObjectStore, memory::InMemory};
1058    use tokio::{runtime::Handle, time::timeout};
1059    use url::Url;
1060
1061    use crate::{
1062        object_store::{DEFAULT_DOWNLOAD_RETRY_COUNT, DEFAULT_MAX_IOP_SIZE},
1063        testing::MockObjectStore,
1064    };
1065
1066    use super::*;
1067
1068    #[tokio::test]
1069    async fn test_full_seq_read() {
1070        let tmp_file = TempObjFile::default();
1071
1072        let obj_store = Arc::new(ObjectStore::local());
1073
1074        // Write 1MiB of data
1075        const DATA_SIZE: u64 = 1024 * 1024;
1076        let mut some_data = vec![0; DATA_SIZE as usize];
1077        rand::rng().fill_bytes(&mut some_data);
1078        obj_store.put(&tmp_file, &some_data).await.unwrap();
1079
1080        let config = SchedulerConfig::default_for_testing();
1081
1082        let scheduler = ScanScheduler::new(obj_store, config);
1083
1084        let file_scheduler = scheduler
1085            .open_file(&tmp_file, &CachedFileSize::unknown())
1086            .await
1087            .unwrap();
1088
1089        // Read it back 4KiB at a time
1090        const READ_SIZE: u64 = 4 * 1024;
1091        let mut reqs = VecDeque::new();
1092        let mut offset = 0;
1093        while offset < DATA_SIZE {
1094            reqs.push_back(
1095                #[allow(clippy::single_range_in_vec_init)]
1096                file_scheduler
1097                    .submit_request(vec![offset..offset + READ_SIZE], 0)
1098                    .await
1099                    .unwrap(),
1100            );
1101            offset += READ_SIZE;
1102        }
1103
1104        offset = 0;
1105        // Note: we should get parallel I/O even though we are consuming serially
1106        while offset < DATA_SIZE {
1107            let data = reqs.pop_front().unwrap();
1108            let actual = &data[0];
1109            let expected = &some_data[offset as usize..(offset + READ_SIZE) as usize];
1110            assert_eq!(expected, actual);
1111            offset += READ_SIZE;
1112        }
1113    }
1114
    #[tokio::test]
    async fn test_split_coalesce() {
        let tmp_file = TempObjFile::default();

        let obj_store = Arc::new(ObjectStore::local());

        // Write 75MiB of data
        const DATA_SIZE: u64 = 75 * 1024 * 1024;
        let mut some_data = vec![0; DATA_SIZE as usize];
        rand::rng().fill_bytes(&mut some_data);
        obj_store.put(&tmp_file, &some_data).await.unwrap();

        let config = SchedulerConfig::default_for_testing();

        let scheduler = ScanScheduler::new(obj_store, config);

        let file_scheduler = scheduler
            .open_file(&tmp_file, &CachedFileSize::unknown())
            .await
            .unwrap();

        // These 3 requests should be coalesced into a single I/O because they are within 4KiB
        // of each other
        let req =
            file_scheduler.submit_request(vec![50_000..51_000, 52_000..53_000, 54_000..55_000], 0);

        let bytes = req.await.unwrap();

        // Coalescing must still hand back one buffer per original range
        assert_eq!(bytes[0], &some_data[50_000..51_000]);
        assert_eq!(bytes[1], &some_data[52_000..53_000]);
        assert_eq!(bytes[2], &some_data[54_000..55_000]);

        assert_eq!(1, scheduler.stats().iops);

        // This should be split into 5 requests because it is so large
        let req = file_scheduler.submit_request(vec![0..DATA_SIZE], 0);
        let bytes = req.await.unwrap();
        // The split pieces must be reassembled into a single buffer for the caller
        assert!(bytes[0] == some_data, "data is not the same");

        // 1 (previous) + 5 (split read) = 6 total
        assert_eq!(6, scheduler.stats().iops);

        // None of these requests are bigger than the max IOP size but they will be coalesced into
        // one IOP that is bigger and then split back into 2 requests that don't quite align with the original
        // ranges.
        let chunk_size = *DEFAULT_MAX_IOP_SIZE;
        let req = file_scheduler.submit_request(
            vec![
                10..chunk_size,
                chunk_size + 10..(chunk_size * 2) - 20,
                chunk_size * 2..(chunk_size * 2) + 10,
            ],
            0,
        );

        let bytes = req.await.unwrap();
        let chunk_size = chunk_size as usize;
        assert!(
            bytes[0] == some_data[10..chunk_size],
            "data is not the same"
        );
        assert!(
            bytes[1] == some_data[chunk_size + 10..(chunk_size * 2) - 20],
            "data is not the same"
        );
        assert!(
            bytes[2] == some_data[chunk_size * 2..(chunk_size * 2) + 10],
            "data is not the same"
        );
        // 6 (previous) + 2 (coalesced-then-split read) = 8 total
        assert_eq!(8, scheduler.stats().iops);

        // 44 adjacent 1MB reads coalesce into one large read which is then split
        // by the max IOP size (3 more IOPs: 11 total)
        let reads = (0..44)
            .map(|i| i * 1_000_000..(i + 1) * 1_000_000)
            .collect::<Vec<_>>();
        let req = file_scheduler.submit_request(reads, 0);
        let bytes = req.await.unwrap();
        for (i, bytes) in bytes.iter().enumerate() {
            assert!(
                bytes == &some_data[i * 1_000_000..(i + 1) * 1_000_000],
                "data is not the same"
            );
        }
        assert_eq!(11, scheduler.stats().iops);
    }
1198
    #[tokio::test]
    async fn test_priority() {
        let some_path = Path::parse("foo").unwrap();
        let base_store = Arc::new(InMemory::new());
        base_store
            .put(&some_path, vec![0; 1000].into())
            .await
            .unwrap();

        // Gate every read behind a zero-permit semaphore so the test controls
        // exactly when each in-flight I/O is allowed to complete
        let semaphore = Arc::new(tokio::sync::Semaphore::new(0));
        let mut obj_store = MockObjectStore::default();
        let semaphore_copy = semaphore.clone();
        obj_store
            .expect_get_opts()
            .returning(move |location, options| {
                let semaphore = semaphore.clone();
                let base_store = base_store.clone();
                let location = location.clone();
                async move {
                    semaphore.acquire().await.unwrap().forget();
                    base_store.get_opts(&location, options).await
                }
                .boxed()
            });
        // io_parallelism of 1 so only a single request can be in flight; the
        // rest must wait in the priority queue
        let obj_store = Arc::new(ObjectStore::new(
            Arc::new(obj_store),
            Url::parse("mem://").unwrap(),
            Some(500),
            None,
            false,
            false,
            1,
            DEFAULT_DOWNLOAD_RETRY_COUNT,
            None,
        ));

        let config = SchedulerConfig {
            io_buffer_size_bytes: 1024 * 1024,
            use_lite_scheduler: false,
        };

        let scan_scheduler = ScanScheduler::new(obj_store, config);

        let file_scheduler = scan_scheduler
            .open_file(&Path::parse("foo").unwrap(), &CachedFileSize::new(1000))
            .await
            .unwrap();

        // Issue a request, priority doesn't matter, it will be submitted
        // immediately (it will go pending)
        // Note: the timeout is to prevent a deadlock if the test fails.
        let first_fut = timeout(
            Duration::from_secs(10),
            file_scheduler.submit_single(0..10, 0),
        )
        .boxed();

        // Issue another low priority request (it will go in queue)
        let mut second_fut = timeout(
            Duration::from_secs(10),
            file_scheduler.submit_single(0..20, 100),
        )
        .boxed();

        // Issue a high priority request (it will go in queue and should bump
        // the other queued request down)
        let mut third_fut = timeout(
            Duration::from_secs(10),
            file_scheduler.submit_single(0..30, 0),
        )
        .boxed();

        // Finish one file, should be the in-flight first request
        semaphore_copy.add_permits(1);
        assert!(first_fut.await.unwrap().unwrap().len() == 10);
        // Other requests should not be finished
        assert!(poll!(&mut second_fut).is_pending());
        assert!(poll!(&mut third_fut).is_pending());

        // Next should be high priority request
        semaphore_copy.add_permits(1);
        assert!(third_fut.await.unwrap().unwrap().len() == 30);
        assert!(poll!(&mut second_fut).is_pending());

        // Finally, the low priority request
        semaphore_copy.add_permits(1);
        assert!(second_fut.await.unwrap().unwrap().len() == 20);
    }
1287
    #[tokio::test(flavor = "multi_thread")]
    async fn test_backpressure() {
        let some_path = Path::parse("foo").unwrap();
        let base_store = Arc::new(InMemory::new());
        base_store
            .put(&some_path, vec![0; 100000].into())
            .await
            .unwrap();

        let bytes_read = Arc::new(AtomicU64::from(0));
        let mut obj_store = MockObjectStore::default();
        let bytes_read_copy = bytes_read.clone();
        // Wraps the obj_store to keep track of how many bytes have been read
        obj_store
            .expect_get_opts()
            .returning(move |location, options| {
                let range = options.range.as_ref().unwrap();
                let num_bytes = match range {
                    GetRange::Bounded(bounded) => bounded.end - bounded.start,
                    _ => panic!(),
                };
                bytes_read_copy.fetch_add(num_bytes, Ordering::Release);
                let location = location.clone();
                let base_store = base_store.clone();
                async move { base_store.get_opts(&location, options).await }.boxed()
            });
        let obj_store = Arc::new(ObjectStore::new(
            Arc::new(obj_store),
            Url::parse("mem://").unwrap(),
            Some(500),
            None,
            false,
            false,
            1,
            DEFAULT_DOWNLOAD_RETRY_COUNT,
            None,
        ));

        // Tiny 10-byte I/O buffer so backpressure kicks in almost immediately
        let config = SchedulerConfig {
            io_buffer_size_bytes: 10,
            use_lite_scheduler: false,
        };

        let scan_scheduler = ScanScheduler::new(obj_store.clone(), config);

        let file_scheduler = scan_scheduler
            .open_file(&Path::parse("foo").unwrap(), &CachedFileSize::new(100000))
            .await
            .unwrap();

        // "Idle" means only the unit test task itself is alive
        let wait_for_idle = || async move {
            let handle = Handle::current();
            while handle.metrics().num_alive_tasks() != 1 {
                tokio::time::sleep(Duration::from_millis(10)).await;
            }
        };
        let wait_for_bytes_read_and_idle = |target_bytes: u64| {
            // We need to move `target` but don't want to move `bytes_read`
            let bytes_read = &bytes_read;
            async move {
                let bytes_read_copy = bytes_read.clone();
                while bytes_read_copy.load(Ordering::Acquire) < target_bytes {
                    tokio::time::sleep(Duration::from_millis(10)).await;
                }
                wait_for_idle().await;
            }
        };

        // This read will begin immediately
        let first_fut = file_scheduler.submit_single(0..5, 0);
        // This read should also begin immediately
        let second_fut = file_scheduler.submit_single(0..5, 0);
        // This read will be throttled
        let third_fut = file_scheduler.submit_single(0..3, 0);
        // Two tasks (third_fut and unit test)
        wait_for_bytes_read_and_idle(10).await;

        // Consuming the first read frees 5 bytes of buffer, releasing third_fut
        assert_eq!(first_fut.await.unwrap().len(), 5);
        // One task (unit test)
        wait_for_bytes_read_and_idle(13).await;

        // 2 bytes are ready but 5 bytes requested, read will be blocked
        let fourth_fut = file_scheduler.submit_single(0..5, 0);
        wait_for_bytes_read_and_idle(13).await;

        // Out of order completion is ok, will unblock backpressure
        assert_eq!(third_fut.await.unwrap().len(), 3);
        wait_for_bytes_read_and_idle(18).await;

        assert_eq!(second_fut.await.unwrap().len(), 5);
        // At this point there are 5 bytes available in backpressure queue
        // Now we issue multi-read that can be partially fulfilled, it will read some bytes but
        // not all of them. (using large range gap to ensure request not coalesced)
        //
        // I'm actually not sure this behavior is great.  It's possible that we should just
        // block until we can fulfill the entire request.
        let fifth_fut = file_scheduler.submit_request(vec![0..3, 90000..90007], 0);
        wait_for_bytes_read_and_idle(21).await;

        // Fifth future should eventually finish due to deadlock prevention
        let fifth_bytes = tokio::time::timeout(Duration::from_secs(10), fifth_fut)
            .await
            .unwrap();
        assert_eq!(
            fifth_bytes.unwrap().iter().map(|b| b.len()).sum::<usize>(),
            10
        );

        // And now let's just make sure that we can read the rest of the data
        assert_eq!(fourth_fut.await.unwrap().len(), 5);
        wait_for_bytes_read_and_idle(28).await;

        // Ensure deadlock prevention timeout can be disabled
        let config = SchedulerConfig {
            io_buffer_size_bytes: 10,
            use_lite_scheduler: false,
        };

        let scan_scheduler = ScanScheduler::new(obj_store, config);
        let file_scheduler = scan_scheduler
            .open_file(&Path::parse("foo").unwrap(), &CachedFileSize::new(100000))
            .await
            .unwrap();

        let first_fut = file_scheduler.submit_single(0..10, 0);
        let second_fut = file_scheduler.submit_single(0..10, 0);

        std::thread::sleep(Duration::from_millis(100));
        assert_eq!(first_fut.await.unwrap().len(), 10);
        assert_eq!(second_fut.await.unwrap().len(), 10);
    }
1419
    /// A Reader that tracks how many times get_range has been called.
    #[derive(Debug)]
    struct TrackingReader {
        // Shared counter, incremented synchronously on every `get_range` call
        get_range_count: Arc<AtomicU64>,
        // Path reported by `Reader::path`
        path: Path,
    }
1426
    impl deepsize::DeepSizeOf for TrackingReader {
        fn deep_size_of_children(&self, _context: &mut deepsize::Context) -> usize {
            // Test-only stub: report no heap usage for children
            0
        }
    }
1432
    impl Reader for TrackingReader {
        fn path(&self) -> &Path {
            &self.path
        }

        fn block_size(&self) -> usize {
            4096
        }

        fn io_parallelism(&self) -> usize {
            1
        }

        fn size(&self) -> futures::future::BoxFuture<'_, object_store::Result<usize>> {
            // Fixed-size fake file of 1,000,000 bytes
            Box::pin(async { Ok(1_000_000) })
        }

        fn get_range(
            &self,
            range: Range<usize>,
        ) -> futures::future::BoxFuture<'static, object_store::Result<Bytes>> {
            // Count the call synchronously -- i.e. at submit time, before the
            // returned future is polled -- then hand back zeroed bytes of the
            // requested length
            self.get_range_count.fetch_add(1, Ordering::Release);
            let num_bytes = range.end - range.start;
            Box::pin(async move { Ok(Bytes::from(vec![0u8; num_bytes])) })
        }

        fn get_all(&self) -> futures::future::BoxFuture<'_, object_store::Result<Bytes>> {
            // Zeroed bytes matching the fake file size reported by `size`
            Box::pin(async { Ok(Bytes::from(vec![0u8; 1_000_000])) })
        }
    }
1463
    #[tokio::test]
    async fn test_lite_scheduler_submits_eagerly() {
        let obj_store = Arc::new(ObjectStore::memory());
        let config = SchedulerConfig::default_for_testing().with_lite_scheduler();
        let scheduler = ScanScheduler::new(obj_store, config);

        // TrackingReader increments the counter synchronously inside get_range,
        // so the count reflects submit-time calls, not future completion
        let get_range_count = Arc::new(AtomicU64::new(0));
        let reader: Arc<dyn Reader> = Arc::new(TrackingReader {
            get_range_count: get_range_count.clone(),
            path: Path::parse("test").unwrap(),
        });

        // Submit several requests. The lite scheduler should call get_range
        // eagerly during submit (before the returned future is polled).
        let fut1 = scheduler.submit_request(reader.clone(), vec![0..100], 0);
        let fut2 = scheduler.submit_request(reader.clone(), vec![100..200], 10);
        let fut3 = scheduler.submit_request(reader.clone(), vec![200..300], 20);

        // get_range must have been called for all 3 requests already.
        assert_eq!(get_range_count.load(Ordering::Acquire), 3);

        // The futures should still resolve with the correct data.
        assert_eq!(fut1.await.unwrap()[0].len(), 100);
        assert_eq!(fut2.await.unwrap()[0].len(), 100);
        assert_eq!(fut3.await.unwrap()[0].len(), 100);
    }
1490
1491    #[test_log::test(tokio::test(flavor = "multi_thread"))]
1492    async fn stress_backpressure() {
1493        // This test ensures that the backpressure mechanism works correctly with
1494        // regards to priority.  In other words, as long as all requests are consumed
1495        // in priority order then the backpressure mechanism should not deadlock
1496        let some_path = Path::parse("foo").unwrap();
1497        let obj_store = Arc::new(ObjectStore::memory());
1498        obj_store
1499            .put(&some_path, vec![0; 100000].as_slice())
1500            .await
1501            .unwrap();
1502
1503        // Only one request will be allowed in
1504        let config = SchedulerConfig {
1505            io_buffer_size_bytes: 1,
1506            use_lite_scheduler: false,
1507        };
1508        let scan_scheduler = ScanScheduler::new(obj_store.clone(), config);
1509        let file_scheduler = scan_scheduler
1510            .open_file(&some_path, &CachedFileSize::unknown())
1511            .await
1512            .unwrap();
1513
1514        let mut futs = Vec::with_capacity(10000);
1515        for idx in 0..10000 {
1516            futs.push(file_scheduler.submit_single(idx..idx + 1, idx));
1517        }
1518
1519        for fut in futs {
1520            fut.await.unwrap();
1521        }
1522    }
1523}