Skip to main content

lance_io/
scheduler.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use bytes::Bytes;
5use futures::channel::oneshot;
6use futures::{FutureExt, TryFutureExt};
7use object_store::path::Path;
8use std::collections::BinaryHeap;
9use std::fmt::Debug;
10use std::future::Future;
11use std::num::NonZero;
12use std::ops::Range;
13use std::sync::atomic::{AtomicU64, Ordering};
14use std::sync::{Arc, Mutex};
15use std::time::Instant;
16use tokio::sync::Notify;
17
18use lance_core::utils::io_stats::IoStatsRecorder;
19use lance_core::utils::parse::str_is_truthy;
20use lance_core::{Error, Result};
21
22use crate::object_store::ObjectStore;
23use crate::traits::Reader;
24use crate::utils::CachedFileSize;
25
26mod lite;
27
/// Seconds that must pass after a scheduler starts before a backpressure
/// warning may be logged.
const BACKPRESSURE_MIN: u64 = 5;
/// Minimum number of seconds between two backpressure warnings (at most one per
/// minute).
const BACKPRESSURE_DEBOUNCE: u64 = 60;

/// Process-wide count of how many IOPS have been issued.
static IOPS_COUNTER: AtomicU64 = AtomicU64::new(0);
/// Process-wide count of how many bytes the scheduler has read.
static BYTES_READ_COUNTER: AtomicU64 = AtomicU64::new(0);

/// Current value of the process-wide IOPS counter.
pub fn iops_counter() -> u64 {
    IOPS_COUNTER.load(Ordering::Acquire)
}

/// Current value of the process-wide bytes-read counter.
pub fn bytes_read_counter() -> u64 {
    BYTES_READ_COUNTER.load(Ordering::Acquire)
}
45
/// Sorted multiset of the priorities of requests that are currently in flight.
///
/// A request whose priority value is less than or equal to every in-flight
/// priority is admitted even when the byte budget is exhausted, which helps
/// avoid deadlocks related to backpressure.  Answering that question needs the
/// minimum in-flight priority, so the priorities are kept in a sorted `Vec`.
///
/// TODO: At some point it would be nice if we can optimize this away, but the
/// set should remain relatively small (generally less than 256 items) and has
/// not shown itself to be a bottleneck yet.
struct PrioritiesInFlight {
    in_flight: Vec<u128>,
}

impl PrioritiesInFlight {
    /// Creates an empty set with room reserved for `2 * capacity` entries.
    fn new(capacity: u32) -> Self {
        let reserved = 2 * capacity as usize;
        Self {
            in_flight: Vec::with_capacity(reserved),
        }
    }

    /// The smallest in-flight priority, or `u128::MAX` when nothing is in flight.
    fn min_in_flight(&self) -> u128 {
        match self.in_flight.first() {
            Some(&lowest) => lowest,
            None => u128::MAX,
        }
    }

    /// Records one more in-flight request with priority `prio`, keeping the
    /// vector sorted (duplicates are allowed).
    fn push(&mut self, prio: u128) {
        let slot = self
            .in_flight
            .binary_search(&prio)
            .unwrap_or_else(|insert_at| insert_at);
        self.in_flight.insert(slot, prio);
    }

