// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

use bytes::Bytes;
use futures::channel::oneshot;
use futures::{FutureExt, TryFutureExt};
use object_store::path::Path;
use std::collections::BinaryHeap;
use std::fmt::Debug;
use std::future::Future;
use std::num::NonZero;
use std::ops::Range;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Instant;
use tokio::sync::Notify;

use lance_core::utils::parse::str_is_truthy;
use lance_core::{Error, Result};

use crate::object_store::ObjectStore;
use crate::traits::Reader;
use crate::utils::CachedFileSize;

mod lite;

// Don't log backpressure warnings until at least this many seconds have passed
const BACKPRESSURE_MIN: u64 = 5;
// Don't log backpressure warnings more than once per minute
const BACKPRESSURE_DEBOUNCE: u64 = 60;

// Global counter of how many IOPS we have issued
static IOPS_COUNTER: AtomicU64 = AtomicU64::new(0);
// Global counter of how many bytes were read by the scheduler
static BYTES_READ_COUNTER: AtomicU64 = AtomicU64::new(0);

pub fn iops_counter() -> u64 {
    IOPS_COUNTER.load(Ordering::Acquire)
}

pub fn bytes_read_counter() -> u64 {
    BYTES_READ_COUNTER.load(Ordering::Acquire)
}
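
// These process-wide counters are mostly useful for tests and diagnostics.  An
// illustrative pattern (`do_scan` below is a hypothetical workload, not part of
// this crate) is to diff the counters around an operation:
//
//     let iops_before = iops_counter();
//     do_scan().await;
//     let iops_used = iops_counter() - iops_before;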

// We want to allow requests that have a lower priority than any
// currently in-flight request.  This helps avoid potential deadlocks
// related to backpressure.  Unfortunately, it is quite expensive to
// keep track of which priorities are in-flight.
//
// TODO: At some point it would be nice if we can optimize this away but
// in_flight should remain relatively small (generally less than 256 items)
// and has not shown itself to be a bottleneck yet.
struct PrioritiesInFlight {
    in_flight: Vec<u128>,
}

impl PrioritiesInFlight {
    fn new(capacity: u32) -> Self {
        Self {
            in_flight: Vec::with_capacity(capacity as usize * 2),
        }
    }

    fn min_in_flight(&self) -> u128 {
        self.in_flight.first().copied().unwrap_or(u128::MAX)
    }

    fn push(&mut self, prio: u128) {
        let pos = match self.in_flight.binary_search(&prio) {
            Ok(pos) => pos,
            Err(pos) => pos,
        };
        self.in_flight.insert(pos, prio);
    }

    fn remove(&mut self, prio: u128) {
        if let Ok(pos) = self.in_flight.binary_search(&prio) {
            self.in_flight.remove(pos);
        }
    }
}

struct IoQueueState {
    // Number of IOPS we can issue concurrently before pausing I/O
    iops_avail: u32,
    // Number of bytes we are allowed to buffer in memory before pausing I/O
    //
    // This can dip below 0 due to I/O prioritization
    bytes_avail: i64,
    // Pending I/O requests
    pending_requests: BinaryHeap<IoTask>,
    // Priorities of in-flight requests
    priorities_in_flight: PrioritiesInFlight,
    // Set when the scheduler is finished to notify the I/O loop to shut down
    // once all outstanding requests have been completed.
    done_scheduling: bool,
    // Time when the scheduler started
    start: Instant,
    // Last time we warned about backpressure
    last_warn: AtomicU64,
    // When true, skip all byte-based backpressure checks (set when io_buffer_size == 0)
    no_backpressure: bool,
}

impl IoQueueState {
    fn new(io_capacity: u32, io_buffer_size: u64) -> Self {
        Self {
            iops_avail: io_capacity,
            bytes_avail: io_buffer_size as i64,
            pending_requests: BinaryHeap::new(),
            priorities_in_flight: PrioritiesInFlight::new(io_capacity),
            done_scheduling: false,
            start: Instant::now(),
            last_warn: AtomicU64::from(0),
            no_backpressure: io_buffer_size == 0,
        }
    }

    fn warn_if_needed(&self) {
        let seconds_elapsed = self.start.elapsed().as_secs();
        let last_warn = self.last_warn.load(Ordering::Acquire);
        let since_last_warn = seconds_elapsed - last_warn;
        if (last_warn == 0
            && seconds_elapsed > BACKPRESSURE_MIN
            && seconds_elapsed < BACKPRESSURE_DEBOUNCE)
            || since_last_warn > BACKPRESSURE_DEBOUNCE
        {
            tracing::event!(tracing::Level::DEBUG, "Backpressure throttle exceeded");
            log::debug!(
                "Backpressure throttle is full, I/O will pause until buffer is drained.  Max I/O bandwidth will not be achieved because CPU is falling behind"
            );
            self.last_warn
                .store(seconds_elapsed.max(1), Ordering::Release);
        }
    }

    fn can_deliver(&self, task: &IoTask) -> bool {
        if self.iops_avail == 0 {
            false
        } else if self.no_backpressure
            || task.bypass_backpressure
            || task.priority <= self.priorities_in_flight.min_in_flight()
        {
            true
        } else if task.num_bytes() as i64 > self.bytes_avail {
            self.warn_if_needed();
            false
        } else {
            true
        }
    }

    fn next_task(&mut self) -> Option<IoTask> {
        let task = self.pending_requests.peek()?;
        if self.can_deliver(task) {
            let skip_bytes_accounting = self.no_backpressure || task.bypass_backpressure;
            self.priorities_in_flight.push(task.priority);
            self.iops_avail -= 1;
            if !skip_bytes_accounting {
                self.bytes_avail -= task.num_bytes() as i64;
                if self.bytes_avail < 0 {
                    // This can happen when we admit special priority requests
                    log::debug!(
                        "Backpressure throttle temporarily exceeded by {} bytes due to priority I/O",
                        -self.bytes_avail
                    );
                }
            }
            Some(self.pending_requests.pop().unwrap())
        } else {
            None
        }
    }
}
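
// Illustrative walk-through of the admission rules in `can_deliver`: with
// iops_avail = 1 and bytes_avail = 8, a 16-byte task is still admitted if its
// priority is <= the minimum in-flight priority (the deadlock-avoidance rule).
// In that case `bytes_avail` goes negative and stays negative until the consumer
// returns the bytes via `IoQueue::on_bytes_consumed`.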

// This is modeled after the MPSC queue described here: https://docs.rs/tokio/latest/tokio/sync/struct.Notify.html
//
// However, it only needs to be SPSC since there is only one "scheduler thread"
// and one I/O loop.
struct IoQueue {
    // Queue state
    state: Mutex<IoQueueState>,
    // Used to signal new I/O requests have arrived that might potentially be runnable
    notify: Notify,
}

impl IoQueue {
    fn new(io_capacity: u32, io_buffer_size: u64) -> Self {
        Self {
            state: Mutex::new(IoQueueState::new(io_capacity, io_buffer_size)),
            notify: Notify::new(),
        }
    }

    fn push(&self, task: IoTask) {
        log::trace!(
            "Inserting I/O request for {} bytes with priority ({},{}) into I/O queue",
            task.num_bytes(),
            task.priority >> 64,
            task.priority & 0xFFFFFFFFFFFFFFFF
        );
        let mut state = self.state.lock().unwrap();
        state.pending_requests.push(task);
        drop(state);

        self.notify.notify_one();
    }

    async fn pop(&self) -> Option<IoTask> {
        loop {
            {
                let mut state = self.state.lock().unwrap();
                if let Some(task) = state.next_task() {
                    return Some(task);
                }

                if state.done_scheduling {
                    return None;
                }
            }

            self.notify.notified().await;
        }
    }

    fn on_iop_complete(&self) {
        let mut state = self.state.lock().unwrap();
        state.iops_avail += 1;
        drop(state);

        self.notify.notify_one();
    }

    fn on_bytes_consumed(&self, bytes: u64, priority: u128, num_reqs: usize) {
        let mut state = self.state.lock().unwrap();
        state.bytes_avail += bytes as i64;
        for _ in 0..num_reqs {
            state.priorities_in_flight.remove(priority);
        }
        drop(state);

        self.notify.notify_one();
    }

    fn close(&self) {
        let mut state = self.state.lock().unwrap();
        state.done_scheduling = true;
        let pending_requests = std::mem::take(&mut state.pending_requests);
        drop(state);
        for request in pending_requests {
            request.cancel();
        }

        self.notify.notify_one();
    }
}

// There is one instance of MutableBatch shared by all the I/O operations
// that make up a single request.  When all the I/O operations complete
// then the MutableBatch goes out of scope and the batch request is considered
// complete
struct MutableBatch<F: FnOnce(Response) + Send> {
    when_done: Option<F>,
    data_buffers: Vec<Bytes>,
    num_bytes: u64,
    priority: u128,
    num_reqs: usize,
    err: Option<Box<dyn std::error::Error + Send + Sync + 'static>>,
    // When true, report 0 bytes consumed so the backpressure budget is unaffected
    bypass_backpressure: bool,
}

impl<F: FnOnce(Response) + Send> MutableBatch<F> {
    fn new(
        when_done: F,
        num_data_buffers: u32,
        priority: u128,
        num_reqs: usize,
        bypass_backpressure: bool,
    ) -> Self {
        Self {
            when_done: Some(when_done),
            data_buffers: vec![Bytes::default(); num_data_buffers as usize],
            num_bytes: 0,
            priority,
            num_reqs,
            err: None,
            bypass_backpressure,
        }
    }
}

// Rather than keep track of when all the I/O requests are finished so that we
// can deliver the batch of data we let Rust do that for us.  When all I/O's are
// done then the MutableBatch will go out of scope and we know we have all the
// data.
impl<F: FnOnce(Response) + Send> Drop for MutableBatch<F> {
    fn drop(&mut self) {
        // If we have an error, return that.  Otherwise return the data
        let result = if self.err.is_some() {
            Err(Error::wrapped(self.err.take().unwrap()))
        } else {
            let mut data = Vec::new();
            std::mem::swap(&mut data, &mut self.data_buffers);
            Ok(data)
        };
        // We don't really care if no one is around to receive it, just let
        // the result go out of scope and get cleaned up
        let response = Response {
            data: result,
            // Report 0 bytes for bypass tasks so the backpressure budget is unaffected
            num_bytes: if self.bypass_backpressure {
                0
            } else {
                self.num_bytes
            },
            priority: self.priority,
            num_reqs: self.num_reqs,
        };
        (self.when_done.take().unwrap())(response);
    }
}

struct DataChunk {
    task_idx: usize,
    num_bytes: u64,
    data: Result<Bytes>,
}

trait DataSink: Send {
    fn deliver_data(&mut self, data: DataChunk);
}

impl<F: FnOnce(Response) + Send> DataSink for MutableBatch<F> {
    // Called by worker tasks to add data to the MutableBatch
    fn deliver_data(&mut self, data: DataChunk) {
        self.num_bytes += data.num_bytes;
        match data.data {
            Ok(data_bytes) => {
                self.data_buffers[data.task_idx] = data_bytes;
            }
            Err(err) => {
                // This keeps the original error, if present
                self.err.get_or_insert(Box::new(err));
            }
        }
    }
}

struct IoTask {
    reader: Arc<dyn Reader>,
    to_read: Range<u64>,
    when_done: Box<dyn FnOnce(Result<Bytes>) + Send>,
    priority: u128,
    bypass_backpressure: bool,
}

impl Eq for IoTask {}

impl PartialEq for IoTask {
    fn eq(&self, other: &Self) -> bool {
        self.bypass_backpressure == other.bypass_backpressure && self.priority == other.priority
    }
}

impl PartialOrd for IoTask {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for IoTask {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        // Bypass tasks are always delivered before normal tasks.
        // Within the same bypass class, this is a min-heap on priority.
        self.bypass_backpressure
            .cmp(&other.bypass_backpressure)
            .then(other.priority.cmp(&self.priority))
    }
}

impl IoTask {
    fn num_bytes(&self) -> u64 {
        self.to_read.end - self.to_read.start
    }

    fn cancel(self) {
        (self.when_done)(Err(Error::internal(
            "Scheduler closed before I/O was completed".to_string(),
        )));
    }

    async fn run(self) {
        let file_path = self.reader.path().as_ref();
        let num_bytes = self.num_bytes();
        let bytes = if self.to_read.start == self.to_read.end {
            Ok(Bytes::new())
        } else {
            let bytes_fut = self
                .reader
                .get_range(self.to_read.start as usize..self.to_read.end as usize);
            IOPS_COUNTER.fetch_add(1, Ordering::Release);
            let num_bytes = self.num_bytes();
            bytes_fut
                .inspect(move |_| {
                    BYTES_READ_COUNTER.fetch_add(num_bytes, Ordering::Release);
                })
                .await
                .map_err(Error::from)
        };
        // Emit per-file I/O trace event only when tracing is enabled
        tracing::trace!(
            file = file_path,
            bytes_read = num_bytes,
            requests = 1,
            range_start = self.to_read.start,
            range_end = self.to_read.end,
            "File I/O completed"
        );
        (self.when_done)(bytes);
    }
}

// Every time a scheduler starts up it launches a task to run the I/O loop.  This loop
// runs until the scheduler is destroyed.
async fn run_io_loop(tasks: Arc<IoQueue>) {
    // Pop runnable tasks off the queue and spawn them until the queue is closed
    loop {
        let next_task = tasks.pop().await;
        match next_task {
            Some(task) => {
                tokio::spawn(task.run());
            }
            None => {
                // The queue has been closed, we are done
                return;
            }
        }
    }
}

#[derive(Debug)]
struct StatsCollector {
    iops: AtomicU64,
    requests: AtomicU64,
    bytes_read: AtomicU64,
}

impl StatsCollector {
    fn new() -> Self {
        Self {
            iops: AtomicU64::new(0),
            requests: AtomicU64::new(0),
            bytes_read: AtomicU64::new(0),
        }
    }

    fn iops(&self) -> u64 {
        self.iops.load(Ordering::Relaxed)
    }

    fn bytes_read(&self) -> u64 {
        self.bytes_read.load(Ordering::Relaxed)
    }

    fn requests(&self) -> u64 {
        self.requests.load(Ordering::Relaxed)
    }

    fn record_request(&self, request: &[Range<u64>]) {
        self.requests.fetch_add(1, Ordering::Relaxed);
        self.iops.fetch_add(request.len() as u64, Ordering::Relaxed);
        self.bytes_read.fetch_add(
            request.iter().map(|r| r.end - r.start).sum::<u64>(),
            Ordering::Relaxed,
        );
    }
}

pub struct ScanStats {
    pub iops: u64,
    pub requests: u64,
    pub bytes_read: u64,
}

impl ScanStats {
    fn new(stats: &StatsCollector) -> Self {
        Self {
            iops: stats.iops(),
            requests: stats.requests(),
            bytes_read: stats.bytes_read(),
        }
    }
}

enum IoQueueType {
    Standard(Arc<IoQueue>),
    Lite(Arc<lite::IoQueue>),
}

/// An I/O scheduler which wraps an ObjectStore and throttles the amount of
/// parallel I/O that can be run.
///
/// The ScanScheduler will cancel any outstanding I/O requests when it is dropped.
/// For this reason it should be kept alive until all I/O has finished.
///
/// Note: The 2.X file readers already do this so this is only a concern if you are
/// using the ScanScheduler directly.
pub struct ScanScheduler {
    object_store: Arc<ObjectStore>,
    io_queue: IoQueueType,
    stats: Arc<StatsCollector>,
}

impl Debug for ScanScheduler {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ScanScheduler")
            .field("object_store", &self.object_store)
            .finish()
    }
}

struct Response {
    data: Result<Vec<Bytes>>,
    priority: u128,
    num_reqs: usize,
    num_bytes: u64,
}

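/// Configuration for a [`ScanScheduler`].
///
/// A minimal construction sketch (the buffer size below is illustrative, not a
/// recommendation):
///
/// ```ignore
/// let config = SchedulerConfig::new(128 * 1024 * 1024);
/// let scheduler = ScanScheduler::new(object_store, config);
/// ```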
#[derive(Debug, Clone, Copy)]
pub struct SchedulerConfig {
    /// the # of bytes that can be buffered but not yet requested.
    /// This controls back pressure.  If data is not processed quickly enough then this
    /// buffer will fill up and the I/O loop will pause until the buffer is drained.
    pub io_buffer_size_bytes: u64,
    /// Whether to use the lite scheduler.
    ///
    /// - `Some(true)` forces the lite scheduler (e.g. from env var or programmatic).
    /// - `Some(false)` forces the standard scheduler.
    /// - `None` defers to the object store's preference (see [`ObjectStore::prefers_lite_scheduler`]).
    pub use_lite_scheduler: Option<bool>,
}

impl SchedulerConfig {
    pub fn new(io_buffer_size_bytes: u64) -> Self {
        Self {
            io_buffer_size_bytes,
            use_lite_scheduler: std::env::var("LANCE_USE_LITE_SCHEDULER")
                .ok()
                .map(|v| str_is_truthy(v.trim())),
        }
    }

    /// Big enough for unit testing
    pub fn default_for_testing() -> Self {
        Self {
            io_buffer_size_bytes: 256 * 1024 * 1024,
            use_lite_scheduler: None,
        }
    }

    /// Configuration that should generally maximize bandwidth (not trying to save RAM
    /// at all).  We assume a max page size of 32MiB and then allow 32MiB per I/O thread
    pub fn max_bandwidth(store: &ObjectStore) -> Self {
        Self::new(32 * 1024 * 1024 * store.io_parallelism() as u64)
    }

    pub fn with_lite_scheduler(self) -> Self {
        Self {
            use_lite_scheduler: Some(true),
            ..self
        }
    }
}

impl ScanScheduler {
    /// Create a new scheduler with the given I/O capacity
    ///
    /// # Arguments
    ///
    /// * object_store - the store to wrap
    /// * config - configuration settings for the scheduler
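    ///
    /// A usage sketch (assumes `object_store: Arc<ObjectStore>` and `path` are in
    /// scope; error handling elided):
    ///
    /// ```ignore
    /// let scheduler = ScanScheduler::new(object_store, SchedulerConfig::default_for_testing());
    /// let file = scheduler.open_file(&path, &CachedFileSize::unknown()).await?;
    /// ```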
    pub fn new(object_store: Arc<ObjectStore>, config: SchedulerConfig) -> Arc<Self> {
        let io_capacity = object_store.io_parallelism();
        let use_lite = config
            .use_lite_scheduler
            .unwrap_or_else(|| object_store.prefers_lite_scheduler());
        let io_queue = if use_lite {
            let io_queue = Arc::new(lite::IoQueue::new(
                io_capacity as u64,
                config.io_buffer_size_bytes,
            ));
            IoQueueType::Lite(io_queue)
        } else {
            let io_queue = Arc::new(IoQueue::new(
                io_capacity as u32,
                config.io_buffer_size_bytes,
            ));
            let io_queue_clone = io_queue.clone();
            // Best we can do here is fire and forget.  If the I/O loop is still running when the scheduler is
            // dropped we can't wait for it to finish or we'd block a tokio thread.  We could spawn a blocking task
            // to wait for it to finish but that doesn't seem helpful.
            tokio::task::spawn(async move { run_io_loop(io_queue_clone).await });
            IoQueueType::Standard(io_queue)
        };
        Arc::new(Self {
            object_store,
            io_queue,
            stats: Arc::new(StatsCollector::new()),
        })
    }

    /// Open a file for reading
    ///
    /// # Arguments
    ///
    /// * path - the path to the file to open
    /// * base_priority - the base priority for I/O requests submitted to this file scheduler.
    ///   This determines the upper 64 bits of the priority (the lower 64 bits
    ///   come from `submit_request` and `submit_single`)
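    ///
    /// Concretely, a request's final priority is computed as
    /// `((base_priority as u128) << 64) + request_priority as u128`, so every request
    /// from a file opened with a lower `base_priority` sorts ahead of requests from
    /// files opened with a higher one.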
    pub async fn open_file_with_priority(
        self: &Arc<Self>,
        path: &Path,
        base_priority: u64,
        file_size_bytes: &CachedFileSize,
    ) -> Result<FileScheduler> {
        let file_size_bytes = if let Some(size) = file_size_bytes.get() {
            u64::from(size)
        } else {
            let size = self.object_store.size(path).await?;
            if let Some(size) = NonZero::new(size) {
                file_size_bytes.set(size);
            }
            size
        };
        let reader = self
            .object_store
            .open_with_size(path, file_size_bytes as usize)
            .await?;
        let block_size = self.object_store.block_size() as u64;
        let max_iop_size = self.object_store.max_iop_size();
        Ok(FileScheduler {
            reader: reader.into(),
            block_size,
            root: self.clone(),
            base_priority,
            max_iop_size,
            bypass_backpressure: false,
        })
    }

    /// Open a file with a default priority of 0
    ///
    /// See [`Self::open_file_with_priority`] for more information on the priority
    pub async fn open_file(
        self: &Arc<Self>,
        path: &Path,
        file_size_bytes: &CachedFileSize,
    ) -> Result<FileScheduler> {
        self.open_file_with_priority(path, 0, file_size_bytes).await
    }

    fn do_submit_request(
        &self,
        reader: Arc<dyn Reader>,
        request: Vec<Range<u64>>,
        tx: oneshot::Sender<Response>,
        priority: u128,
        io_queue: &Arc<IoQueue>,
        bypass_backpressure: bool,
    ) {
        let num_iops = request.len() as u32;

        let when_all_io_done = move |bytes_and_permits| {
            // We don't care if the receiver has given up so discard the result
            let _ = tx.send(bytes_and_permits);
        };

        let dest = Arc::new(Mutex::new(Box::new(MutableBatch::new(
            when_all_io_done,
            num_iops,
            priority,
            request.len(),
            bypass_backpressure,
        ))));

        for (task_idx, iop) in request.into_iter().enumerate() {
            let dest = dest.clone();
            let io_queue_clone = io_queue.clone();
            let num_bytes = iop.end - iop.start;
            let task = IoTask {
                reader: reader.clone(),
                to_read: iop,
                priority,
                bypass_backpressure,
                when_done: Box::new(move |data| {
                    io_queue_clone.on_iop_complete();
                    let mut dest = dest.lock().unwrap();
                    let chunk = DataChunk {
                        data,
                        task_idx,
                        num_bytes,
                    };
                    dest.deliver_data(chunk);
                }),
            };
            io_queue.push(task);
        }
    }

    fn submit_request_standard(
        &self,
        reader: Arc<dyn Reader>,
        request: Vec<Range<u64>>,
        priority: u128,
        io_queue: &Arc<IoQueue>,
        bypass_backpressure: bool,
    ) -> impl Future<Output = Result<Vec<Bytes>>> + Send + use<> {
        let (tx, rx) = oneshot::channel::<Response>();

        self.do_submit_request(reader, request, tx, priority, io_queue, bypass_backpressure);

        let io_queue_clone = io_queue.clone();

        rx.map(move |wrapped_rsp| {
            // Right now, it isn't possible for I/O to be cancelled so a cancel error should
            // not occur
            let rsp = wrapped_rsp.unwrap();
            io_queue_clone.on_bytes_consumed(rsp.num_bytes, rsp.priority, rsp.num_reqs);
            rsp.data
        })
    }

    fn submit_request_lite(
        &self,
        reader: Arc<dyn Reader>,
        request: Vec<Range<u64>>,
        priority: u128,
        io_queue: &Arc<lite::IoQueue>,
        bypass_backpressure: bool,
    ) -> impl Future<Output = Result<Vec<Bytes>>> + Send + use<> {
        // It's important that we submit all requests _before_ we await anything
        let maybe_tasks = request
            .into_iter()
            .map(|task| {
                let reader = reader.clone();
                let queue = io_queue.clone();
                let run_fn = Box::new(move || {
                    reader
                        .get_range(task.start as usize..task.end as usize)
                        .map_err(Error::from)
                        .boxed()
                });
                queue.submit(task, priority, run_fn, bypass_backpressure)
            })
            .collect::<Result<Vec<_>>>();
        match maybe_tasks {
            Ok(tasks) => async move {
                let mut results = Vec::with_capacity(tasks.len());
                for task in tasks {
                    results.push(task.await?);
                }
                Ok(results)
            }
            .boxed(),
            Err(e) => async move { Err(e) }.boxed(),
        }
    }

    pub fn submit_request(
        &self,
        reader: Arc<dyn Reader>,
        request: Vec<Range<u64>>,
        priority: u128,
        bypass_backpressure: bool,
    ) -> impl Future<Output = Result<Vec<Bytes>>> + Send + use<> {
        match &self.io_queue {
            IoQueueType::Standard(io_queue) => {
                futures::future::Either::Left(self.submit_request_standard(
                    reader,
                    request,
                    priority,
                    io_queue,
                    bypass_backpressure,
                ))
            }
            IoQueueType::Lite(io_queue) => futures::future::Either::Right(
                self.submit_request_lite(reader, request, priority, io_queue, bypass_backpressure),
            ),
        }
    }

    pub fn stats(&self) -> ScanStats {
        ScanStats::new(self.stats.as_ref())
    }

    #[cfg(test)]
    fn uses_lite_scheduler(&self) -> bool {
        matches!(self.io_queue, IoQueueType::Lite(_))
    }
}

impl Drop for ScanScheduler {
    fn drop(&mut self) {
        // If the user is dropping the ScanScheduler then they _should_ be done with I/O.  This can happen
        // even when I/O is in progress if, for example, the user is dropping a scan mid-read because they found
        // the data they wanted (limit after filter or some other example).
        //
        // Closing the I/O queue will cancel any requests that have not yet been sent to the I/O loop.  However,
        // it will not terminate the I/O loop itself.  This is to help prevent deadlock and ensure that all I/O
        // requests that are submitted will terminate.
        //
        // In theory, this isn't strictly necessary, as callers should drop any task expecting I/O before they
        // drop the scheduler.  In practice, this can be difficult to do, and it is better to spend a little bit
        // of time letting the I/O loop drain so that we can avoid any potential deadlocks.
        match &self.io_queue {
            IoQueueType::Standard(io_queue) => io_queue.close(),
            IoQueueType::Lite(io_queue) => io_queue.close(),
        }
    }
}

/// A throttled file reader
#[derive(Clone, Debug)]
pub struct FileScheduler {
    reader: Arc<dyn Reader>,
    root: Arc<ScanScheduler>,
    block_size: u64,
    base_priority: u64,
    max_iop_size: u64,
    bypass_backpressure: bool,
}

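// For example, with block_size = 4096 the ranges 0..1000 and 3000..4000 are
// "close together" (a gap of 2000 bytes <= 4096) and will be coalesced into a
// single read by `submit_request`.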
fn is_close_together(range1: &Range<u64>, range2: &Range<u64>, block_size: u64) -> bool {
    // Note that range1.end <= range2.start is possible (e.g. when decoding string arrays)
    range2.start <= (range1.end + block_size)
}

fn is_overlapping(range1: &Range<u64>, range2: &Range<u64>) -> bool {
    range1.start < range2.end && range2.start < range1.end
}

impl FileScheduler {
    /// Submit a batch of I/O requests to the reader
    ///
    /// The requests will be queued in a FIFO manner and, when all requests
    /// have been fulfilled, the returned future will be completed.
    ///
    /// Each request has a given priority.  If the I/O loop is full then requests
    /// will be buffered and requests with the *lowest* priority will be released
    /// from the buffer first.
    ///
    /// All requests submitted through the same [`ScanScheduler`] share one backpressure
    /// throttle and are throttled together.  Use [`Self::with_bypass_backpressure`] to
    /// exempt a scheduler's requests from the throttle.
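    ///
    /// A usage sketch (the ranges are illustrative):
    ///
    /// ```ignore
    /// let buffers = file_scheduler
    ///     .submit_request(vec![0..1024, 4096..8192], /*priority=*/ 0)
    ///     .await?;
    /// assert_eq!(buffers.len(), 2);
    /// ```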
    pub fn submit_request(
        &self,
        request: Vec<Range<u64>>,
        priority: u64,
    ) -> impl Future<Output = Result<Vec<Bytes>>> + Send + use<> {
        // The final priority is a combination of the row offset and the file number
        let priority = ((self.base_priority as u128) << 64) + priority as u128;

        let mut merged_requests = Vec::with_capacity(request.len());

        if !request.is_empty() {
            let mut curr_interval = request[0].clone();

            for req in request.iter().skip(1) {
                if is_close_together(&curr_interval, req, self.block_size) {
                    curr_interval.end = curr_interval.end.max(req.end);
                } else {
                    merged_requests.push(curr_interval);
                    curr_interval = req.clone();
                }
            }

            merged_requests.push(curr_interval);
        }

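        // Split any merged read that exceeds max_iop_size into roughly equal chunks.
        // For example (with illustrative numbers), a 75 MiB merged read and a 16 MiB
        // cap yield div_ceil(75, 16) = 5 requests of 15 MiB each, the last request
        // absorbing any rounding remainder.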
        let mut updated_requests = Vec::with_capacity(merged_requests.len());
        for req in merged_requests {
            if req.is_empty() {
                updated_requests.push(req);
            } else {
                let num_requests = (req.end - req.start).div_ceil(self.max_iop_size);
                let bytes_per_request = (req.end - req.start) / num_requests;
                for i in 0..num_requests {
                    let start = req.start + i * bytes_per_request;
                    let end = if i == num_requests - 1 {
                        // Last request is a bit bigger due to rounding
                        req.end
                    } else {
                        start + bytes_per_request
                    };
                    updated_requests.push(start..end);
                }
            }
        }

        self.root.stats.record_request(&updated_requests);

        let bytes_vec_fut = self.root.submit_request(
            self.reader.clone(),
            updated_requests.clone(),
            priority,
            self.bypass_backpressure,
        );

        let mut updated_index = 0;
        let mut final_bytes = Vec::with_capacity(request.len());

        async move {
            let bytes_vec = bytes_vec_fut.await?;

            let mut orig_index = 0;
            while (updated_index < updated_requests.len()) && (orig_index < request.len()) {
                let updated_range = &updated_requests[updated_index];
                let orig_range = &request[orig_index];
                let byte_offset = updated_range.start as usize;

                if is_overlapping(updated_range, orig_range) {
                    // We need to undo the coalescing and splitting done earlier
                    let start = orig_range.start as usize - byte_offset;
                    if orig_range.end <= updated_range.end {
                        // The original range is fully contained in the updated range, can do
                        // zero-copy slice
                        let end = orig_range.end as usize - byte_offset;
                        final_bytes.push(bytes_vec[updated_index].slice(start..end));
                    } else {
                        // The original read was split into multiple requests, need to copy
                        // back into a single buffer
                        let orig_size = orig_range.end - orig_range.start;
                        let mut merged_bytes = Vec::with_capacity(orig_size as usize);
                        merged_bytes.extend_from_slice(&bytes_vec[updated_index].slice(start..));
                        let mut copy_offset = merged_bytes.len() as u64;
                        while copy_offset < orig_size {
                            updated_index += 1;
                            let next_range = &updated_requests[updated_index];
                            let bytes_to_take =
                                (orig_size - copy_offset).min(next_range.end - next_range.start);
                            merged_bytes.extend_from_slice(
                                &bytes_vec[updated_index].slice(0..bytes_to_take as usize),
                            );
                            copy_offset += bytes_to_take;
                        }
                        final_bytes.push(Bytes::from(merged_bytes));
                    }
                    orig_index += 1;
                } else {
                    updated_index += 1;
                }
            }

            Ok(final_bytes)
        }
    }

    pub fn with_priority(&self, priority: u64) -> Self {
        Self {
            reader: self.reader.clone(),
            root: self.root.clone(),
            block_size: self.block_size,
            max_iop_size: self.max_iop_size,
            base_priority: priority,
            bypass_backpressure: self.bypass_backpressure,
        }
    }

    /// Returns a copy of this scheduler that bypasses backpressure for all requests.
    ///
    /// This should be used for indirect I/O (e.g. fetching items after decoding offsets) where
    /// blocking on backpressure could cause a deadlock or excessive latency.
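    ///
    /// A usage sketch:
    ///
    /// ```ignore
    /// let indirect = file_scheduler.with_bypass_backpressure();
    /// let offsets = indirect.submit_single(0..64, 0).await?;
    /// ```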
    pub fn with_bypass_backpressure(&self) -> Self {
        Self {
            bypass_backpressure: true,
            ..self.clone()
        }
    }

    /// Submit a single IOP to the reader
    ///
    /// If you have multiple IOPS to perform then [`Self::submit_request`] is going
    /// to be more efficient.
    ///
    /// See [`Self::submit_request`] for more information on the priority and backpressure.
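    ///
    /// A usage sketch:
    ///
    /// ```ignore
    /// let bytes = file_scheduler.submit_single(0..4096, /*priority=*/ 0).await?;
    /// ```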
    pub fn submit_single(
        &self,
        range: Range<u64>,
        priority: u64,
    ) -> impl Future<Output = Result<Bytes>> + Send {
        self.submit_request(vec![range], priority)
            .map_ok(|vec_bytes| vec_bytes.into_iter().next().unwrap())
    }

    /// Provides access to the underlying reader
    ///
    /// Do not use this for reading data as it will bypass any I/O scheduling!
    /// This is mainly exposed to allow metadata operations (e.g. size, block_size)
    /// which either aren't IOPS or which we don't throttle
    pub fn reader(&self) -> &Arc<dyn Reader> {
        &self.reader
    }
}

#[cfg(test)]
mod tests {
    use std::{collections::VecDeque, time::Duration};

    use futures::poll;
    use lance_core::utils::tempfile::TempObjFile;
    use rand::RngCore;

    use object_store::{GetRange, ObjectStore as OSObjectStore, memory::InMemory};
    use tokio::{runtime::Handle, time::timeout};
    use url::Url;

    use crate::{
        object_store::{DEFAULT_DOWNLOAD_RETRY_COUNT, DEFAULT_MAX_IOP_SIZE},
        testing::MockObjectStore,
    };

    use super::*;

    fn make_task(priority: u128, bypass_backpressure: bool) -> IoTask {
        IoTask {
            reader: Arc::new(TrackingReader {
                get_range_count: Arc::new(AtomicU64::new(0)),
                path: Path::parse("test").unwrap(),
            }),
            to_read: 0..1,
            when_done: Box::new(|_| {}),
            priority,
            bypass_backpressure,
        }
    }

    #[test]
    fn test_iotask_ordering() {
        // Bypass tasks must come out of the heap before non-bypass tasks.
        // Within each group, lower priority number (= higher priority) comes first.
        let mut heap = BinaryHeap::new();
        heap.push(make_task(10, false)); // non-bypass, low priority
        heap.push(make_task(1, false)); // non-bypass, high priority
        heap.push(make_task(20, true)); // bypass, low priority
        heap.push(make_task(5, true)); // bypass, high priority

        let order: Vec<(u128, bool)> = std::iter::from_fn(|| heap.pop())
            .map(|t| (t.priority, t.bypass_backpressure))
            .collect();

        assert_eq!(order, vec![(5, true), (20, true), (1, false), (10, false)]);
    }

    #[tokio::test]
    async fn test_full_seq_read() {
        let tmp_file = TempObjFile::default();

        let obj_store = Arc::new(ObjectStore::local());

        // Write 1MiB of data
        const DATA_SIZE: u64 = 1024 * 1024;
        let mut some_data = vec![0; DATA_SIZE as usize];
        rand::rng().fill_bytes(&mut some_data);
        obj_store.put(&tmp_file, &some_data).await.unwrap();

        let config = SchedulerConfig::default_for_testing();

        let scheduler = ScanScheduler::new(obj_store, config);

        let file_scheduler = scheduler
            .open_file(&tmp_file, &CachedFileSize::unknown())
            .await
            .unwrap();

        // Read it back 4KiB at a time
        const READ_SIZE: u64 = 4 * 1024;
        let mut reqs = VecDeque::new();
        let mut offset = 0;
        while offset < DATA_SIZE {
            reqs.push_back(
                #[allow(clippy::single_range_in_vec_init)]
                file_scheduler.submit_request(vec![offset..offset + READ_SIZE], 0),
            );
            offset += READ_SIZE;
        }

        offset = 0;
        // Note: we should get parallel I/O even though we are consuming serially
        while offset < DATA_SIZE {
            let data = reqs.pop_front().unwrap().await.unwrap();
            let actual = &data[0];
            let expected = &some_data[offset as usize..(offset + READ_SIZE) as usize];
            assert_eq!(expected, actual);
            offset += READ_SIZE;
        }
    }

    #[tokio::test]
    async fn test_split_coalesce() {
        let tmp_file = TempObjFile::default();

        let obj_store = Arc::new(ObjectStore::local());

        // Write 75MiB of data
        const DATA_SIZE: u64 = 75 * 1024 * 1024;
        let mut some_data = vec![0; DATA_SIZE as usize];
        rand::rng().fill_bytes(&mut some_data);
        obj_store.put(&tmp_file, &some_data).await.unwrap();

        let config = SchedulerConfig::default_for_testing();

        let scheduler = ScanScheduler::new(obj_store, config);

        let file_scheduler = scheduler
            .open_file(&tmp_file, &CachedFileSize::unknown())
            .await
            .unwrap();

        // These 3 requests should be coalesced into a single I/O because they are within 4KiB
        // of each other
        let req =
            file_scheduler.submit_request(vec![50_000..51_000, 52_000..53_000, 54_000..55_000], 0);

        let bytes = req.await.unwrap();

        assert_eq!(bytes[0], &some_data[50_000..51_000]);
        assert_eq!(bytes[1], &some_data[52_000..53_000]);
        assert_eq!(bytes[2], &some_data[54_000..55_000]);

        assert_eq!(1, scheduler.stats().iops);

        // This should be split into 5 requests because it is so large
        let req = file_scheduler.submit_request(vec![0..DATA_SIZE], 0);
        let bytes = req.await.unwrap();
        assert!(bytes[0] == some_data, "data is not the same");

        assert_eq!(6, scheduler.stats().iops);

        // None of these requests are bigger than the max IOP size but they will be coalesced into
        // one IOP that is bigger and then split back into 2 requests that don't quite align with the original
        // ranges.
        let chunk_size = *DEFAULT_MAX_IOP_SIZE;
        let req = file_scheduler.submit_request(
            vec![
                10..chunk_size,
                chunk_size + 10..(chunk_size * 2) - 20,
                chunk_size * 2..(chunk_size * 2) + 10,
            ],
            0,
        );

        let bytes = req.await.unwrap();
        let chunk_size = chunk_size as usize;
        assert!(
            bytes[0] == some_data[10..chunk_size],
            "data is not the same"
        );
        assert!(
            bytes[1] == some_data[chunk_size + 10..(chunk_size * 2) - 20],
            "data is not the same"
        );
        assert!(
            bytes[2] == some_data[chunk_size * 2..(chunk_size * 2) + 10],
            "data is not the same"
        );
        assert_eq!(8, scheduler.stats().iops);

        let reads = (0..44)
            .map(|i| i * 1_000_000..(i + 1) * 1_000_000)
            .collect::<Vec<_>>();
        let req = file_scheduler.submit_request(reads, 0);
        let bytes = req.await.unwrap();
        for (i, bytes) in bytes.iter().enumerate() {
            assert!(
                bytes == &some_data[i * 1_000_000..(i + 1) * 1_000_000],
                "data is not the same"
            );
        }
        assert_eq!(11, scheduler.stats().iops);
    }

    #[tokio::test]
    async fn test_priority() {
        let some_path = Path::parse("foo").unwrap();
        let base_store = Arc::new(InMemory::new());
        base_store
            .put(&some_path, vec![0; 1000].into())
            .await
            .unwrap();

        let semaphore = Arc::new(tokio::sync::Semaphore::new(0));
        let mut obj_store = MockObjectStore::default();
        let semaphore_copy = semaphore.clone();
        obj_store
            .expect_get_opts()
            .returning(move |location, options| {
                let semaphore = semaphore.clone();
                let base_store = base_store.clone();
                let location = location.clone();
                async move {
                    semaphore.acquire().await.unwrap().forget();
                    base_store.get_opts(&location, options).await
                }
                .boxed()
            });
        let obj_store = Arc::new(ObjectStore::new(
            Arc::new(obj_store),
            Url::parse("mem://").unwrap(),
            Some(500),
            None,
            false,
            false,
            1,
            DEFAULT_DOWNLOAD_RETRY_COUNT,
            None,
        ));

        let config = SchedulerConfig {
            io_buffer_size_bytes: 1024 * 1024,
            use_lite_scheduler: None,
        };

        let scan_scheduler = ScanScheduler::new(obj_store, config);

        let file_scheduler = scan_scheduler
            .open_file(&Path::parse("foo").unwrap(), &CachedFileSize::new(1000))
            .await
            .unwrap();

        // Issue a request, priority doesn't matter, it will be submitted
        // immediately (it will go pending)
        // Note: the timeout is to prevent a deadlock if the test fails.
        let first_fut = timeout(
            Duration::from_secs(10),
            file_scheduler.submit_single(0..10, 0),
        )
        .boxed();

        // Issue another low priority request (it will go in queue)
        let mut second_fut = timeout(
            Duration::from_secs(10),
            file_scheduler.submit_single(0..20, 100),
        )
        .boxed();

        // Issue a high priority request (it will go in queue and should bump
        // the other queued request down)
        let mut third_fut = timeout(
            Duration::from_secs(10),
            file_scheduler.submit_single(0..30, 0),
        )
        .boxed();

        // Finish one file, should be the in-flight first request
        semaphore_copy.add_permits(1);
        assert!(first_fut.await.unwrap().unwrap().len() == 10);
        // Other requests should not be finished
        assert!(poll!(&mut second_fut).is_pending());
        assert!(poll!(&mut third_fut).is_pending());

        // Next should be high priority request
        semaphore_copy.add_permits(1);
        assert!(third_fut.await.unwrap().unwrap().len() == 30);
        assert!(poll!(&mut second_fut).is_pending());

        // Finally, the low priority request
        semaphore_copy.add_permits(1);
        assert!(second_fut.await.unwrap().unwrap().len() == 20);
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_backpressure() {
        let some_path = Path::parse("foo").unwrap();
        let base_store = Arc::new(InMemory::new());
        base_store
            .put(&some_path, vec![0; 100000].into())
            .await
            .unwrap();

        let bytes_read = Arc::new(AtomicU64::from(0));
        let mut obj_store = MockObjectStore::default();
        let bytes_read_copy = bytes_read.clone();
        // Wraps the obj_store to keep track of how many bytes have been read
        obj_store
            .expect_get_opts()
            .returning(move |location, options| {
                let range = options.range.as_ref().unwrap();
                let num_bytes = match range {
                    GetRange::Bounded(bounded) => bounded.end - bounded.start,
                    _ => panic!(),
                };
                bytes_read_copy.fetch_add(num_bytes, Ordering::Release);
                let location = location.clone();
                let base_store = base_store.clone();
                async move { base_store.get_opts(&location, options).await }.boxed()
            });
        let obj_store = Arc::new(ObjectStore::new(
            Arc::new(obj_store),
            Url::parse("mem://").unwrap(),
            Some(500),
            None,
            false,
            false,
            1,
            DEFAULT_DOWNLOAD_RETRY_COUNT,
            None,
        ));

        let config = SchedulerConfig {
            io_buffer_size_bytes: 10,
            use_lite_scheduler: None,
        };

        let scan_scheduler = ScanScheduler::new(obj_store.clone(), config);

        let file_scheduler = scan_scheduler
            .open_file(&Path::parse("foo").unwrap(), &CachedFileSize::new(100000))
            .await
            .unwrap();

        let wait_for_idle = || async move {
            let handle = Handle::current();
            while handle.metrics().num_alive_tasks() != 1 {
                tokio::time::sleep(Duration::from_millis(10)).await;
            }
        };
        let wait_for_bytes_read_and_idle = |target_bytes: u64| {
            // We need to move `target_bytes` but don't want to move `bytes_read`
            let bytes_read = &bytes_read;
            async move {
                let bytes_read_copy = bytes_read.clone();
                while bytes_read_copy.load(Ordering::Acquire) < target_bytes {
                    tokio::time::sleep(Duration::from_millis(10)).await;
                }
                wait_for_idle().await;
            }
        };

        // This read will begin immediately
        let first_fut = file_scheduler.submit_single(0..5, 0);
        // This read should also begin immediately
        let second_fut = file_scheduler.submit_single(0..5, 0);
        // This read will be throttled
        let third_fut = file_scheduler.submit_single(0..3, 0);
        // Two tasks (third_fut and unit test)
        wait_for_bytes_read_and_idle(10).await;

        assert_eq!(first_fut.await.unwrap().len(), 5);
        // One task (unit test)
        wait_for_bytes_read_and_idle(13).await;

        // 2 bytes are ready but 5 bytes requested, read will be blocked
        let fourth_fut = file_scheduler.submit_single(0..5, 0);
        wait_for_bytes_read_and_idle(13).await;

        // Out of order completion is ok, will unblock backpressure
        assert_eq!(third_fut.await.unwrap().len(), 3);
        wait_for_bytes_read_and_idle(18).await;

        assert_eq!(second_fut.await.unwrap().len(), 5);
        // At this point there are 5 bytes available in the backpressure queue
        // Now we issue a multi-read that can be partially fulfilled, it will read some bytes but
        // not all of them. (using a large range gap to ensure the request is not coalesced)
        //
        // I'm actually not sure this behavior is great.  It's possible that we should just
        // block until we can fulfill the entire request.
        let fifth_fut = file_scheduler.submit_request(vec![0..3, 90000..90007], 0);
        wait_for_bytes_read_and_idle(21).await;

        // Fifth future should eventually finish due to deadlock prevention
        let fifth_bytes = tokio::time::timeout(Duration::from_secs(10), fifth_fut)
            .await
            .unwrap();
        assert_eq!(
            fifth_bytes.unwrap().iter().map(|b| b.len()).sum::<usize>(),
            10
        );

        // And now let's just make sure that we can read the rest of the data
        assert_eq!(fourth_fut.await.unwrap().len(), 5);
        wait_for_bytes_read_and_idle(28).await;

        // Ensure a fresh scheduler with an exhausted byte budget still completes
        // equal-priority reads (the priority-based deadlock prevention)
        let config = SchedulerConfig {
            io_buffer_size_bytes: 10,
            use_lite_scheduler: None,
        };

        let scan_scheduler = ScanScheduler::new(obj_store, config);
        let file_scheduler = scan_scheduler
            .open_file(&Path::parse("foo").unwrap(), &CachedFileSize::new(100000))
            .await
            .unwrap();

        let first_fut = file_scheduler.submit_single(0..10, 0);
        let second_fut = file_scheduler.submit_single(0..10, 0);

        std::thread::sleep(Duration::from_millis(100));
        assert_eq!(first_fut.await.unwrap().len(), 10);
        assert_eq!(second_fut.await.unwrap().len(), 10);
    }

    /// A Reader that tracks how many times get_range has been called.
    #[derive(Debug)]
    struct TrackingReader {
        get_range_count: Arc<AtomicU64>,
        path: Path,
    }

    impl deepsize::DeepSizeOf for TrackingReader {
        fn deep_size_of_children(&self, _context: &mut deepsize::Context) -> usize {
            0
        }
    }

    impl Reader for TrackingReader {
        fn path(&self) -> &Path {
            &self.path
        }

        fn block_size(&self) -> usize {
            4096
        }

        fn io_parallelism(&self) -> usize {
            1
        }

        fn size(&self) -> futures::future::BoxFuture<'_, object_store::Result<usize>> {
            Box::pin(async { Ok(1_000_000) })
        }

        fn get_range(
            &self,
            range: Range<usize>,
        ) -> futures::future::BoxFuture<'static, object_store::Result<Bytes>> {
            self.get_range_count.fetch_add(1, Ordering::Release);
            let num_bytes = range.end - range.start;
            Box::pin(async move { Ok(Bytes::from(vec![0u8; num_bytes])) })
        }

        fn get_all(&self) -> futures::future::BoxFuture<'_, object_store::Result<Bytes>> {
            Box::pin(async { Ok(Bytes::from(vec![0u8; 1_000_000])) })
        }
    }

    #[tokio::test]
    async fn test_lite_scheduler_submits_eagerly() {
        let obj_store = Arc::new(ObjectStore::memory());
        let config = SchedulerConfig::default_for_testing().with_lite_scheduler();
        let scheduler = ScanScheduler::new(obj_store, config);

        let get_range_count = Arc::new(AtomicU64::new(0));
        let reader: Arc<dyn Reader> = Arc::new(TrackingReader {
            get_range_count: get_range_count.clone(),
            path: Path::parse("test").unwrap(),
        });

        // Submit several requests. The lite scheduler should call get_range
        // eagerly during submit (before the returned future is polled).
        let fut1 = scheduler.submit_request(reader.clone(), vec![0..100], 0, false);
        let fut2 = scheduler.submit_request(reader.clone(), vec![100..200], 10, false);
        let fut3 = scheduler.submit_request(reader.clone(), vec![200..300], 20, false);

        // get_range must have been called for all 3 requests already.
        assert_eq!(get_range_count.load(Ordering::Acquire), 3);

        // The futures should still resolve with the correct data.
        assert_eq!(fut1.await.unwrap()[0].len(), 100);
        assert_eq!(fut2.await.unwrap()[0].len(), 100);
        assert_eq!(fut3.await.unwrap()[0].len(), 100);
    }

    #[tokio::test]
    async fn test_object_store_selects_scheduler() {
        // A memory:// store should use the standard scheduler when config is None
        let memory_store = Arc::new(ObjectStore::memory());
        assert!(!memory_store.prefers_lite_scheduler());
        let config = SchedulerConfig {
            io_buffer_size_bytes: 256 * 1024 * 1024,
            use_lite_scheduler: None,
        };
        let scheduler = ScanScheduler::new(memory_store.clone(), config);
        assert!(!scheduler.uses_lite_scheduler());

        // A file+uring:// store should use the lite scheduler when config is None
        let uring_store = Arc::new(ObjectStore::new(
            Arc::new(InMemory::new()),
            Url::parse("file+uring:///tmp").unwrap(),
            None,
            None,
            false,
            false,
            8,
            DEFAULT_DOWNLOAD_RETRY_COUNT,
            None,
        ));
        assert!(uring_store.prefers_lite_scheduler());
        let config = SchedulerConfig {
            io_buffer_size_bytes: 256 * 1024 * 1024,
            use_lite_scheduler: None,
        };
        let scheduler = ScanScheduler::new(uring_store.clone(), config);
        assert!(scheduler.uses_lite_scheduler());

        // Explicit Some(false) overrides a file+uring:// store's preference
        let config = SchedulerConfig {
            io_buffer_size_bytes: 256 * 1024 * 1024,
            use_lite_scheduler: Some(false),
        };
        let scheduler = ScanScheduler::new(uring_store, config);
        assert!(!scheduler.uses_lite_scheduler());

        // Explicit Some(true) overrides a memory:// store's preference
        let config = SchedulerConfig {
            io_buffer_size_bytes: 256 * 1024 * 1024,
            use_lite_scheduler: Some(true),
        };
        let scheduler = ScanScheduler::new(memory_store, config);
        assert!(scheduler.uses_lite_scheduler());
    }

    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn stress_backpressure() {
        // This test ensures that the backpressure mechanism works correctly with
        // regard to priority: as long as all requests are consumed in priority
        // order, the backpressure mechanism should not deadlock.
        let some_path = Path::parse("foo").unwrap();
        let obj_store = Arc::new(ObjectStore::memory());
        obj_store
            .put(&some_path, vec![0; 100000].as_slice())
            .await
            .unwrap();

        // A 1-byte budget admits only one in-flight request at a time
        let config = SchedulerConfig {
            io_buffer_size_bytes: 1,
            use_lite_scheduler: None,
        };
        let scan_scheduler = ScanScheduler::new(obj_store.clone(), config);
        let file_scheduler = scan_scheduler
            .open_file(&some_path, &CachedFileSize::unknown())
            .await
            .unwrap();

        let mut futs = Vec::with_capacity(10000);
        for idx in 0..10000 {
            futs.push(file_scheduler.submit_single(idx..idx + 1, idx));
        }

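        // Awaiting in submission (= priority) order releases the one-byte
        // budget after each read, so the loop below never stalls.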
        for fut in futs {
            fut.await.unwrap();
        }
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_zero_buffer_size_no_backpressure() {
        // With io_buffer_size_bytes=0 (no_backpressure=true), reads at any priority go
        // through without blocking, even though a zero budget would normally halt all I/O.
        let obj_store = Arc::new(ObjectStore::memory());
        let config = SchedulerConfig {
            io_buffer_size_bytes: 0,
            use_lite_scheduler: Some(false),
        };
        let scheduler = ScanScheduler::new(obj_store, config);

        let get_range_count = Arc::new(AtomicU64::new(0));
        let reader: Arc<dyn Reader> = Arc::new(TrackingReader {
            get_range_count: get_range_count.clone(),
            path: Path::parse("test").unwrap(),
        });

        // Submit three reads at increasing priorities without awaiting any first.
        // Priorities 1 and 2 would deadlock under a real 0-byte budget without no_backpressure.
        let fut1 = scheduler.submit_request(reader.clone(), vec![0..1000], 0, false);
        let fut2 = scheduler.submit_request(reader.clone(), vec![1000..2000], 1, false);
        let fut3 = scheduler.submit_request(reader.clone(), vec![2000..3000], 2, false);

        let bytes1 = timeout(Duration::from_secs(5), fut1)
            .await
            .unwrap()
            .unwrap();
        let bytes2 = timeout(Duration::from_secs(5), fut2)
            .await
            .unwrap()
            .unwrap();
        let bytes3 = timeout(Duration::from_secs(5), fut3)
            .await
            .unwrap()
            .unwrap();
        assert_eq!(bytes1[0].len(), 1000);
        assert_eq!(bytes2[0].len(), 1000);
        assert_eq!(bytes3[0].len(), 1000);
        assert_eq!(get_range_count.load(Ordering::Acquire), 3);
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_file_scheduler_bypass_backpressure() {
        // A FileScheduler obtained via with_bypass_backpressure() submits reads that bypass
        // the byte budget, allowing them to proceed even when the budget is exhausted.
        let some_path = Path::parse("foo").unwrap();
        let base_store = Arc::new(InMemory::new());
        base_store
            .put(&some_path, vec![0u8; 1000].into())
            .await
            .unwrap();

        let bytes_dispatched = Arc::new(AtomicU64::new(0));
        let mut obj_store = MockObjectStore::default();
        let bytes_dispatched_copy = bytes_dispatched.clone();
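        // The mock counts the bytes requested by each dispatched read, then
        // forwards the request to the real in-memory store.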
        obj_store
            .expect_get_opts()
            .returning(move |location, options| {
                let range = options.range.as_ref().unwrap();
                let num_bytes = match range {
                    GetRange::Bounded(bounded) => bounded.end - bounded.start,
                    _ => panic!("expected a bounded range"),
                };
                bytes_dispatched_copy.fetch_add(num_bytes, Ordering::Release);
                let location = location.clone();
                let base_store = base_store.clone();
                async move { base_store.get_opts(&location, options).await }.boxed()
            });
        let obj_store = Arc::new(ObjectStore::new(
            Arc::new(obj_store),
            Url::parse("mem://").unwrap(),
            Some(500),
            None,
            false,
            false,
            1,
            DEFAULT_DOWNLOAD_RETRY_COUNT,
            None,
        ));

        // Budget = 10 bytes.
        let config = SchedulerConfig {
            io_buffer_size_bytes: 10,
            use_lite_scheduler: Some(false),
        };
        let scan_scheduler = ScanScheduler::new(obj_store, config);
        let file_scheduler = scan_scheduler
            .open_file(&Path::parse("foo").unwrap(), &CachedFileSize::new(1000))
            .await
            .unwrap();
        let bypass_scheduler = file_scheduler.with_bypass_backpressure();

        // Fill the 10-byte budget with a priority-0 read.
        let blocker_fut = file_scheduler.submit_single(0..10, 0);
        while bytes_dispatched.load(Ordering::Acquire) < 10 {
            tokio::time::sleep(Duration::from_millis(1)).await;
        }

        // A normal read at priority 2 is blocked: the budget is exhausted and its
        // priority (2) is above the minimum in-flight priority (0).  A read submitted
        // through the bypass scheduler at priority 1 skips the budget check entirely.
        let normal_fut = file_scheduler.submit_single(0..10, 2);
        let bypass_fut = bypass_scheduler.submit_single(0..10, 1);

        // Bypass read is dispatched; normal read is still blocked.
        while bytes_dispatched.load(Ordering::Acquire) < 20 {
            tokio::time::sleep(Duration::from_millis(1)).await;
        }
        tokio::time::sleep(Duration::from_millis(20)).await;
        assert_eq!(
            bytes_dispatched.load(Ordering::Acquire),
            20,
            "normal read should still be blocked while budget is exhausted"
        );

        // Consuming the blocker releases its 10 bytes of budget, so the normal
        // read can proceed.
        timeout(Duration::from_secs(5), blocker_fut)
            .await
            .unwrap()
            .unwrap();
        timeout(Duration::from_secs(5), bypass_fut)
            .await
            .unwrap()
            .unwrap();
        timeout(Duration::from_secs(5), normal_fut)
            .await
            .unwrap()
            .unwrap();
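        // 30 bytes total: 10 (blocker) + 10 (bypass) + 10 (normal).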
        assert_eq!(bytes_dispatched.load(Ordering::Acquire), 30);
    }
}