    /// Removes a single occurrence of `prio`; a no-op if `prio` is not present.
    fn remove(&mut self, prio: u128) {
        let Ok(slot) = self.in_flight.binary_search(&prio) else {
            return;
        };
        self.in_flight.remove(slot);
    }
}
83
84struct IoQueueState {
85    // Number of IOPS we can issue concurrently before pausing I/O
86    iops_avail: u32,
87    // Number of bytes we are allowed to buffer in memory before pausing I/O
88    //
89    // This can dip below 0 due to I/O prioritization
90    bytes_avail: i64,
91    // Pending I/O requests
92    pending_requests: BinaryHeap<IoTask>,
93    // Priorities of in-flight requests
94    priorities_in_flight: PrioritiesInFlight,
95    // Set when the scheduler is finished to notify the I/O loop to shut down
96    // once all outstanding requests have been completed.
97    done_scheduling: bool,
98    // Time when the scheduler started
99    start: Instant,
100    // Last time we warned about backpressure
101    last_warn: AtomicU64,
102    // When true, skip all byte-based backpressure checks (set when io_buffer_size == 0)
103    no_backpressure: bool,
104}
105
106impl IoQueueState {
107    fn new(io_capacity: u32, io_buffer_size: u64) -> Self {
108        Self {
109            iops_avail: io_capacity,
110            bytes_avail: io_buffer_size as i64,
111            pending_requests: BinaryHeap::new(),
112            priorities_in_flight: PrioritiesInFlight::new(io_capacity),
113            done_scheduling: false,
114            start: Instant::now(),
115            last_warn: AtomicU64::from(0),
116            no_backpressure: io_buffer_size == 0,
117        }
118    }
119
120    fn warn_if_needed(&self) {
121        let seconds_elapsed = self.start.elapsed().as_secs();
122        let last_warn = self.last_warn.load(Ordering::Acquire);
123        let since_last_warn = seconds_elapsed - last_warn;
124        if (last_warn == 0
125            && seconds_elapsed > BACKPRESSURE_MIN
126            && seconds_elapsed < BACKPRESSURE_DEBOUNCE)
127            || since_last_warn > BACKPRESSURE_DEBOUNCE
128        {
129            tracing::event!(tracing::Level::DEBUG, "Backpressure throttle exceeded");
130            log::debug!(
131                "Backpressure throttle is full, I/O will pause until buffer is drained.  Max I/O bandwidth will not be achieved because CPU is falling behind"
132            );
133            self.last_warn
134                .store(seconds_elapsed.max(1), Ordering::Release);
135        }
136    }
137
138    fn can_deliver(&self, task: &IoTask) -> bool {
139        if self.iops_avail == 0 {
140            false
141        } else if self.no_backpressure
142            || task.bypass_backpressure
143            || task.priority <= self.priorities_in_flight.min_in_flight()
144        {
145            true
146        } else if task.num_bytes() as i64 > self.bytes_avail {
147            self.warn_if_needed();
148            false
149        } else {
150            true
151        }
152    }
153
154    fn next_task(&mut self) -> Option<IoTask> {
155        let task = self.pending_requests.peek()?;
156        if self.can_deliver(task) {
157            let skip_bytes_accounting = self.no_backpressure || task.bypass_backpressure;
158            self.priorities_in_flight.push(task.priority);
159            self.iops_avail -= 1;
160            if !skip_bytes_accounting {
161                self.bytes_avail -= task.num_bytes() as i64;
162                if self.bytes_avail < 0 {
163                    // This can happen when we admit special priority requests
164                    log::debug!(
165                        "Backpressure throttle temporarily exceeded by {} bytes due to priority I/O",
166                        -self.bytes_avail
167                    );
168                }
169            }
170            Some(self.pending_requests.pop().unwrap())
171        } else {
172            None
173        }
174    }
175}
176
177// This is modeled after the MPSC queue described here: https://docs.rs/tokio/latest/tokio/sync/struct.Notify.html
178//
179// However, it only needs to be SPSC since there is only one "scheduler thread"
180// and one I/O loop.
181struct IoQueue {
182    // Queue state
183    state: Mutex<IoQueueState>,
184    // Used to signal new I/O requests have arrived that might potentially be runnable
185    notify: Notify,
186}
187
188impl IoQueue {
189    fn new(io_capacity: u32, io_buffer_size: u64) -> Self {
190        Self {
191            state: Mutex::new(IoQueueState::new(io_capacity, io_buffer_size)),
192            notify: Notify::new(),
193        }
194    }
195
196    fn push(&self, task: IoTask) {
197        log::trace!(
198            "Inserting I/O request for {} bytes with priority ({},{}) into I/O queue",
199            task.num_bytes(),
200            task.priority >> 64,
201            task.priority & 0xFFFFFFFFFFFFFFFF
202        );
203        let mut state = self.state.lock().unwrap();
204        state.pending_requests.push(task);
205        drop(state);
206
207        self.notify.notify_one();
208    }
209
210    async fn pop(&self) -> Option<IoTask> {
211        loop {
212            {
213                let mut state = self.state.lock().unwrap();
214                if let Some(task) = state.next_task() {
215                    return Some(task);
216                }
217
218                if state.done_scheduling {
219                    return None;
220                }
221            }
222
223            self.notify.notified().await;
224        }
225    }
226
227    fn on_iop_complete(&self) {
228        let mut state = self.state.lock().unwrap();
229        state.iops_avail += 1;
230        drop(state);
231
232        self.notify.notify_one();
233    }
234
235    fn on_bytes_consumed(&self, bytes: u64, priority: u128, num_reqs: usize) {
236        let mut state = self.state.lock().unwrap();
237        state.bytes_avail += bytes as i64;
238        for _ in 0..num_reqs {
239            state.priorities_in_flight.remove(priority);
240        }
241        drop(state);
242
243        self.notify.notify_one();
244    }
245
246    fn close(&self) {
247        let mut state = self.state.lock().unwrap();
248        state.done_scheduling = true;
249        let pending_requests = std::mem::take(&mut state.pending_requests);
250        drop(state);
251        for request in pending_requests {
252            request.cancel();
253        }
254
255        self.notify.notify_one();
256    }
257}
258
259// There is one instance of MutableBatch shared by all the I/O operations
260// that make up a single request.  When all the I/O operations complete
261// then the MutableBatch goes out of scope and the batch request is considered
262// complete
263struct MutableBatch<F: FnOnce(Response) + Send> {
264    when_done: Option<F>,
265    data_buffers: Vec<Bytes>,
266    num_bytes: u64,
267    priority: u128,
268    num_reqs: usize,
269    err: Option<Box<dyn std::error::Error + Send + Sync + 'static>>,
270    // When true, report 0 bytes consumed so the backpressure budget is unaffected
271    bypass_backpressure: bool,
272}
273
274impl<F: FnOnce(Response) + Send> MutableBatch<F> {
275    fn new(
276        when_done: F,
277        num_data_buffers: u32,
278        priority: u128,
279        num_reqs: usize,
280        bypass_backpressure: bool,
281    ) -> Self {
282        Self {
283            when_done: Some(when_done),
284            data_buffers: vec![Bytes::default(); num_data_buffers as usize],
285            num_bytes: 0,
286            priority,
287            num_reqs,
288            err: None,
289            bypass_backpressure,
290        }
291    }
292}
293
294// Rather than keep track of when all the I/O requests are finished so that we
295// can deliver the batch of data we let Rust do that for us.  When all I/O's are
296// done then the MutableBatch will go out of scope and we know we have all the
297// data.
298impl<F: FnOnce(Response) + Send> Drop for MutableBatch<F> {
299    fn drop(&mut self) {
300        // If we have an error, return that.  Otherwise return the data
301        let result = if self.err.is_some() {
302            Err(Error::wrapped(self.err.take().unwrap()))
303        } else {
304            let mut data = Vec::new();
305            std::mem::swap(&mut data, &mut self.data_buffers);
306            Ok(data)
307        };
308        // We don't really care if no one is around to receive it, just let
309        // the result go out of scope and get cleaned up
310        let response = Response {
311            data: result,
312            // Report 0 bytes for bypass tasks so the backpressure budget is unaffected
313            num_bytes: if self.bypass_backpressure {
314                0
315            } else {
316                self.num_bytes
317            },
318            priority: self.priority,
319            num_reqs: self.num_reqs,
320        };
321        (self.when_done.take().unwrap())(response);
322    }
323}
324
325struct DataChunk {
326    task_idx: usize,
327    num_bytes: u64,
328    data: Result<Bytes>,
329}
330
331trait DataSink: Send {
332    fn deliver_data(&mut self, data: DataChunk);
333}
334
335impl<F: FnOnce(Response) + Send> DataSink for MutableBatch<F> {
336    // Called by worker tasks to add data to the MutableBatch
337    fn deliver_data(&mut self, data: DataChunk) {
338        self.num_bytes += data.num_bytes;
339        match data.data {
340            Ok(data_bytes) => {
341                self.data_buffers[data.task_idx] = data_bytes;
342            }
343            Err(err) => {
344                // This keeps the original error, if present
345                self.err.get_or_insert(Box::new(err));
346            }
347        }
348    }
349}
350
351struct IoTask {
352    reader: Arc<dyn Reader>,
353    to_read: Range<u64>,
354    when_done: Box<dyn FnOnce(Result<Bytes>) + Send>,
355    priority: u128,
356    bypass_backpressure: bool,
357}
358
359impl Eq for IoTask {}
360
361impl PartialEq for IoTask {
362    fn eq(&self, other: &Self) -> bool {
363        self.bypass_backpressure == other.bypass_backpressure && self.priority == other.priority
364    }
365}
366
367impl PartialOrd for IoTask {
368    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
369        Some(self.cmp(other))
370    }
371}
372
373impl Ord for IoTask {
374    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
375        // Bypass tasks are always delivered before normal tasks.
376        // Within the same bypass class, this is a min-heap on priority.
377        self.bypass_backpressure
378            .cmp(&other.bypass_backpressure)
379            .then(other.priority.cmp(&self.priority))
380    }
381}
382
383impl IoTask {
384    fn num_bytes(&self) -> u64 {
385        self.to_read.end - self.to_read.start
386    }
387    fn cancel(self) {
388        (self.when_done)(Err(Error::internal(
389            "Scheduler closed before I/O was completed".to_string(),
390        )));
391    }
392
393    async fn run(self) {
394        let file_path = self.reader.path().as_ref();
395        let num_bytes = self.num_bytes();
396        let bytes = if self.to_read.start == self.to_read.end {
397            Ok(Bytes::new())
398        } else {
399            let bytes_fut = self
400                .reader
401                .get_range(self.to_read.start as usize..self.to_read.end as usize);
402            IOPS_COUNTER.fetch_add(1, Ordering::Release);
403            let num_bytes = self.num_bytes();
404            bytes_fut
405                .inspect(move |_| {
406                    BYTES_READ_COUNTER.fetch_add(num_bytes, Ordering::Release);
407                })
408                .await
409                .map_err(Error::from)
410        };
411        // Emit per-file I/O trace event only when tracing is enabled
412        tracing::trace!(
413            file = file_path,
414            bytes_read = num_bytes,
415            requests = 1,
416            range_start = self.to_read.start,
417            range_end = self.to_read.end,
418            "File I/O completed"
419        );
420        (self.when_done)(bytes);
421    }
422}
423
424// Every time a scheduler starts up it launches a task to run the I/O loop.  This loop
425// repeats endlessly until the scheduler is destroyed.
426async fn run_io_loop(tasks: Arc<IoQueue>) {
427    // Pop the first finished task off the queue and submit another until
428    // we are done
429    loop {
430        let next_task = tasks.pop().await;
431        match next_task {
432            Some(task) => {
433                tokio::spawn(task.run());
434            }
435            None => {
436                // The sender has been dropped, we are done
437                return;
438            }
439        }
440    }
441}
442
/// Cumulative I/O counters that can be updated concurrently through `&self`.
///
/// All updates use relaxed atomics, so the counters are running totals only
/// and make no ordering guarantees relative to each other.
#[derive(Debug)]
struct StatsCollector {
    iops: AtomicU64,
    requests: AtomicU64,
    bytes_read: AtomicU64,
}

impl StatsCollector {
    /// Creates a collector with every counter at zero.
    fn new() -> Self {
        let zero = || AtomicU64::new(0);
        Self {
            iops: zero(),
            requests: zero(),
            bytes_read: zero(),
        }
    }

    /// Total number of ranges recorded.
    fn iops(&self) -> u64 {
        self.iops.load(Ordering::Relaxed)
    }

    /// Total number of bytes covered by the recorded ranges.
    fn bytes_read(&self) -> u64 {
        self.bytes_read.load(Ordering::Relaxed)
    }

    /// Total number of requests recorded.
    fn requests(&self) -> u64 {
        self.requests.load(Ordering::Relaxed)
    }

    /// Counts one request made of `request.len()` IOPS covering the summed
    /// length of all its ranges.
    fn record_request(&self, request: &[Range<u64>]) {
        self.requests.fetch_add(1, Ordering::Relaxed);
        self.iops.fetch_add(request.len() as u64, Ordering::Relaxed);
        let total_bytes = request
            .iter()
            .fold(0u64, |acc, range| acc + (range.end - range.start));
        self.bytes_read.fetch_add(total_bytes, Ordering::Relaxed);
    }

    /// Add already-aggregated counts (e.g. a snapshot captured from another
    /// scheduler) into these counters.
    fn add(&self, iops: u64, requests: u64, bytes_read: u64) {
        self.iops.fetch_add(iops, Ordering::Relaxed);
        self.requests.fetch_add(requests, Ordering::Relaxed);
        self.bytes_read.fetch_add(bytes_read, Ordering::Relaxed);
    }
}
488
489impl IoStatsRecorder for StatsCollector {
490    fn record_request(&self, request: &[Range<u64>]) {
491        // Inherent methods take precedence in resolution, so this delegates to
492        // the inherent `record_request` above rather than recursing.
493        Self::record_request(self, request)
494    }
495}
496
497#[derive(Debug, Clone, Copy, Default)]
498pub struct ScanStats {
499    pub iops: u64,
500    pub requests: u64,
501    pub bytes_read: u64,
502}
503
504impl ScanStats {
505    fn new(stats: &StatsCollector) -> Self {
506        Self {
507            iops: stats.iops(),
508            requests: stats.requests(),
509            bytes_read: stats.bytes_read(),
510        }
511    }
512}
513
514/// A shareable, cloneable handle to a set of cumulative I/O counters.
515///
516/// All clones share the same underlying counters.  This serves two purposes:
517///
518/// 1. It backs each [`ScanScheduler`]'s own running totals.
519/// 2. It can be attached to an individual [`FileScheduler`] (via
520///    [`FileScheduler::with_io_stats`]) as a *secondary* sink, so a caller can
521///    measure the exact bytes/IOPS performed through that file handle for a
522///    bounded scope (e.g. a single query) without disturbing the scheduler's
523///    global totals.  Read the result back with [`IoStats::snapshot`].
524#[derive(Debug, Clone)]
525pub struct IoStats(Arc<StatsCollector>);
526
527impl IoStats {
528    pub fn new() -> Self {
529        Self(Arc::new(StatsCollector::new()))
530    }
531
532    /// Record a single completed request.  `request` holds the byte ranges as
533    /// actually submitted to storage (post coalescing/splitting), so the counts
534    /// reflect physical I/O.
535    pub fn record_request(&self, request: &[Range<u64>]) {
536        self.0.record_request(request);
537    }
538
539    /// Take an immutable snapshot of the current cumulative counters.
540    pub fn snapshot(&self) -> ScanStats {
541        ScanStats::new(self.0.as_ref())
542    }
543
544    /// Return this handle as a type-erased [`IoStatsRecorder`], suitable for
545    /// attaching to a file reader (e.g. `FileReader::with_io_stats`).  The
546    /// returned recorder shares the same underlying counters as `self`.
547    pub fn recorder(&self) -> Arc<dyn IoStatsRecorder> {
548        self.0.clone()
549    }
550
551    /// Add a snapshot of already-aggregated statistics into this sink.  Used to
552    /// fold in I/O measured on a separate scheduler (e.g. the one-time reads
553    /// performed while opening an index).
554    pub fn add_scan_stats(&self, stats: &ScanStats) {
555        self.0.add(stats.iops, stats.requests, stats.bytes_read);
556    }
557}
558
559impl Default for IoStats {
560    fn default() -> Self {
561        Self::new()
562    }
563}
564
565enum IoQueueType {
566    Standard(Arc<IoQueue>),
567    Lite(Arc<lite::IoQueue>),
568}
569
570/// An I/O scheduler which wraps an ObjectStore and throttles the amount of
571/// parallel I/O that can be run.
572///
573/// The ScanScheduler will cancel any outstanding I/O requests when it is dropped.
574/// For this reason it should be kept alive until all I/O has finished.
575///
576/// Note: The 2.X file readers already do this so this is only a concern if you are
577/// using the ScanScheduler directly.
578pub struct ScanScheduler {
579    object_store: Arc<ObjectStore>,
580    io_queue: IoQueueType,
581    stats: IoStats,
582}
583
584impl Debug for ScanScheduler {
585    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
586        f.debug_struct("ScanScheduler")
587            .field("object_store", &self.object_store)
588            .finish()
589    }
590}
591
592struct Response {
593    data: Result<Vec<Bytes>>,
594    priority: u128,
595    num_reqs: usize,
596    num_bytes: u64,
597}
598
599#[derive(Debug, Clone, Copy)]
600pub struct SchedulerConfig {
601    /// the # of bytes that can be buffered but not yet requested.
602    /// This controls back pressure.  If data is not processed quickly enough then this
603    /// buffer will fill up and the I/O loop will pause until the buffer is drained.
604    pub io_buffer_size_bytes: u64,
605    /// Whether to use the lite scheduler.
606    ///
607    /// - `Some(true)` forces the lite scheduler (e.g. from env var or programmatic).
608    /// - `Some(false)` forces the standard scheduler.
609    /// - `None` defers to the object store's preference (see [`ObjectStore::prefers_lite_scheduler`]).
610    pub use_lite_scheduler: Option<bool>,
611}
612
613impl SchedulerConfig {
614    pub fn new(io_buffer_size_bytes: u64) -> Self {
615        Self {
616            io_buffer_size_bytes,
617            use_lite_scheduler: std::env::var("LANCE_USE_LITE_SCHEDULER")
618                .ok()
619                .map(|v| str_is_truthy(v.trim())),
620        }
621    }
622
623    /// Big enough for unit testing
624    pub fn default_for_testing() -> Self {
625        Self {
626            io_buffer_size_bytes: 256 * 1024 * 1024,
627            use_lite_scheduler: None,
628        }
629    }
630
631    /// Configuration that should generally maximize bandwidth (not trying to save RAM
632    /// at all).  We assume a max page size of 32MiB and then allow 32MiB per I/O thread
633    pub fn max_bandwidth(store: &ObjectStore) -> Self {
634        Self::new(32 * 1024 * 1024 * store.io_parallelism() as u64)
635    }
636
637    pub fn with_lite_scheduler(self) -> Self {
638        Self {
639            use_lite_scheduler: Some(true),
640            ..self
641        }
642    }
643}
644
645impl ScanScheduler {
646    /// Create a new scheduler with the given I/O capacity
647    ///
648    /// # Arguments
649    ///
650    /// * object_store - the store to wrap
651    /// * config - configuration settings for the scheduler
652    pub fn new(object_store: Arc<ObjectStore>, config: SchedulerConfig) -> Arc<Self> {
653        let io_capacity = object_store.io_parallelism();
654        let use_lite = config
655            .use_lite_scheduler
656            .unwrap_or_else(|| object_store.prefers_lite_scheduler());
657        let io_queue = if use_lite {
658            let io_queue = Arc::new(lite::IoQueue::new(
659                io_capacity as u64,
660                config.io_buffer_size_bytes,
661            ));
662            IoQueueType::Lite(io_queue)
663        } else {
664            let io_queue = Arc::new(IoQueue::new(
665                io_capacity as u32,
666                config.io_buffer_size_bytes,
667            ));
668            let io_queue_clone = io_queue.clone();
669            // Best we can do here is fire and forget.  If the I/O loop is still running when the scheduler is
670            // dropped we can't wait for it to finish or we'd block a tokio thread.  We could spawn a blocking task
671            // to wait for it to finish but that doesn't seem helpful.
672            tokio::task::spawn(async move { run_io_loop(io_queue_clone).await });
673            IoQueueType::Standard(io_queue)
674        };
675        Arc::new(Self {
676            object_store,
677            io_queue,
678            stats: IoStats::new(),
679        })
680    }
681
682    /// Open a file for reading
683    ///
684    /// # Arguments
685    ///
686    /// * path - the path to the file to open
687    /// * base_priority - the base priority for I/O requests submitted to this file scheduler
688    ///   this will determine the upper 64 bits of priority (the lower 64 bits
689    ///   come from `submit_request` and `submit_single`)
690    pub async fn open_file_with_priority(
691        self: &Arc<Self>,
692        path: &Path,
693        base_priority: u64,
694        file_size_bytes: &CachedFileSize,
695    ) -> Result<FileScheduler> {
696        let file_size_bytes = if let Some(size) = file_size_bytes.get() {
697            u64::from(size)
698        } else {
699            let size = self.object_store.size(path).await?;
700            if let Some(size) = NonZero::new(size) {
701                file_size_bytes.set(size);
702            }
703            size
704        };
705        let reader = self
706            .object_store
707            .open_with_size(path, file_size_bytes as usize)
708            .await?;
709        let block_size = self.object_store.block_size() as u64;
710        let max_iop_size = self.object_store.max_iop_size();
711        Ok(FileScheduler {
712            reader: reader.into(),
713            block_size,
714            root: self.clone(),
715            base_priority,
716            max_iop_size,
717            bypass_backpressure: false,
718            extra_stats: None,
719        })
720    }
721
722    /// Open a file with a default priority of 0
723    ///
724    /// See [`Self::open_file_with_priority`] for more information on the priority
725    pub async fn open_file(
726        self: &Arc<Self>,
727        path: &Path,
728        file_size_bytes: &CachedFileSize,
729    ) -> Result<FileScheduler> {
730        self.open_file_with_priority(path, 0, file_size_bytes).await
731    }
732
733    fn do_submit_request(
734        &self,
735        reader: Arc<dyn Reader>,
736        request: Vec<Range<u64>>,
737        tx: oneshot::Sender<Response>,
738        priority: u128,
739        io_queue: &Arc<IoQueue>,
740        bypass_backpressure: bool,
741    ) {
742        let num_iops = request.len() as u32;
743
744        let when_all_io_done = move |bytes_and_permits| {
745            // We don't care if the receiver has given up so discard the result
746            let _ = tx.send(bytes_and_permits);
747        };
748
749        let dest = Arc::new(Mutex::new(Box::new(MutableBatch::new(
750            when_all_io_done,
751            num_iops,
752            priority,
753            request.len(),
754            bypass_backpressure,
755        ))));
756
757        for (task_idx, iop) in request.into_iter().enumerate() {
758            let dest = dest.clone();
759            let io_queue_clone = io_queue.clone();
760            let num_bytes = iop.end - iop.start;
761            let task = IoTask {
762                reader: reader.clone(),
763                to_read: iop,
764                priority,
765                bypass_backpressure,
766                when_done: Box::new(move |data| {
767                    io_queue_clone.on_iop_complete();
768                    let mut dest = dest.lock().unwrap();
769                    let chunk = DataChunk {
770                        data,
771                        task_idx,
772                        num_bytes,
773                    };
774                    dest.deliver_data(chunk);
775                }),
776            };
777            io_queue.push(task);
778        }
779    }
780
781    fn submit_request_standard(
782        &self,
783        reader: Arc<dyn Reader>,
784        request: Vec<Range<u64>>,
785        priority: u128,
786        io_queue: &Arc<IoQueue>,
787        bypass_backpressure: bool,
788    ) -> impl Future<Output = Result<Vec<Bytes>>> + Send + use<> {
789        let (tx, rx) = oneshot::channel::<Response>();
790
791        self.do_submit_request(reader, request, tx, priority, io_queue, bypass_backpressure);
792
793        let io_queue_clone = io_queue.clone();
794
795        rx.map(move |wrapped_rsp| {
796            // Right now, it isn't possible for I/O to be cancelled so a cancel error should
797            // not occur
798            let rsp = wrapped_rsp.unwrap();
799            io_queue_clone.on_bytes_consumed(rsp.num_bytes, rsp.priority, rsp.num_reqs);
800            rsp.data
801        })
802    }
803
804    fn submit_request_lite(
805        &self,
806        reader: Arc<dyn Reader>,
807        request: Vec<Range<u64>>,
808        priority: u128,
809        io_queue: &Arc<lite::IoQueue>,
810        bypass_backpressure: bool,
811    ) -> impl Future<Output = Result<Vec<Bytes>>> + Send + use<> {
812        // It's important that we submit all requests _before_ we await anything
813        let maybe_tasks = request
814            .into_iter()
815            .map(|task| {
816                let reader = reader.clone();
817                let queue = io_queue.clone();
818                let run_fn = Box::new(move || {
819                    reader
820                        .get_range(task.start as usize..task.end as usize)
821                        .map_err(Error::from)
822                        .boxed()
823                });
824                queue.submit(task, priority, run_fn, bypass_backpressure)
825            })
826            .collect::<Result<Vec<_>>>();
827        match maybe_tasks {
828            Ok(tasks) => async move {
829                let mut results = Vec::with_capacity(tasks.len());
830                for task in tasks {
831                    results.push(task.await?);
832                }
833                Ok(results)
834            }
835            .boxed(),
836            Err(e) => async move { Err(e) }.boxed(),
837        }
838    }
839
840    pub fn submit_request(
841        &self,
842        reader: Arc<dyn Reader>,
843        request: Vec<Range<u64>>,
844        priority: u128,
845        bypass_backpressure: bool,
846    ) -> impl Future<Output = Result<Vec<Bytes>>> + Send + use<> {
847        match &self.io_queue {
848            IoQueueType::Standard(io_queue) => {
849                futures::future::Either::Left(self.submit_request_standard(
850                    reader,
851                    request,
852                    priority,
853                    io_queue,
854                    bypass_backpressure,
855                ))
856            }
857            IoQueueType::Lite(io_queue) => futures::future::Either::Right(
858                self.submit_request_lite(reader, request, priority, io_queue, bypass_backpressure),
859            ),
860        }
861    }
862
863    pub fn stats(&self) -> ScanStats {
864        self.stats.snapshot()
865    }
866
867    #[cfg(test)]
868    fn uses_lite_scheduler(&self) -> bool {
869        matches!(self.io_queue, IoQueueType::Lite(_))
870    }
871}
872
873impl Drop for ScanScheduler {
874    fn drop(&mut self) {
875        // If the user is dropping the ScanScheduler then they _should_ be done with I/O.  This can happen
876        // even when I/O is in progress if, for example, the user is dropping a scan mid-read because they found
877        // the data they wanted (limit after filter or some other example).
878        //
879        // Closing the I/O queue will cancel any requests that have not yet been sent to the I/O loop.  However,
880        // it will not terminate the I/O loop itself.  This is to help prevent deadlock and ensure that all I/O
881        // requests that are submitted will terminate.
882        //
883        // In theory, this isn't strictly necessary, as callers should drop any task expecting I/O before they
884        // drop the scheduler.  In practice, this can be difficult to do, and it is better to spend a little bit
885        // of time letting the I/O loop drain so that we can avoid any potential deadlocks.
886        match &self.io_queue {
887            IoQueueType::Standard(io_queue) => io_queue.close(),
888            IoQueueType::Lite(io_queue) => io_queue.close(),
889        }
890    }
891}
892
/// A throttled file reader
///
/// A per-file handle onto a shared [`ScanScheduler`].  Ranges submitted through this
/// handle are coalesced, split into IOPs, and prioritized before being handed to the
/// root scheduler.  All fields are `Arc`s or plain values, so cloning is cheap.
#[derive(Clone, Debug)]
pub struct FileScheduler {
    /// The file being read.  Reads are not issued on it directly; it is passed to
    /// `root` along with each request.
    reader: Arc<dyn Reader>,
    /// The scheduler that every request from this handle is submitted to.
    root: Arc<ScanScheduler>,
    /// Ranges separated by a gap of at most this many bytes are coalesced into one IOP.
    block_size: u64,
    /// Occupies the high 64 bits of each request's final (u128) priority.
    base_priority: u64,
    /// Coalesced intervals longer than this are split into multiple IOPs.
    max_iop_size: u64,
    /// Forwarded to the root scheduler with every request; when true, the request is
    /// submitted with backpressure bypassed.
    bypass_backpressure: bool,
    /// Optional secondary statistics sink.  When set, every request submitted
    /// through this handle is also recorded here, in addition to the
    /// scheduler's global totals.  Used to measure per-scope I/O.
    extra_stats: Option<Arc<dyn IoStatsRecorder>>,
}
907
/// Returns true when `range2` starts no more than `block_size` bytes past the end of
/// `range1`, i.e. when the two reads are close enough to be coalesced into one IOP.
///
/// Only `range2.start` is compared.  A `range2` that touches or overlaps `range1` (which
/// can happen, e.g. when decoding string arrays) therefore counts as close, and so does one
/// lying entirely before `range1`.
///
/// `saturating_add` keeps the threshold from overflowing when `range1.end` is near
/// `u64::MAX`.  Plain addition would panic there in debug builds and wrap in release
/// builds.
fn is_close_together(range1: &Range<u64>, range2: &Range<u64>, block_size: u64) -> bool {
    range2.start <= range1.end.saturating_add(block_size)
}
912
/// Returns true when the half-open ranges intersect, i.e. each one starts before the
/// other one ends.
///
/// A zero-length range only counts as overlapping when it lies strictly inside the other
/// range.  One sitting on the other's start or end boundary does not.
fn is_overlapping(range1: &Range<u64>, range2: &Range<u64>) -> bool {
    let disjoint = range1.end <= range2.start || range2.end <= range1.start;
    !disjoint
}
916
917impl FileScheduler {
918    /// Submit a batch of I/O requests to the reader
919    ///
920    /// The requests will be queued in a FIFO manner and, when all requests
921    /// have been fulfilled, the returned future will be completed.
922    ///
923    /// Each request has a given priority.  If the I/O loop is full then requests
924    /// will be buffered and requests with the *lowest* priority will be released
925    /// from the buffer first.
926    ///
927    /// Each request has a backpressure ID which controls which backpressure throttle
928    /// is applied to the request.  Requests made to the same backpressure throttle
929    /// will be throttled together.
930    pub fn submit_request(
931        &self,
932        request: Vec<Range<u64>>,
933        priority: u64,
934    ) -> impl Future<Output = Result<Vec<Bytes>>> + Send + use<> {
935        // The final priority is a combination of the row offset and the file number
936        let priority = ((self.base_priority as u128) << 64) + priority as u128;
937
938        let mut merged_requests = Vec::with_capacity(request.len());
939
940        if !request.is_empty() {
941            let mut curr_interval = request[0].clone();
942
943            for req in request.iter().skip(1) {
944                if is_close_together(&curr_interval, req, self.block_size) {
945                    curr_interval.end = curr_interval.end.max(req.end);
946                } else {
947                    merged_requests.push(curr_interval);
948                    curr_interval = req.clone();
949                }
950            }
951
952            merged_requests.push(curr_interval);
953        }
954
955        let mut updated_requests = Vec::with_capacity(merged_requests.len());
956        for req in merged_requests {
957            if req.is_empty() {
958                updated_requests.push(req);
959            } else {
960                let num_requests = (req.end - req.start).div_ceil(self.max_iop_size);
961                let bytes_per_request = (req.end - req.start) / num_requests;
962                for i in 0..num_requests {
963                    let start = req.start + i * bytes_per_request;
964                    let end = if i == num_requests - 1 {
965                        // Last request is a bit bigger due to rounding
966                        req.end
967                    } else {
968                        start + bytes_per_request
969                    };
970                    updated_requests.push(start..end);
971                }
972            }
973        }
974
975        self.root.stats.record_request(&updated_requests);
976        if let Some(extra_stats) = &self.extra_stats {
977            extra_stats.record_request(&updated_requests);
978        }
979
980        let bytes_vec_fut = self.root.submit_request(
981            self.reader.clone(),
982            updated_requests.clone(),
983            priority,
984            self.bypass_backpressure,
985        );
986
987        let mut updated_index = 0;
988        let mut final_bytes = Vec::with_capacity(request.len());
989
990        async move {
991            let bytes_vec = bytes_vec_fut.await?;
992
993            let mut orig_index = 0;
994            while (updated_index < updated_requests.len()) && (orig_index < request.len()) {
995                let updated_range = &updated_requests[updated_index];
996                let orig_range = &request[orig_index];
997                let byte_offset = updated_range.start as usize;
998
999                if is_overlapping(updated_range, orig_range) {
1000                    // We need to undo the coalescing and splitting done earlier
1001                    let start = orig_range.start as usize - byte_offset;
1002                    if orig_range.end <= updated_range.end {
1003                        // The original range is fully contained in the updated range, can do
1004                        // zero-copy slice
1005                        let end = orig_range.end as usize - byte_offset;
1006                        final_bytes.push(bytes_vec[updated_index].slice(start..end));
1007                    } else {
1008                        // The original read was split into multiple requests, need to copy
1009                        // back into a single buffer
1010                        let orig_size = orig_range.end - orig_range.start;
1011                        let mut merged_bytes = Vec::with_capacity(orig_size as usize);
1012                        merged_bytes.extend_from_slice(&bytes_vec[updated_index].slice(start..));
1013                        let mut copy_offset = merged_bytes.len() as u64;
1014                        while copy_offset < orig_size {
1015                            updated_index += 1;
1016                            let next_range = &updated_requests[updated_index];
1017                            let bytes_to_take =
1018                                (orig_size - copy_offset).min(next_range.end - next_range.start);
1019                            merged_bytes.extend_from_slice(
1020                                &bytes_vec[updated_index].slice(0..bytes_to_take as usize),
1021                            );
1022                            copy_offset += bytes_to_take;
1023                        }
1024                        final_bytes.push(Bytes::from(merged_bytes));
1025                    }
1026                    orig_index += 1;
1027                } else {
1028                    updated_index += 1;
1029                }
1030            }
1031
1032            Ok(final_bytes)
1033        }
1034    }
1035
1036    pub fn with_priority(&self, priority: u64) -> Self {
1037        Self {
1038            reader: self.reader.clone(),
1039            root: self.root.clone(),
1040            block_size: self.block_size,
1041            max_iop_size: self.max_iop_size,
1042            base_priority: priority,
1043            bypass_backpressure: self.bypass_backpressure,
1044            extra_stats: self.extra_stats.clone(),
1045        }
1046    }
1047
1048    /// Returns a copy of this scheduler that additionally records the I/O it
1049    /// performs into `stats`, on top of the scheduler's global statistics.
1050    ///
1051    /// This is the mechanism for measuring exact per-scope (e.g. per-query) I/O:
1052    /// attach a recorder here (e.g. via [`IoStats::recorder`]), perform the reads
1053    /// through the returned handle, then read the totals back with
1054    /// [`IoStats::snapshot`].  The returned handle is cheap to create (a few
1055    /// `Arc` clones) and reuses the same underlying reader, so it does not
1056    /// re-open the file.
1057    pub fn with_io_stats(&self, stats: Arc<dyn IoStatsRecorder>) -> Self {
1058        Self {
1059            extra_stats: Some(stats),
1060            ..self.clone()
1061        }
1062    }
1063
1064    /// Returns a copy of this scheduler that bypasses backpressure for all requests.
1065    ///
1066    /// This should be used for indirect I/O (e.g. fetching items after decoding offsets) where
1067    /// blocking on backpressure could cause a deadlock or excessive latency.
1068    pub fn with_bypass_backpressure(&self) -> Self {
1069        Self {
1070            bypass_backpressure: true,
1071            ..self.clone()
1072        }
1073    }
1074
1075    /// Submit a single IOP to the reader
1076    ///
1077    /// If you have multiple IOPS to perform then [`Self::submit_request`] is going
1078    /// to be more efficient.
1079    ///
1080    /// See [`Self::submit_request`] for more information on the priority and backpressure.
1081    pub fn submit_single(
1082        &self,
1083        range: Range<u64>,
1084        priority: u64,
1085    ) -> impl Future<Output = Result<Bytes>> + Send {
1086        self.submit_request(vec![range], priority)
1087            .map_ok(|vec_bytes| vec_bytes.into_iter().next().unwrap())
1088    }
1089
1090    /// Provides access to the underlying reader
1091    ///
1092    /// Do not use this for reading data as it will bypass any I/O scheduling!
1093    /// This is mainly exposed to allow metadata operations (e.g size, block_size,)
1094    /// which either aren't IOPS or we don't throttle
1095    pub fn reader(&self) -> &Arc<dyn Reader> {
1096        &self.reader
1097    }
1098}
1099
1100#[cfg(test)]
1101mod tests {
1102    use std::{collections::VecDeque, time::Duration};
1103
1104    use futures::poll;
1105    use lance_core::utils::tempfile::TempObjFile;
1106    use rand::RngCore;
1107
1108    use object_store::{GetRange, ObjectStore as OSObjectStore, ObjectStoreExt, memory::InMemory};
1109    use tokio::{runtime::Handle, time::timeout};
1110    use url::Url;
1111
1112    use crate::{
1113        object_store::{DEFAULT_DOWNLOAD_RETRY_COUNT, DEFAULT_MAX_IOP_SIZE},
1114        testing::MockObjectStore,
1115    };
1116
1117    use super::*;
1118
1119    fn make_task(priority: u128, bypass_backpressure: bool) -> IoTask {
1120        IoTask {
1121            reader: Arc::new(TrackingReader {
1122                get_range_count: Arc::new(AtomicU64::new(0)),
1123                path: Path::parse("test").unwrap(),
1124            }),
1125            to_read: 0..1,
1126            when_done: Box::new(|_| {}),
1127            priority,
1128            bypass_backpressure,
1129        }
1130    }
1131
1132    #[test]
1133    fn test_iotask_ordering() {
1134        // Bypass tasks must come out of the heap before non-bypass tasks.
1135        // Within each group, lower priority number (= higher priority) comes first.
1136        let mut heap = BinaryHeap::new();
1137        heap.push(make_task(10, false)); // non-bypass, low priority
1138        heap.push(make_task(1, false)); // non-bypass, high priority
1139        heap.push(make_task(20, true)); // bypass, low priority
1140        heap.push(make_task(5, true)); // bypass, high priority
1141
1142        let order: Vec<(u128, bool)> = std::iter::from_fn(|| heap.pop())
1143            .map(|t| (t.priority, t.bypass_backpressure))
1144            .collect();
1145
1146        assert_eq!(order, vec![(5, true), (20, true), (1, false), (10, false)]);
1147    }
1148
1149    #[tokio::test]
1150    async fn test_full_seq_read() {
1151        let tmp_file = TempObjFile::default();
1152
1153        let obj_store = Arc::new(ObjectStore::local());
1154
1155        // Write 1MiB of data
1156        const DATA_SIZE: u64 = 1024 * 1024;
1157        let mut some_data = vec![0; DATA_SIZE as usize];
1158        rand::rng().fill_bytes(&mut some_data);
1159        obj_store.put(&tmp_file, &some_data).await.unwrap();
1160
1161        let config = SchedulerConfig::default_for_testing();
1162
1163        let scheduler = ScanScheduler::new(obj_store, config);
1164
1165        let file_scheduler = scheduler
1166            .open_file(&tmp_file, &CachedFileSize::unknown())
1167            .await
1168            .unwrap();
1169
1170        // Read it back 4KiB at a time
1171        const READ_SIZE: u64 = 4 * 1024;
1172        let mut reqs = VecDeque::new();
1173        let mut offset = 0;
1174        while offset < DATA_SIZE {
1175            reqs.push_back(
1176                #[allow(clippy::single_range_in_vec_init)]
1177                file_scheduler
1178                    .submit_request(vec![offset..offset + READ_SIZE], 0)
1179                    .await
1180                    .unwrap(),
1181            );
1182            offset += READ_SIZE;
1183        }
1184
1185        offset = 0;
1186        // Note: we should get parallel I/O even though we are consuming serially
1187        while offset < DATA_SIZE {
1188            let data = reqs.pop_front().unwrap();
1189            let actual = &data[0];
1190            let expected = &some_data[offset as usize..(offset + READ_SIZE) as usize];
1191            assert_eq!(expected, actual);
1192            offset += READ_SIZE;
1193        }
1194    }
1195
1196    #[tokio::test]
1197    async fn test_split_coalesce() {
1198        let tmp_file = TempObjFile::default();
1199
1200        let obj_store = Arc::new(ObjectStore::local());
1201
1202        // Write 75MiB of data
1203        const DATA_SIZE: u64 = 75 * 1024 * 1024;
1204        let mut some_data = vec![0; DATA_SIZE as usize];
1205        rand::rng().fill_bytes(&mut some_data);
1206        obj_store.put(&tmp_file, &some_data).await.unwrap();
1207
1208        let config = SchedulerConfig::default_for_testing();
1209
1210        let scheduler = ScanScheduler::new(obj_store, config);
1211
1212        let file_scheduler = scheduler
1213            .open_file(&tmp_file, &CachedFileSize::unknown())
1214            .await
1215            .unwrap();
1216
1217        // These 3 requests should be coalesced into a single I/O because they are within 4KiB
1218        // of each other
1219        let req =
1220            file_scheduler.submit_request(vec![50_000..51_000, 52_000..53_000, 54_000..55_000], 0);
1221
1222        let bytes = req.await.unwrap();
1223
1224        assert_eq!(bytes[0], &some_data[50_000..51_000]);
1225        assert_eq!(bytes[1], &some_data[52_000..53_000]);
1226        assert_eq!(bytes[2], &some_data[54_000..55_000]);
1227
1228        assert_eq!(1, scheduler.stats().iops);
1229
1230        // This should be split into 5 requests because it is so large
1231        let req = file_scheduler.submit_request(vec![0..DATA_SIZE], 0);
1232        let bytes = req.await.unwrap();
1233        assert!(bytes[0] == some_data, "data is not the same");
1234
1235        assert_eq!(6, scheduler.stats().iops);
1236
1237        // None of these requests are bigger than the max IOP size but they will be coalesced into
1238        // one IOP that is bigger and then split back into 2 requests that don't quite align with the original
1239        // ranges.
1240        let chunk_size = *DEFAULT_MAX_IOP_SIZE;
1241        let req = file_scheduler.submit_request(
1242            vec![
1243                10..chunk_size,
1244                chunk_size + 10..(chunk_size * 2) - 20,
1245                chunk_size * 2..(chunk_size * 2) + 10,
1246            ],
1247            0,
1248        );
1249
1250        let bytes = req.await.unwrap();
1251        let chunk_size = chunk_size as usize;
1252        assert!(
1253            bytes[0] == some_data[10..chunk_size],
1254            "data is not the same"
1255        );
1256        assert!(
1257            bytes[1] == some_data[chunk_size + 10..(chunk_size * 2) - 20],
1258            "data is not the same"
1259        );
1260        assert!(
1261            bytes[2] == some_data[chunk_size * 2..(chunk_size * 2) + 10],
1262            "data is not the same"
1263        );
1264        assert_eq!(8, scheduler.stats().iops);
1265
1266        let reads = (0..44)
1267            .map(|i| i * 1_000_000..(i + 1) * 1_000_000)
1268            .collect::<Vec<_>>();
1269        let req = file_scheduler.submit_request(reads, 0);
1270        let bytes = req.await.unwrap();
1271        for (i, bytes) in bytes.iter().enumerate() {
1272            assert!(
1273                bytes == &some_data[i * 1_000_000..(i + 1) * 1_000_000],
1274                "data is not the same"
1275            );
1276        }
1277        assert_eq!(11, scheduler.stats().iops);
1278    }
1279
1280    #[tokio::test]
1281    async fn test_io_stats_sink() {
1282        let tmp_file = TempObjFile::default();
1283        let obj_store = Arc::new(ObjectStore::local());
1284
1285        const DATA_SIZE: u64 = 1024 * 1024;
1286        let mut some_data = vec![0; DATA_SIZE as usize];
1287        rand::rng().fill_bytes(&mut some_data);
1288        obj_store.put(&tmp_file, &some_data).await.unwrap();
1289
1290        let scheduler = ScanScheduler::new(obj_store, SchedulerConfig::default_for_testing());
1291
1292        // Attach a per-scope sink to one file handle.
1293        let sink = IoStats::new();
1294        let file_scheduler = scheduler
1295            .open_file(&tmp_file, &CachedFileSize::unknown())
1296            .await
1297            .unwrap()
1298            .with_io_stats(sink.recorder());
1299
1300        // Three reads within 4KiB coalesce into a single physical IOP.  The sink
1301        // and the scheduler's global totals must agree exactly, because both are
1302        // recorded from the same post-coalescing request.
1303        file_scheduler
1304            .submit_request(vec![50_000..51_000, 52_000..53_000, 54_000..55_000], 0)
1305            .await
1306            .unwrap();
1307
1308        let global = scheduler.stats();
1309        let scoped = sink.snapshot();
1310        assert_eq!(1, scoped.iops);
1311        assert_eq!(1, scoped.requests);
1312        // Coalesced range 50_000..55_000 => 5000 physical bytes.
1313        assert_eq!(5000, scoped.bytes_read);
1314        assert_eq!(global.iops, scoped.iops);
1315        assert_eq!(global.requests, scoped.requests);
1316        assert_eq!(global.bytes_read, scoped.bytes_read);
1317
1318        // A sibling handle without the sink: the global totals advance but the
1319        // sink stays put, proving per-scope isolation.
1320        let other = scheduler
1321            .open_file(&tmp_file, &CachedFileSize::unknown())
1322            .await
1323            .unwrap();
1324        other.submit_request(vec![0..1000], 0).await.unwrap();
1325
1326        let global_after = scheduler.stats();
1327        let scoped_after = sink.snapshot();
1328        assert_eq!(global.bytes_read + 1000, global_after.bytes_read);
1329        assert_eq!(scoped.bytes_read, scoped_after.bytes_read);
1330        assert_eq!(scoped.iops, scoped_after.iops);
1331    }
1332
1333    #[tokio::test]
1334    async fn test_priority() {
1335        let some_path = Path::parse("foo").unwrap();
1336        let base_store = Arc::new(InMemory::new());
1337        base_store
1338            .put(&some_path, vec![0; 1000].into())
1339            .await
1340            .unwrap();
1341
1342        let semaphore = Arc::new(tokio::sync::Semaphore::new(0));
1343        let mut obj_store = MockObjectStore::default();
1344        let semaphore_copy = semaphore.clone();
1345        obj_store
1346            .expect_get_opts()
1347            .returning(move |location, options| {
1348                let semaphore = semaphore.clone();
1349                let base_store = base_store.clone();
1350                let location = location.clone();
1351                async move {
1352                    semaphore.acquire().await.unwrap().forget();
1353                    base_store.get_opts(&location, options).await
1354                }
1355                .boxed()
1356            });
1357        let obj_store = Arc::new(ObjectStore::new(
1358            Arc::new(obj_store),
1359            Url::parse("mem://").unwrap(),
1360            Some(500),
1361            None,
1362            false,
1363            false,
1364            1,
1365            DEFAULT_DOWNLOAD_RETRY_COUNT,
1366            None,
1367        ));
1368
1369        let config = SchedulerConfig {
1370            io_buffer_size_bytes: 1024 * 1024,
1371            use_lite_scheduler: None,
1372        };
1373
1374        let scan_scheduler = ScanScheduler::new(obj_store, config);
1375
1376        let file_scheduler = scan_scheduler
1377            .open_file(&Path::parse("foo").unwrap(), &CachedFileSize::new(1000))
1378            .await
1379            .unwrap();
1380
1381        // Issue a request, priority doesn't matter, it will be submitted
1382        // immediately (it will go pending)
1383        // Note: the timeout is to prevent a deadlock if the test fails.
1384        let first_fut = timeout(
1385            Duration::from_secs(10),
1386            file_scheduler.submit_single(0..10, 0),
1387        )
1388        .boxed();
1389
1390        // Issue another low priority request (it will go in queue)
1391        let mut second_fut = timeout(
1392            Duration::from_secs(10),
1393            file_scheduler.submit_single(0..20, 100),
1394        )
1395        .boxed();
1396
1397        // Issue a high priority request (it will go in queue and should bump
1398        // the other queued request down)
1399        let mut third_fut = timeout(
1400            Duration::from_secs(10),
1401            file_scheduler.submit_single(0..30, 0),
1402        )
1403        .boxed();
1404
1405        // Finish one file, should be the in-flight first request
1406        semaphore_copy.add_permits(1);
1407        assert!(first_fut.await.unwrap().unwrap().len() == 10);
1408        // Other requests should not be finished
1409        assert!(poll!(&mut second_fut).is_pending());
1410        assert!(poll!(&mut third_fut).is_pending());
1411
1412        // Next should be high priority request
1413        semaphore_copy.add_permits(1);
1414        assert!(third_fut.await.unwrap().unwrap().len() == 30);
1415        assert!(poll!(&mut second_fut).is_pending());
1416
1417        // Finally, the low priority request
1418        semaphore_copy.add_permits(1);
1419        assert!(second_fut.await.unwrap().unwrap().len() == 20);
1420    }
1421
1422    #[tokio::test(flavor = "multi_thread")]
1423    async fn test_backpressure() {
1424        let some_path = Path::parse("foo").unwrap();
1425        let base_store = Arc::new(InMemory::new());
1426        base_store
1427            .put(&some_path, vec![0; 100000].into())
1428            .await
1429            .unwrap();
1430
1431        let bytes_read = Arc::new(AtomicU64::from(0));
1432        let mut obj_store = MockObjectStore::default();
1433        let bytes_read_copy = bytes_read.clone();
1434        // Wraps the obj_store to keep track of how many bytes have been read
1435        obj_store
1436            .expect_get_opts()
1437            .returning(move |location, options| {
1438                let range = options.range.as_ref().unwrap();
1439                let num_bytes = match range {
1440                    GetRange::Bounded(bounded) => bounded.end - bounded.start,
1441                    _ => panic!(),
1442                };
1443                bytes_read_copy.fetch_add(num_bytes, Ordering::Release);
1444                let location = location.clone();
1445                let base_store = base_store.clone();
1446                async move { base_store.get_opts(&location, options).await }.boxed()
1447            });
1448        let obj_store = Arc::new(ObjectStore::new(
1449            Arc::new(obj_store),
1450            Url::parse("mem://").unwrap(),
1451            Some(500),
1452            None,
1453            false,
1454            false,
1455            1,
1456            DEFAULT_DOWNLOAD_RETRY_COUNT,
1457            None,
1458        ));
1459
1460        let config = SchedulerConfig {
1461            io_buffer_size_bytes: 10,
1462            use_lite_scheduler: None,
1463        };
1464
1465        let scan_scheduler = ScanScheduler::new(obj_store.clone(), config);
1466
1467        let file_scheduler = scan_scheduler
1468            .open_file(&Path::parse("foo").unwrap(), &CachedFileSize::new(100000))
1469            .await
1470            .unwrap();
1471
1472        let wait_for_idle = || async move {
1473            let handle = Handle::current();
1474            while handle.metrics().num_alive_tasks() != 1 {
1475                tokio::time::sleep(Duration::from_millis(10)).await;
1476            }
1477        };
1478        let wait_for_bytes_read_and_idle = |target_bytes: u64| {
1479            // We need to move `target` but don't want to move `bytes_read`
1480            let bytes_read = &bytes_read;
1481            async move {
1482                let bytes_read_copy = bytes_read.clone();
1483                while bytes_read_copy.load(Ordering::Acquire) < target_bytes {
1484                    tokio::time::sleep(Duration::from_millis(10)).await;
1485                }
1486                wait_for_idle().await;
1487            }
1488        };
1489
1490        // This read will begin immediately
1491        let first_fut = file_scheduler.submit_single(0..5, 0);
1492        // This read should also begin immediately
1493        let second_fut = file_scheduler.submit_single(0..5, 0);
1494        // This read will be throttled
1495        let third_fut = file_scheduler.submit_single(0..3, 0);
1496        // Two tasks (third_fut and unit test)
1497        wait_for_bytes_read_and_idle(10).await;
1498
1499        assert_eq!(first_fut.await.unwrap().len(), 5);
1500        // One task (unit test)
1501        wait_for_bytes_read_and_idle(13).await;
1502
1503        // 2 bytes are ready but 5 bytes requested, read will be blocked
1504        let fourth_fut = file_scheduler.submit_single(0..5, 0);
1505        wait_for_bytes_read_and_idle(13).await;
1506
1507        // Out of order completion is ok, will unblock backpressure
1508        assert_eq!(third_fut.await.unwrap().len(), 3);
1509        wait_for_bytes_read_and_idle(18).await;
1510
1511        assert_eq!(second_fut.await.unwrap().len(), 5);
1512        // At this point there are 5 bytes available in backpressure queue
1513        // Now we issue multi-read that can be partially fulfilled, it will read some bytes but
1514        // not all of them. (using large range gap to ensure request not coalesced)
1515        //
1516        // I'm actually not sure this behavior is great.  It's possible that we should just
1517        // block until we can fulfill the entire request.
1518        let fifth_fut = file_scheduler.submit_request(vec![0..3, 90000..90007], 0);
1519        wait_for_bytes_read_and_idle(21).await;
1520
1521        // Fifth future should eventually finish due to deadlock prevention
1522        let fifth_bytes = tokio::time::timeout(Duration::from_secs(10), fifth_fut)
1523            .await
1524            .unwrap();
1525        assert_eq!(
1526            fifth_bytes.unwrap().iter().map(|b| b.len()).sum::<usize>(),
1527            10
1528        );
1529
1530        // And now let's just make sure that we can read the rest of the data
1531        assert_eq!(fourth_fut.await.unwrap().len(), 5);
1532        wait_for_bytes_read_and_idle(28).await;
1533
1534        // Ensure deadlock prevention timeout can be disabled
1535        let config = SchedulerConfig {
1536            io_buffer_size_bytes: 10,
1537            use_lite_scheduler: None,
1538        };
1539
1540        let scan_scheduler = ScanScheduler::new(obj_store, config);
1541        let file_scheduler = scan_scheduler
1542            .open_file(&Path::parse("foo").unwrap(), &CachedFileSize::new(100000))
1543            .await
1544            .unwrap();
1545
1546        let first_fut = file_scheduler.submit_single(0..10, 0);
1547        let second_fut = file_scheduler.submit_single(0..10, 0);
1548
1549        std::thread::sleep(Duration::from_millis(100));
1550        assert_eq!(first_fut.await.unwrap().len(), 10);
1551        assert_eq!(second_fut.await.unwrap().len(), 10);
1552    }
1553
1554    /// A Reader that tracks how many times get_range has been called.
1555    #[derive(Debug)]
1556    struct TrackingReader {
1557        get_range_count: Arc<AtomicU64>,
1558        path: Path,
1559    }
1560
1561    impl lance_core::deepsize::DeepSizeOf for TrackingReader {
1562        fn deep_size_of_children(&self, _context: &mut lance_core::deepsize::Context) -> usize {
1563            0
1564        }
1565    }
1566
1567    impl Reader for TrackingReader {
1568        fn path(&self) -> &Path {
1569            &self.path
1570        }
1571
1572        fn block_size(&self) -> usize {
1573            4096
1574        }
1575
1576        fn io_parallelism(&self) -> usize {
1577            1
1578        }
1579
1580        fn size(&self) -> futures::future::BoxFuture<'_, object_store::Result<usize>> {
1581            Box::pin(async { Ok(1_000_000) })
1582        }
1583
1584        fn get_range(
1585            &self,
1586            range: Range<usize>,
1587        ) -> futures::future::BoxFuture<'static, object_store::Result<Bytes>> {
1588            self.get_range_count.fetch_add(1, Ordering::Release);
1589            let num_bytes = range.end - range.start;
1590            Box::pin(async move { Ok(Bytes::from(vec![0u8; num_bytes])) })
1591        }
1592
1593        fn get_all(&self) -> futures::future::BoxFuture<'_, object_store::Result<Bytes>> {
1594            Box::pin(async { Ok(Bytes::from(vec![0u8; 1_000_000])) })
1595        }
1596    }
1597
1598    #[tokio::test]
1599    async fn test_lite_scheduler_submits_eagerly() {
1600        let obj_store = Arc::new(ObjectStore::memory());
1601        let config = SchedulerConfig::default_for_testing().with_lite_scheduler();
1602        let scheduler = ScanScheduler::new(obj_store, config);
1603
1604        let get_range_count = Arc::new(AtomicU64::new(0));
1605        let reader: Arc<dyn Reader> = Arc::new(TrackingReader {
1606            get_range_count: get_range_count.clone(),
1607            path: Path::parse("test").unwrap(),
1608        });
1609
1610        // Submit several requests. The lite scheduler should call get_range
1611        // eagerly during submit (before the returned future is polled).
1612        let fut1 = scheduler.submit_request(reader.clone(), vec![0..100], 0, false);
1613        let fut2 = scheduler.submit_request(reader.clone(), vec![100..200], 10, false);
1614        let fut3 = scheduler.submit_request(reader.clone(), vec![200..300], 20, false);
1615
1616        // get_range must have been called for all 3 requests already.
1617        assert_eq!(get_range_count.load(Ordering::Acquire), 3);
1618
1619        // The futures should still resolve with the correct data.
1620        assert_eq!(fut1.await.unwrap()[0].len(), 100);
1621        assert_eq!(fut2.await.unwrap()[0].len(), 100);
1622        assert_eq!(fut3.await.unwrap()[0].len(), 100);
1623    }
1624
1625    #[tokio::test]
1626    async fn test_object_store_selects_scheduler() {
1627        // A memory:// store should use the standard scheduler when config is None
1628        let memory_store = Arc::new(ObjectStore::memory());
1629        assert!(!memory_store.prefers_lite_scheduler());
1630        let config = SchedulerConfig {
1631            io_buffer_size_bytes: 256 * 1024 * 1024,
1632            use_lite_scheduler: None,
1633        };
1634        let scheduler = ScanScheduler::new(memory_store.clone(), config);
1635        assert!(!scheduler.uses_lite_scheduler());
1636
1637        // A file+uring:// store should use the lite scheduler when config is None
1638        let uring_store = Arc::new(ObjectStore::new(
1639            Arc::new(InMemory::new()),
1640            Url::parse("file+uring:///tmp").unwrap(),
1641            None,
1642            None,
1643            false,
1644            false,
1645            8,
1646            DEFAULT_DOWNLOAD_RETRY_COUNT,
1647            None,
1648        ));
1649        assert!(uring_store.prefers_lite_scheduler());
1650        let config = SchedulerConfig {
1651            io_buffer_size_bytes: 256 * 1024 * 1024,
1652            use_lite_scheduler: None,
1653        };
1654        let scheduler = ScanScheduler::new(uring_store.clone(), config);
1655        assert!(scheduler.uses_lite_scheduler());
1656
1657        // Explicit Some(false) overrides a file+uring:// store's preference
1658        let config = SchedulerConfig {
1659            io_buffer_size_bytes: 256 * 1024 * 1024,
1660            use_lite_scheduler: Some(false),
1661        };
1662        let scheduler = ScanScheduler::new(uring_store, config);
1663        assert!(!scheduler.uses_lite_scheduler());
1664
1665        // Explicit Some(true) overrides a memory:// store's preference
1666        let config = SchedulerConfig {
1667            io_buffer_size_bytes: 256 * 1024 * 1024,
1668            use_lite_scheduler: Some(true),
1669        };
1670        let scheduler = ScanScheduler::new(memory_store, config);
1671        assert!(scheduler.uses_lite_scheduler());
1672    }
1673
1674    #[test_log::test(tokio::test(flavor = "multi_thread"))]
1675    async fn stress_backpressure() {
1676        // This test ensures that the backpressure mechanism works correctly with
1677        // regards to priority.  In other words, as long as all requests are consumed
1678        // in priority order then the backpressure mechanism should not deadlock
1679        let some_path = Path::parse("foo").unwrap();
1680        let obj_store = Arc::new(ObjectStore::memory());
1681        obj_store
1682            .put(&some_path, vec![0; 100000].as_slice())
1683            .await
1684            .unwrap();
1685
1686        // Only one request will be allowed in
1687        let config = SchedulerConfig {
1688            io_buffer_size_bytes: 1,
1689            use_lite_scheduler: None,
1690        };
1691        let scan_scheduler = ScanScheduler::new(obj_store.clone(), config);
1692        let file_scheduler = scan_scheduler
1693            .open_file(&some_path, &CachedFileSize::unknown())
1694            .await
1695            .unwrap();
1696
1697        let mut futs = Vec::with_capacity(10000);
1698        for idx in 0..10000 {
1699            futs.push(file_scheduler.submit_single(idx..idx + 1, idx));
1700        }
1701
1702        for fut in futs {
1703            fut.await.unwrap();
1704        }
1705    }
1706
1707    #[tokio::test(flavor = "multi_thread")]
1708    async fn test_zero_buffer_size_no_backpressure() {
1709        // With io_buffer_size_bytes=0 (no_backpressure=true), reads at any priority go
1710        // through without blocking, even though a zero budget would normally halt all I/O.
1711        let obj_store = Arc::new(ObjectStore::memory());
1712        let config = SchedulerConfig {
1713            io_buffer_size_bytes: 0,
1714            use_lite_scheduler: Some(false),
1715        };
1716        let scheduler = ScanScheduler::new(obj_store, config);
1717
1718        let get_range_count = Arc::new(AtomicU64::new(0));
1719        let reader: Arc<dyn Reader> = Arc::new(TrackingReader {
1720            get_range_count: get_range_count.clone(),
1721            path: Path::parse("test").unwrap(),
1722        });
1723
1724        // Submit three reads at increasing priorities without awaiting any first.
1725        // Priority 1 and 2 would deadlock under a real 0-byte budget without no_backpressure.
1726        let fut1 = scheduler.submit_request(reader.clone(), vec![0..1000], 0, false);
1727        let fut2 = scheduler.submit_request(reader.clone(), vec![1000..2000], 1, false);
1728        let fut3 = scheduler.submit_request(reader.clone(), vec![2000..3000], 2, false);
1729
1730        let bytes1 = timeout(Duration::from_secs(5), fut1)
1731            .await
1732            .unwrap()
1733            .unwrap();
1734        let bytes2 = timeout(Duration::from_secs(5), fut2)
1735            .await
1736            .unwrap()
1737            .unwrap();
1738        let bytes3 = timeout(Duration::from_secs(5), fut3)
1739            .await
1740            .unwrap()
1741            .unwrap();
1742        assert_eq!(bytes1[0].len(), 1000);
1743        assert_eq!(bytes2[0].len(), 1000);
1744        assert_eq!(bytes3[0].len(), 1000);
1745        assert_eq!(get_range_count.load(Ordering::Acquire), 3);
1746    }
1747
1748    #[tokio::test(flavor = "multi_thread")]
1749    async fn test_file_scheduler_bypass_backpressure() {
1750        // A FileScheduler obtained via with_bypass_backpressure() submits reads that bypass
1751        // the byte budget, allowing them to proceed even when the budget is exhausted.
1752        let some_path = Path::parse("foo").unwrap();
1753        let base_store = Arc::new(InMemory::new());
1754        base_store
1755            .put(&some_path, vec![0u8; 1000].into())
1756            .await
1757            .unwrap();
1758
1759        let bytes_dispatched = Arc::new(AtomicU64::from(0));
1760        let mut obj_store = MockObjectStore::default();
1761        let bytes_dispatched_copy = bytes_dispatched.clone();
1762        obj_store
1763            .expect_get_opts()
1764            .returning(move |location, options| {
1765                let range = options.range.as_ref().unwrap();
1766                let num_bytes = match range {
1767                    GetRange::Bounded(bounded) => bounded.end - bounded.start,
1768                    _ => panic!(),
1769                };
1770                bytes_dispatched_copy.fetch_add(num_bytes, Ordering::Release);
1771                let location = location.clone();
1772                let base_store = base_store.clone();
1773                async move { base_store.get_opts(&location, options).await }.boxed()
1774            });
1775        let obj_store = Arc::new(ObjectStore::new(
1776            Arc::new(obj_store),
1777            Url::parse("mem://").unwrap(),
1778            Some(500),
1779            None,
1780            false,
1781            false,
1782            1,
1783            DEFAULT_DOWNLOAD_RETRY_COUNT,
1784            None,
1785        ));
1786
1787        // Budget = 10 bytes.
1788        let config = SchedulerConfig {
1789            io_buffer_size_bytes: 10,
1790            use_lite_scheduler: Some(false),
1791        };
1792        let scan_scheduler = ScanScheduler::new(obj_store, config);
1793        let file_scheduler = scan_scheduler
1794            .open_file(&Path::parse("foo").unwrap(), &CachedFileSize::new(1000))
1795            .await
1796            .unwrap();
1797        let bypass_scheduler = file_scheduler.with_bypass_backpressure();
1798
1799        // Fill the 10-byte budget with a priority-0 read.
1800        let blocker_fut = file_scheduler.submit_single(0..10, 0);
1801        while bytes_dispatched.load(Ordering::Acquire) < 10 {
1802            tokio::time::sleep(Duration::from_millis(1)).await;
1803        }
1804
1805        // A normal read at priority 2 is blocked: budget = 0, priority 2 > min-in-flight 0.
1806        // A bypass read at priority 1 (higher priority in the queue) bypasses the budget check.
1807        let normal_fut = file_scheduler.submit_single(0..10, 2);
1808        let bypass_fut = bypass_scheduler.submit_single(0..10, 1);
1809
1810        // Bypass read is dispatched; normal read is still blocked.
1811        while bytes_dispatched.load(Ordering::Acquire) < 20 {
1812            tokio::time::sleep(Duration::from_millis(1)).await;
1813        }
1814        tokio::time::sleep(Duration::from_millis(20)).await;
1815        assert_eq!(
1816            bytes_dispatched.load(Ordering::Acquire),
1817            20,
1818            "normal read should still be blocked while budget is exhausted"
1819        );
1820
1821        // Consuming the blocker releases its 10-byte budget → normal read can proceed.
1822        timeout(Duration::from_secs(5), blocker_fut)
1823            .await
1824            .unwrap()
1825            .unwrap();
1826        timeout(Duration::from_secs(5), bypass_fut)
1827            .await
1828            .unwrap()
1829            .unwrap();
1830        timeout(Duration::from_secs(5), normal_fut)
1831            .await
1832            .unwrap()
1833            .unwrap();
1834        assert_eq!(bytes_dispatched.load(Ordering::Acquire), 30);
1835    }
1836}