lance_io/scheduler.rs

// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

use bytes::Bytes;
use futures::channel::oneshot;
use futures::{FutureExt, TryFutureExt};
use object_store::path::Path;
use snafu::location;
use std::collections::BinaryHeap;
use std::fmt::Debug;
use std::future::Future;
use std::num::NonZero;
use std::ops::Range;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Instant;
use tokio::sync::{Notify, Semaphore, SemaphorePermit};

use lance_core::{Error, Result};

use crate::object_store::ObjectStore;
use crate::traits::Reader;
use crate::utils::CachedFileSize;

// Don't log backpressure warnings until at least this many seconds have passed
const BACKPRESSURE_MIN: u64 = 5;
// Don't log backpressure warnings more than once / minute
const BACKPRESSURE_DEBOUNCE: u64 = 60;

// Global counter of how many IOPS we have issued
static IOPS_COUNTER: AtomicU64 = AtomicU64::new(0);
// Global counter of how many bytes were read by the scheduler
static BYTES_READ_COUNTER: AtomicU64 = AtomicU64::new(0);
// By default, we limit the number of IOPS across the entire process to 128
//
// In theory this is enough for ~10GBps on S3 following the guidelines to issue
// 1 IOP per 80MBps.  In practice, I have noticed slightly better performance going
// up to 256.
//
// However, non-S3 stores (e.g. GCS, Azure) can suffer significantly from too many
// concurrent IOPS.  For safety, we set the default to 128 and let the user override
// this if needed.
//
// Note: this only limits things that run through the scheduler.  It does not limit
// IOPS from other sources like writing or commits.
static DEFAULT_PROCESS_IOPS_LIMIT: i32 = 128;
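
// For example (hedged sketch), the cap can be raised or disabled before any
// scheduler is created by setting the environment variable read in
// `IopsQuota::new`; the values below are illustrative, not recommendations:
//
//     std::env::set_var("LANCE_PROCESS_IO_THREADS_LIMIT", "256"); // raise the cap
//     std::env::set_var("LANCE_PROCESS_IO_THREADS_LIMIT", "0");   // disable the cap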

pub fn iops_counter() -> u64 {
    IOPS_COUNTER.load(Ordering::Acquire)
}

pub fn bytes_read_counter() -> u64 {
    BYTES_READ_COUNTER.load(Ordering::Acquire)
}

// There are two structures that control the I/O scheduler concurrency.  First,
// we have a hard limit on the number of IOPS that can be issued concurrently.
// This limit is process-wide.
//
// Second, we try and limit how many I/O requests can be buffered in memory without
// being consumed by a decoder of some kind.  This limit is per-scheduler.  We cannot
// make this limit process-wide without introducing deadlock (the decoder for file 0
// might be waiting on IOPS blocked by a queue filled with requests for file 1, and
// vice versa).
//
// There is also a per-scan limit on the number of IOPS that can be issued concurrently.
//
// The process-wide limit exists for users who need a hard cap on the number of parallel
// IOPS, e.g. due to port availability limits or to prevent multiple scans from saturating
// the network.  (Note: a process-wide limit of X will not necessarily limit the number of
// open TCP connections to exactly X.  The underlying object store may open more connections
// anyways)
//
// However, it can be too restrictive in some cases, e.g. when some scans are reading from
// cloud storage and other scans are reading from local disk.  In these cases users don't
// need to set a process-wide limit and can rely on the per-scan limits.

// The IopsQuota enforces the first of the above limits; it is the per-process hard cap
// on the number of IOPS that can be issued concurrently.
//
// The per-scan limits are enforced by IoQueue
struct IopsQuota {
    // An Option is used here to avoid mutex overhead if no limit is set
    iops_avail: Option<Semaphore>,
}

/// A reservation on the global IOPS quota
///
/// When the reservation is dropped, the IOPS quota is released unless
/// [`Self::forget`] is called.
struct IopsReservation<'a> {
    value: Option<SemaphorePermit<'a>>,
}

impl IopsReservation<'_> {
    // Forget the reservation, so it won't be released on drop
    fn forget(&mut self) {
        if let Some(value) = self.value.take() {
            value.forget();
        }
    }
}

impl IopsQuota {
    // By default, we throttle the number of scan IOPS across the entire process
    //
    // However, the user can disable this by setting the environment variable
    // LANCE_PROCESS_IO_THREADS_LIMIT to zero (or a negative integer).
    fn new() -> Self {
        let initial_capacity = std::env::var("LANCE_PROCESS_IO_THREADS_LIMIT")
            .map(|s| {
                s.parse::<i32>().unwrap_or_else(|_| {
                    log::warn!("Ignoring invalid LANCE_PROCESS_IO_THREADS_LIMIT: {}", s);
                    DEFAULT_PROCESS_IOPS_LIMIT
                })
            })
            .unwrap_or(DEFAULT_PROCESS_IOPS_LIMIT);
        let iops_avail = if initial_capacity <= 0 {
            None
        } else {
            Some(Semaphore::new(initial_capacity as usize))
        };
        Self { iops_avail }
    }

    // Release a reservation back to the global IOPS quota
    fn release(&self) {
        if let Some(iops_avail) = self.iops_avail.as_ref() {
            iops_avail.add_permits(1);
        }
    }

    // Acquire a reservation on the global IOPS quota
    async fn acquire(&self) -> IopsReservation<'_> {
        if let Some(iops_avail) = self.iops_avail.as_ref() {
            IopsReservation {
                value: Some(iops_avail.acquire().await.unwrap()),
            }
        } else {
            IopsReservation { value: None }
        }
    }
}

static IOPS_QUOTA: std::sync::LazyLock<IopsQuota> = std::sync::LazyLock::new(IopsQuota::new);

// We want to allow requests that have a lower priority than any
// currently in-flight request.  This helps avoid potential deadlocks
// related to backpressure.  Unfortunately, it is quite expensive to
// keep track of which priorities are in-flight.
//
// TODO: At some point it would be nice if we can optimize this away but
// in_flight should remain relatively small (generally less than 256 items)
// and has not shown itself to be a bottleneck yet.
struct PrioritiesInFlight {
    in_flight: Vec<u128>,
}

impl PrioritiesInFlight {
    fn new(capacity: u32) -> Self {
        Self {
            in_flight: Vec::with_capacity(capacity as usize * 2),
        }
    }

    fn min_in_flight(&self) -> u128 {
        self.in_flight.first().copied().unwrap_or(u128::MAX)
    }

    fn push(&mut self, prio: u128) {
        let pos = match self.in_flight.binary_search(&prio) {
            Ok(pos) => pos,
            Err(pos) => pos,
        };
        self.in_flight.insert(pos, prio);
    }

    fn remove(&mut self, prio: u128) {
        if let Ok(pos) = self.in_flight.binary_search(&prio) {
            self.in_flight.remove(pos);
        } else {
            unreachable!();
        }
    }
}
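
// Worked example of the bookkeeping above (illustrative values only):
//
//     push(5); push(3); push(7);      // in_flight == [3, 5, 7]
//     assert_eq!(min_in_flight(), 3); // lowest priority value is always first
//     remove(5);                      // in_flight == [3, 7]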

struct IoQueueState {
    // Number of IOPS we can issue concurrently before pausing I/O
    iops_avail: u32,
    // Number of bytes we are allowed to buffer in memory before pausing I/O
    //
    // This can dip below 0 due to I/O prioritization
    bytes_avail: i64,
    // Pending I/O requests
    pending_requests: BinaryHeap<IoTask>,
    // Priorities of in-flight requests
    priorities_in_flight: PrioritiesInFlight,
    // Set when the scheduler is finished to notify the I/O loop to shut down
    // once all outstanding requests have been completed.
    done_scheduling: bool,
    // Time when the scheduler started
    start: Instant,
    // Last time we warned about backpressure
    last_warn: AtomicU64,
}

impl IoQueueState {
    fn new(io_capacity: u32, io_buffer_size: u64) -> Self {
        Self {
            iops_avail: io_capacity,
            bytes_avail: io_buffer_size as i64,
            pending_requests: BinaryHeap::new(),
            priorities_in_flight: PrioritiesInFlight::new(io_capacity),
            done_scheduling: false,
            start: Instant::now(),
            last_warn: AtomicU64::from(0),
        }
    }

    fn finished(&self) -> bool {
        self.done_scheduling && self.pending_requests.is_empty()
    }

    fn warn_if_needed(&self) {
        let seconds_elapsed = self.start.elapsed().as_secs();
        let last_warn = self.last_warn.load(Ordering::Acquire);
        let since_last_warn = seconds_elapsed - last_warn;
        if (last_warn == 0
            && seconds_elapsed > BACKPRESSURE_MIN
            && seconds_elapsed < BACKPRESSURE_DEBOUNCE)
            || since_last_warn > BACKPRESSURE_DEBOUNCE
        {
            tracing::event!(tracing::Level::DEBUG, "Backpressure throttle exceeded");
            log::debug!("Backpressure throttle is full, I/O will pause until buffer is drained.  Max I/O bandwidth will not be achieved because CPU is falling behind");
            self.last_warn
                .store(seconds_elapsed.max(1), Ordering::Release);
        }
    }

    fn can_deliver(&self, task: &IoTask) -> bool {
        if self.iops_avail == 0 {
            false
        } else if task.priority <= self.priorities_in_flight.min_in_flight() {
            true
        } else if task.num_bytes() as i64 > self.bytes_avail {
            self.warn_if_needed();
            false
        } else {
            true
        }
    }
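
    // Worked example of the rules above (illustrative numbers): with
    // bytes_avail == 1000 and a minimum in-flight priority of 50,
    //
    //   * a 4000-byte task with priority 40 is admitted anyway (it outranks
    //     everything in flight, so holding it back could stall the consumer),
    //     pushing bytes_avail down to -3000
    //   * a 4000-byte task with priority 60 is held back until enough bytes
    //     are consumed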

    fn next_task(&mut self) -> Option<IoTask> {
        let task = self.pending_requests.peek()?;
        if self.can_deliver(task) {
            self.priorities_in_flight.push(task.priority);
            self.iops_avail -= 1;
            self.bytes_avail -= task.num_bytes() as i64;
            if self.bytes_avail < 0 {
                // This can happen when we admit special priority requests
                log::debug!(
                    "Backpressure throttle temporarily exceeded by {} bytes due to priority I/O",
                    -self.bytes_avail
                );
            }
            Some(self.pending_requests.pop().unwrap())
        } else {
            None
        }
    }
}

// This is modeled after the MPSC queue described here: https://docs.rs/tokio/latest/tokio/sync/struct.Notify.html
//
// However, it only needs to be SPSC since there is only one "scheduler thread"
// and one I/O loop.
struct IoQueue {
    // Queue state
    state: Mutex<IoQueueState>,
    // Used to signal new I/O requests have arrived that might potentially be runnable
    notify: Notify,
}

impl IoQueue {
    fn new(io_capacity: u32, io_buffer_size: u64) -> Self {
        Self {
            state: Mutex::new(IoQueueState::new(io_capacity, io_buffer_size)),
            notify: Notify::new(),
        }
    }

    fn push(&self, task: IoTask) {
        log::trace!(
            "Inserting I/O request for {} bytes with priority ({},{}) into I/O queue",
            task.num_bytes(),
            task.priority >> 64,
            task.priority & 0xFFFFFFFFFFFFFFFF
        );
        let mut state = self.state.lock().unwrap();
        state.pending_requests.push(task);
        drop(state);

        self.notify.notify_one();
    }

    async fn pop(&self) -> Option<IoTask> {
        loop {
            {
                // First, grab a reservation on the global IOPS quota
                // If we then get a task to run, transfer the reservation
                // to the task.  Otherwise, the reservation will be released
                // when iop_res is dropped.
                let mut iop_res = IOPS_QUOTA.acquire().await;
                // Next, try and grab a reservation from the queue
                let mut state = self.state.lock().unwrap();
                if let Some(task) = state.next_task() {
                    // Reservation successfully acquired, we will release the
                    // global reservation after the task has run.
                    iop_res.forget();
                    return Some(task);
                }

                if state.finished() {
                    return None;
                }
            }

            self.notify.notified().await;
        }
    }

    fn on_iop_complete(&self) {
        let mut state = self.state.lock().unwrap();
        state.iops_avail += 1;
        drop(state);

        self.notify.notify_one();
    }

    fn on_bytes_consumed(&self, bytes: u64, priority: u128, num_reqs: usize) {
        let mut state = self.state.lock().unwrap();
        state.bytes_avail += bytes as i64;
        for _ in 0..num_reqs {
            state.priorities_in_flight.remove(priority);
        }
        drop(state);

        self.notify.notify_one();
    }

    fn close(&self) {
        let mut state = self.state.lock().unwrap();
        state.done_scheduling = true;
        drop(state);

        self.notify.notify_one();
    }
}

// There is one instance of MutableBatch shared by all the I/O operations
// that make up a single request.  When all the I/O operations complete
// the MutableBatch goes out of scope and the batch request is considered
// complete.
struct MutableBatch<F: FnOnce(Response) + Send> {
    when_done: Option<F>,
    data_buffers: Vec<Bytes>,
    num_bytes: u64,
    priority: u128,
    num_reqs: usize,
    err: Option<Box<dyn std::error::Error + Send + Sync + 'static>>,
}

impl<F: FnOnce(Response) + Send> MutableBatch<F> {
    fn new(when_done: F, num_data_buffers: u32, priority: u128, num_reqs: usize) -> Self {
        Self {
            when_done: Some(when_done),
            data_buffers: vec![Bytes::default(); num_data_buffers as usize],
            num_bytes: 0,
            priority,
            num_reqs,
            err: None,
        }
    }
}

// Rather than keep track of when all the I/O requests are finished so that we
// can deliver the batch of data, we let Rust do that for us.  When all I/Os are
// done the MutableBatch goes out of scope and we know we have all the data.
impl<F: FnOnce(Response) + Send> Drop for MutableBatch<F> {
    fn drop(&mut self) {
        // If we have an error, return that.  Otherwise return the data
        let result = if self.err.is_some() {
            Err(Error::Wrapped {
                error: self.err.take().unwrap(),
                location: location!(),
            })
        } else {
            let mut data = Vec::new();
            std::mem::swap(&mut data, &mut self.data_buffers);
            Ok(data)
        };
        // We don't really care if no one is around to receive it, just let
        // the result go out of scope and get cleaned up
        let response = Response {
            data: result,
            num_bytes: self.num_bytes,
            priority: self.priority,
            num_reqs: self.num_reqs,
        };
        (self.when_done.take().unwrap())(response);
    }
}

struct DataChunk {
    task_idx: usize,
    num_bytes: u64,
    data: Result<Bytes>,
}

trait DataSink: Send {
    fn deliver_data(&mut self, data: DataChunk);
}

impl<F: FnOnce(Response) + Send> DataSink for MutableBatch<F> {
    // Called by worker tasks to add data to the MutableBatch
    fn deliver_data(&mut self, data: DataChunk) {
        self.num_bytes += data.num_bytes;
        match data.data {
            Ok(data_bytes) => {
                self.data_buffers[data.task_idx] = data_bytes;
            }
            Err(err) => {
                // This keeps the original error, if present
                self.err.get_or_insert(Box::new(err));
            }
        }
    }
}

struct IoTask {
    reader: Arc<dyn Reader>,
    to_read: Range<u64>,
    when_done: Box<dyn FnOnce(Result<Bytes>) + Send>,
    priority: u128,
}

impl Eq for IoTask {}

impl PartialEq for IoTask {
    fn eq(&self, other: &Self) -> bool {
        self.priority == other.priority
    }
}

impl PartialOrd for IoTask {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for IoTask {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        // This is intentionally inverted.  We want a min-heap
        other.priority.cmp(&self.priority)
    }
}

impl IoTask {
    fn num_bytes(&self) -> u64 {
        self.to_read.end - self.to_read.start
    }

    async fn run(self) {
        let bytes = if self.to_read.start == self.to_read.end {
            Ok(Bytes::new())
        } else {
            let bytes_fut = self
                .reader
                .get_range(self.to_read.start as usize..self.to_read.end as usize);
            IOPS_COUNTER.fetch_add(1, Ordering::Release);
            let num_bytes = self.num_bytes();
            bytes_fut
                .inspect(move |_| {
                    BYTES_READ_COUNTER.fetch_add(num_bytes, Ordering::Release);
                })
                .await
                .map_err(Error::from)
        };
        IOPS_QUOTA.release();
        (self.when_done)(bytes);
    }
}

// Every time a scheduler starts up it launches a task to run the I/O loop.  This loop
// runs until the scheduler is destroyed.
async fn run_io_loop(tasks: Arc<IoQueue>) {
    // Pop the next ready task off the queue and spawn it, until the queue is
    // closed and drained
    loop {
        let next_task = tasks.pop().await;
        match next_task {
            Some(task) => {
                tokio::spawn(task.run());
            }
            None => {
                // The queue has been closed and drained, we are done
                return;
            }
        }
    }
}

#[derive(Debug)]
struct StatsCollector {
    iops: AtomicU64,
    requests: AtomicU64,
    bytes_read: AtomicU64,
}

impl StatsCollector {
    fn new() -> Self {
        Self {
            iops: AtomicU64::new(0),
            requests: AtomicU64::new(0),
            bytes_read: AtomicU64::new(0),
        }
    }

    fn iops(&self) -> u64 {
        self.iops.load(Ordering::Relaxed)
    }

    fn bytes_read(&self) -> u64 {
        self.bytes_read.load(Ordering::Relaxed)
    }

    fn requests(&self) -> u64 {
        self.requests.load(Ordering::Relaxed)
    }

    fn record_request(&self, request: &[Range<u64>]) {
        self.requests.fetch_add(1, Ordering::Relaxed);
        self.iops.fetch_add(request.len() as u64, Ordering::Relaxed);
        self.bytes_read.fetch_add(
            request.iter().map(|r| r.end - r.start).sum::<u64>(),
            Ordering::Relaxed,
        );
    }
}

pub struct ScanStats {
    pub iops: u64,
    pub requests: u64,
    pub bytes_read: u64,
}

impl ScanStats {
    fn new(stats: &StatsCollector) -> Self {
        Self {
            iops: stats.iops(),
            requests: stats.requests(),
            bytes_read: stats.bytes_read(),
        }
    }
}

/// An I/O scheduler which wraps an ObjectStore and throttles the amount of
/// parallel I/O that can be run.
///
/// TODO: This will also add coalescing
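///
/// # Example
///
/// A minimal usage sketch (not compiled as a doctest), assuming an async
/// context with imports elided; the path is an illustrative placeholder:
///
/// ```ignore
/// let object_store = Arc::new(ObjectStore::local());
/// let config = SchedulerConfig::default_for_testing();
/// let scheduler = ScanScheduler::new(object_store, config);
/// let file = scheduler
///     .open_file(&Path::parse("some/file.lance").unwrap(), &CachedFileSize::unknown())
///     .await?;
/// let bytes = file.submit_single(0..1024, /*priority=*/ 0).await?;
/// ```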
pub struct ScanScheduler {
    object_store: Arc<ObjectStore>,
    io_queue: Arc<IoQueue>,
    stats: Arc<StatsCollector>,
}

impl Debug for ScanScheduler {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ScanScheduler")
            .field("object_store", &self.object_store)
            .finish()
    }
}

struct Response {
    data: Result<Vec<Bytes>>,
    priority: u128,
    num_reqs: usize,
    num_bytes: u64,
}

#[derive(Debug, Clone, Copy)]
pub struct SchedulerConfig {
    /// the # of bytes that can be buffered in memory (requested but not yet consumed).
    /// This controls back pressure.  If data is not processed quickly enough then this
    /// buffer will fill up and the I/O loop will pause until the buffer is drained.
    pub io_buffer_size_bytes: u64,
}

impl SchedulerConfig {
    /// Big enough for unit testing
    pub fn default_for_testing() -> Self {
        Self {
            io_buffer_size_bytes: 256 * 1024 * 1024,
        }
    }

    /// Configuration that should generally maximize bandwidth (not trying to save RAM
    /// at all).  We assume a max page size of 32MiB and then allow 32MiB per I/O thread.
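    ///
    /// For example (sketch; `object_store` is assumed to be an already-constructed
    /// `Arc<ObjectStore>`):
    ///
    /// ```ignore
    /// let config = SchedulerConfig::max_bandwidth(&object_store);
    /// let scheduler = ScanScheduler::new(object_store, config);
    /// ```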
    pub fn max_bandwidth(store: &ObjectStore) -> Self {
        Self {
            io_buffer_size_bytes: 32 * 1024 * 1024 * store.io_parallelism() as u64,
        }
    }
}

impl ScanScheduler {
    /// Create a new scheduler with the given I/O capacity
    ///
    /// # Arguments
    ///
    /// * object_store - the store to wrap
    /// * config - configuration settings for the scheduler
    pub fn new(object_store: Arc<ObjectStore>, config: SchedulerConfig) -> Arc<Self> {
        let io_capacity = object_store.io_parallelism();
        let io_queue = Arc::new(IoQueue::new(
            io_capacity as u32,
            config.io_buffer_size_bytes,
        ));
        let scheduler = Self {
            object_store,
            io_queue: io_queue.clone(),
            stats: Arc::new(StatsCollector::new()),
        };
        tokio::task::spawn(async move { run_io_loop(io_queue).await });
        Arc::new(scheduler)
    }

    /// Open a file for reading
    ///
    /// # Arguments
    ///
    /// * path - the path to the file to open
    /// * base_priority - the base priority for I/O requests submitted to this file scheduler.
    ///   This will determine the upper 64 bits of priority (the lower 64 bits
    ///   come from `submit_request` and `submit_single`)
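    ///
    /// For example (sketch; scheduler setup elided and the base priority value is
    /// illustrative), a file read later in a scan can be opened with a larger base
    /// priority so that requests from earlier files are scheduled first:
    ///
    /// ```ignore
    /// let file = scheduler
    ///     .open_file_with_priority(&path, /*base_priority=*/ 1, &CachedFileSize::unknown())
    ///     .await?;
    /// ```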
    pub async fn open_file_with_priority(
        self: &Arc<Self>,
        path: &Path,
        base_priority: u64,
        file_size_bytes: &CachedFileSize,
    ) -> Result<FileScheduler> {
        let file_size_bytes = if let Some(size) = file_size_bytes.get() {
            u64::from(size)
        } else {
            let size = self.object_store.size(path).await?;
            if let Some(size) = NonZero::new(size) {
                file_size_bytes.set(size);
            }
            size
        };
        let reader = self
            .object_store
            .open_with_size(path, file_size_bytes as usize)
            .await?;
        let block_size = self.object_store.block_size() as u64;
        let max_iop_size = self.object_store.max_iop_size();
        Ok(FileScheduler {
            reader: reader.into(),
            block_size,
            root: self.clone(),
            base_priority,
            max_iop_size,
        })
    }

    /// Open a file with a default priority of 0
    ///
    /// See [`Self::open_file_with_priority`] for more information on the priority
    pub async fn open_file(
        self: &Arc<Self>,
        path: &Path,
        file_size_bytes: &CachedFileSize,
    ) -> Result<FileScheduler> {
        self.open_file_with_priority(path, 0, file_size_bytes).await
    }

    fn do_submit_request(
        &self,
        reader: Arc<dyn Reader>,
        request: Vec<Range<u64>>,
        tx: oneshot::Sender<Response>,
        priority: u128,
    ) {
        let num_iops = request.len() as u32;

        let when_all_io_done = move |bytes_and_permits| {
            // We don't care if the receiver has given up so discard the result
            let _ = tx.send(bytes_and_permits);
        };

        let dest = Arc::new(Mutex::new(Box::new(MutableBatch::new(
            when_all_io_done,
            num_iops,
            priority,
            request.len(),
        ))));

        for (task_idx, iop) in request.into_iter().enumerate() {
            let dest = dest.clone();
            let io_queue = self.io_queue.clone();
            let num_bytes = iop.end - iop.start;
            let task = IoTask {
                reader: reader.clone(),
                to_read: iop,
                priority,
                when_done: Box::new(move |data| {
                    io_queue.on_iop_complete();
                    let mut dest = dest.lock().unwrap();
                    let chunk = DataChunk {
                        data,
                        task_idx,
                        num_bytes,
                    };
                    dest.deliver_data(chunk);
                }),
            };
            self.io_queue.push(task);
        }
    }

    fn submit_request(
        &self,
        reader: Arc<dyn Reader>,
        request: Vec<Range<u64>>,
        priority: u128,
    ) -> impl Future<Output = Result<Vec<Bytes>>> + Send {
        let (tx, rx) = oneshot::channel::<Response>();

        self.do_submit_request(reader, request, tx, priority);

        let io_queue = self.io_queue.clone();

        rx.map(move |wrapped_rsp| {
            // Right now, it isn't possible for I/O to be cancelled so a cancel error should
            // not occur
            let rsp = wrapped_rsp.unwrap();
            io_queue.on_bytes_consumed(rsp.num_bytes, rsp.priority, rsp.num_reqs);
            rsp.data
        })
    }

    pub fn stats(&self) -> ScanStats {
        ScanStats::new(self.stats.as_ref())
    }
}

impl Drop for ScanScheduler {
    fn drop(&mut self) {
        self.io_queue.close();
    }
}

/// A throttled file reader
#[derive(Clone, Debug)]
pub struct FileScheduler {
    reader: Arc<dyn Reader>,
    root: Arc<ScanScheduler>,
    block_size: u64,
    base_priority: u64,
    max_iop_size: u64,
}

fn is_close_together(range1: &Range<u64>, range2: &Range<u64>, block_size: u64) -> bool {
    // Note that range1.end <= range2.start is possible (e.g. when decoding string arrays)
    range2.start <= (range1.end + block_size)
}

fn is_overlapping(range1: &Range<u64>, range2: &Range<u64>) -> bool {
    range1.start < range2.end && range2.start < range1.end
}

impl FileScheduler {
    /// Submit a batch of I/O requests to the reader
    ///
    /// The requests will be queued in a FIFO manner and, when all requests
    /// have been fulfilled, the returned future will be completed.
    ///
    /// Each request has a given priority.  If the I/O loop is full then requests
    /// will be buffered and requests with the *lowest* priority will be released
    /// from the buffer first.
    ///
    /// Each request has a backpressure ID which controls which backpressure throttle
    /// is applied to the request.  Requests made to the same backpressure throttle
    /// will be throttled together.
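    ///
    /// # Example
    ///
    /// A sketch (not compiled as a doctest), assuming `file` is a `FileScheduler`;
    /// the ranges and priority are illustrative:
    ///
    /// ```ignore
    /// // Nearby ranges may be coalesced into a single IOP, but the returned
    /// // buffers always line up one-to-one with the requested ranges.
    /// let buffers = file
    ///     .submit_request(vec![0..1_000, 2_000..3_000], /*priority=*/ 0)
    ///     .await?;
    /// assert_eq!(buffers.len(), 2);
    /// ```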
    pub fn submit_request(
        &self,
        request: Vec<Range<u64>>,
        priority: u64,
    ) -> impl Future<Output = Result<Vec<Bytes>>> + Send {
        // The final priority is a combination of the row offset and the file number
        let priority = ((self.base_priority as u128) << 64) + priority as u128;
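        // e.g. (illustrative) base_priority = 1 and priority = 42 combine into
        // 0x0000_0000_0000_0001_0000_0000_0000_002a, so requests are ordered by
        // file first and by offset within the file second.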

        let mut merged_requests = Vec::with_capacity(request.len());

        if !request.is_empty() {
            let mut curr_interval = request[0].clone();

            for req in request.iter().skip(1) {
                if is_close_together(&curr_interval, req, self.block_size) {
                    curr_interval.end = curr_interval.end.max(req.end);
                } else {
                    merged_requests.push(curr_interval);
                    curr_interval = req.clone();
                }
            }

            merged_requests.push(curr_interval);
        }

        let mut updated_requests = Vec::with_capacity(merged_requests.len());
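        // Each merged interval is then split into roughly equal pieces no larger than
        // max_iop_size.  For example (illustrative, assuming a 16 MiB cap), a 75 MiB
        // merged read becomes div_ceil(75, 16) = 5 requests of 15 MiB each.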
        for req in merged_requests {
            if req.is_empty() {
                updated_requests.push(req);
            } else {
                let num_requests = (req.end - req.start).div_ceil(self.max_iop_size);
                let bytes_per_request = (req.end - req.start) / num_requests;
                for i in 0..num_requests {
                    let start = req.start + i * bytes_per_request;
                    let end = if i == num_requests - 1 {
                        // Last request is a bit bigger due to rounding
                        req.end
                    } else {
                        start + bytes_per_request
                    };
                    updated_requests.push(start..end);
                }
            }
        }

        self.root.stats.record_request(&updated_requests);

        let bytes_vec_fut =
            self.root
                .submit_request(self.reader.clone(), updated_requests.clone(), priority);

        let mut updated_index = 0;
        let mut final_bytes = Vec::with_capacity(request.len());

        async move {
            let bytes_vec = bytes_vec_fut.await?;

            let mut orig_index = 0;
            while (updated_index < updated_requests.len()) && (orig_index < request.len()) {
                let updated_range = &updated_requests[updated_index];
                let orig_range = &request[orig_index];
                let byte_offset = updated_range.start as usize;

                if is_overlapping(updated_range, orig_range) {
                    // We need to undo the coalescing and splitting done earlier
                    let start = orig_range.start as usize - byte_offset;
                    if orig_range.end <= updated_range.end {
                        // The original range is fully contained in the updated range, can do
                        // zero-copy slice
                        let end = orig_range.end as usize - byte_offset;
                        final_bytes.push(bytes_vec[updated_index].slice(start..end));
                    } else {
                        // The original read was split into multiple requests, need to copy
                        // back into a single buffer
                        let orig_size = orig_range.end - orig_range.start;
                        let mut merged_bytes = Vec::with_capacity(orig_size as usize);
                        merged_bytes.extend_from_slice(&bytes_vec[updated_index].slice(start..));
                        let mut copy_offset = merged_bytes.len() as u64;
                        while copy_offset < orig_size {
                            updated_index += 1;
                            let next_range = &updated_requests[updated_index];
                            let bytes_to_take =
                                (orig_size - copy_offset).min(next_range.end - next_range.start);
                            merged_bytes.extend_from_slice(
                                &bytes_vec[updated_index].slice(0..bytes_to_take as usize),
                            );
                            copy_offset += bytes_to_take;
                        }
                        final_bytes.push(Bytes::from(merged_bytes));
                    }
                    orig_index += 1;
                } else {
                    updated_index += 1;
                }
            }

            Ok(final_bytes)
        }
    }

    pub fn with_priority(&self, priority: u64) -> Self {
        Self {
            reader: self.reader.clone(),
            root: self.root.clone(),
            block_size: self.block_size,
            max_iop_size: self.max_iop_size,
            base_priority: priority,
        }
    }

    /// Submit a single IOP to the reader
    ///
    /// If you have multiple IOPS to perform then [`Self::submit_request`] is going
    /// to be more efficient.
    ///
    /// See [`Self::submit_request`] for more information on the priority and backpressure.
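    ///
    /// For example (sketch, not compiled as a doctest; `file_len` is an assumed,
    /// already-known file length):
    ///
    /// ```ignore
    /// let footer = file.submit_single(file_len - 8..file_len, /*priority=*/ 0).await?;
    /// ```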
    pub fn submit_single(
        &self,
        range: Range<u64>,
        priority: u64,
    ) -> impl Future<Output = Result<Bytes>> + Send {
        self.submit_request(vec![range], priority)
            .map_ok(|vec_bytes| vec_bytes.into_iter().next().unwrap())
    }

    /// Provides access to the underlying reader
    ///
    /// Do not use this for reading data as it will bypass any I/O scheduling!
    /// This is mainly exposed to allow metadata operations (e.g. size, block_size)
    /// which either aren't IOPS or aren't throttled
    pub fn reader(&self) -> &Arc<dyn Reader> {
        &self.reader
    }
}

#[cfg(test)]
mod tests {
    use std::{collections::VecDeque, time::Duration};

    use futures::poll;
    use rand::RngCore;
    use tempfile::tempdir;

    use object_store::{memory::InMemory, GetRange, ObjectStore as OSObjectStore};
    use tokio::{runtime::Handle, time::timeout};
    use url::Url;

    use crate::{
        object_store::{DEFAULT_DOWNLOAD_RETRY_COUNT, DEFAULT_MAX_IOP_SIZE},
        testing::MockObjectStore,
    };

    use super::*;

    #[tokio::test]
    async fn test_full_seq_read() {
        let tmpdir = tempdir().unwrap();
        let tmp_path = tmpdir.path().to_str().unwrap();
        let tmp_path = Path::parse(tmp_path).unwrap();
        let tmp_file = tmp_path.child("foo.file");

        let obj_store = Arc::new(ObjectStore::local());

        // Write 1MiB of data
        const DATA_SIZE: u64 = 1024 * 1024;
        let mut some_data = vec![0; DATA_SIZE as usize];
        rand::rng().fill_bytes(&mut some_data);
        obj_store.put(&tmp_file, &some_data).await.unwrap();

        let config = SchedulerConfig::default_for_testing();

        let scheduler = ScanScheduler::new(obj_store, config);

        let file_scheduler = scheduler
            .open_file(&tmp_file, &CachedFileSize::unknown())
            .await
            .unwrap();

        // Read it back 4KiB at a time
        const READ_SIZE: u64 = 4 * 1024;
        let mut reqs = VecDeque::new();
        let mut offset = 0;
        while offset < DATA_SIZE {
            reqs.push_back(
                #[allow(clippy::single_range_in_vec_init)]
                file_scheduler
                    .submit_request(vec![offset..offset + READ_SIZE], 0)
                    .await
                    .unwrap(),
            );
            offset += READ_SIZE;
        }

        offset = 0;
        // Note: we should get parallel I/O even though we are consuming serially
        while offset < DATA_SIZE {
            let data = reqs.pop_front().unwrap();
            let actual = &data[0];
            let expected = &some_data[offset as usize..(offset + READ_SIZE) as usize];
            assert_eq!(expected, actual);
            offset += READ_SIZE;
        }
    }

    #[tokio::test]
    async fn test_split_coalesce() {
        let tmpdir = tempdir().unwrap();
        let tmp_path = tmpdir.path().to_str().unwrap();
        let tmp_path = Path::parse(tmp_path).unwrap();
        let tmp_file = tmp_path.child("foo.file");

        let obj_store = Arc::new(ObjectStore::local());

        // Write 75MiB of data
        const DATA_SIZE: u64 = 75 * 1024 * 1024;
        let mut some_data = vec![0; DATA_SIZE as usize];
        rand::rng().fill_bytes(&mut some_data);
        obj_store.put(&tmp_file, &some_data).await.unwrap();

        let config = SchedulerConfig::default_for_testing();

        let scheduler = ScanScheduler::new(obj_store, config);

        let file_scheduler = scheduler
            .open_file(&tmp_file, &CachedFileSize::unknown())
            .await
            .unwrap();

        // These 3 requests should be coalesced into a single I/O because they are within 4KiB
        // of each other
        let req =
            file_scheduler.submit_request(vec![50_000..51_000, 52_000..53_000, 54_000..55_000], 0);

        let bytes = req.await.unwrap();

        assert_eq!(bytes[0], &some_data[50_000..51_000]);
        assert_eq!(bytes[1], &some_data[52_000..53_000]);
        assert_eq!(bytes[2], &some_data[54_000..55_000]);

        assert_eq!(1, scheduler.stats().iops);

        // This should be split into 5 requests because it is so large
        let req = file_scheduler.submit_request(vec![0..DATA_SIZE], 0);
        let bytes = req.await.unwrap();
        assert!(bytes[0] == some_data, "data is not the same");

        assert_eq!(6, scheduler.stats().iops);

        // None of these requests are bigger than the max IOP size but they will be coalesced into
        // one IOP that is bigger and then split back into 2 requests that don't quite align with the original
        // ranges.
        let chunk_size = *DEFAULT_MAX_IOP_SIZE;
        let req = file_scheduler.submit_request(
            vec![
                10..chunk_size,
                chunk_size + 10..(chunk_size * 2) - 20,
                chunk_size * 2..(chunk_size * 2) + 10,
            ],
            0,
        );

        let bytes = req.await.unwrap();
        let chunk_size = chunk_size as usize;
        assert!(
            bytes[0] == some_data[10..chunk_size],
            "data is not the same"
        );
        assert!(
            bytes[1] == some_data[chunk_size + 10..(chunk_size * 2) - 20],
            "data is not the same"
        );
        assert!(
            bytes[2] == some_data[chunk_size * 2..(chunk_size * 2) + 10],
            "data is not the same"
        );
        assert_eq!(8, scheduler.stats().iops);

        let reads = (0..44)
            .map(|i| (i * 1_000_000..(i + 1) * 1_000_000))
            .collect::<Vec<_>>();
        let req = file_scheduler.submit_request(reads, 0);
        let bytes = req.await.unwrap();
        for (i, bytes) in bytes.iter().enumerate() {
            assert!(
                bytes == &some_data[i * 1_000_000..(i + 1) * 1_000_000],
                "data is not the same"
            );
        }
        assert_eq!(11, scheduler.stats().iops);
    }

    #[tokio::test]
    async fn test_priority() {
        let some_path = Path::parse("foo").unwrap();
        let base_store = Arc::new(InMemory::new());
        base_store
            .put(&some_path, vec![0; 1000].into())
            .await
            .unwrap();

        let semaphore = Arc::new(tokio::sync::Semaphore::new(0));
        let mut obj_store = MockObjectStore::default();
        let semaphore_copy = semaphore.clone();
        obj_store
            .expect_get_opts()
            .returning(move |location, options| {
                let semaphore = semaphore.clone();
                let base_store = base_store.clone();
                let location = location.clone();
                async move {
                    semaphore.acquire().await.unwrap().forget();
                    base_store.get_opts(&location, options).await
                }
                .boxed()
            });
        let obj_store = Arc::new(ObjectStore::new(
            Arc::new(obj_store),
            Url::parse("mem://").unwrap(),
            Some(500),
            None,
            false,
            false,
            1,
            DEFAULT_DOWNLOAD_RETRY_COUNT,
        ));

        let config = SchedulerConfig {
            io_buffer_size_bytes: 1024 * 1024,
        };

        let scan_scheduler = ScanScheduler::new(obj_store, config);

        let file_scheduler = scan_scheduler
            .open_file(&Path::parse("foo").unwrap(), &CachedFileSize::new(1000))
            .await
            .unwrap();

        // Issue a request, priority doesn't matter, it will be submitted
        // immediately (it will go pending)
        // Note: the timeout is to prevent a deadlock if the test fails.
        let first_fut = timeout(
            Duration::from_secs(10),
            file_scheduler.submit_single(0..10, 0),
        )
        .boxed();

        // Issue another low priority request (it will go in queue)
        let mut second_fut = timeout(
            Duration::from_secs(10),
            file_scheduler.submit_single(0..20, 100),
        )
        .boxed();

        // Issue a high priority request (it will go in queue and should bump
        // the other queued request down)
        let mut third_fut = timeout(
            Duration::from_secs(10),
            file_scheduler.submit_single(0..30, 0),
        )
        .boxed();

        // Finish one file, should be the in-flight first request
        semaphore_copy.add_permits(1);
        assert!(first_fut.await.unwrap().unwrap().len() == 10);
        // Other requests should not be finished
        assert!(poll!(&mut second_fut).is_pending());
        assert!(poll!(&mut third_fut).is_pending());

        // Next should be high priority request
        semaphore_copy.add_permits(1);
        assert!(third_fut.await.unwrap().unwrap().len() == 30);
        assert!(poll!(&mut second_fut).is_pending());

        // Finally, the low priority request
        semaphore_copy.add_permits(1);
        assert!(second_fut.await.unwrap().unwrap().len() == 20);
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_backpressure() {
        let some_path = Path::parse("foo").unwrap();
        let base_store = Arc::new(InMemory::new());
        base_store
            .put(&some_path, vec![0; 100000].into())
            .await
            .unwrap();

        let bytes_read = Arc::new(AtomicU64::from(0));
        let mut obj_store = MockObjectStore::default();
        let bytes_read_copy = bytes_read.clone();
        // Wraps the obj_store to keep track of how many bytes have been read
        obj_store
            .expect_get_opts()
            .returning(move |location, options| {
                let range = options.range.as_ref().unwrap();
                let num_bytes = match range {
                    GetRange::Bounded(bounded) => bounded.end - bounded.start,
                    _ => panic!(),
                };
                bytes_read_copy.fetch_add(num_bytes, Ordering::Release);
                let location = location.clone();
                let base_store = base_store.clone();
                async move { base_store.get_opts(&location, options).await }.boxed()
            });
        let obj_store = Arc::new(ObjectStore::new(
            Arc::new(obj_store),
            Url::parse("mem://").unwrap(),
            Some(500),
            None,
            false,
            false,
            1,
            DEFAULT_DOWNLOAD_RETRY_COUNT,
        ));

        let config = SchedulerConfig {
            io_buffer_size_bytes: 10,
        };

        let scan_scheduler = ScanScheduler::new(obj_store.clone(), config);

        let file_scheduler = scan_scheduler
            .open_file(&Path::parse("foo").unwrap(), &CachedFileSize::new(100000))
            .await
            .unwrap();

        let wait_for_idle = || async move {
            let handle = Handle::current();
            while handle.metrics().num_alive_tasks() != 1 {
                tokio::time::sleep(Duration::from_millis(10)).await;
            }
        };
        let wait_for_bytes_read_and_idle = |target_bytes: u64| {
            // We need to move `target_bytes` but don't want to move `bytes_read`
            let bytes_read = &bytes_read;
            async move {
                let bytes_read_copy = bytes_read.clone();
                while bytes_read_copy.load(Ordering::Acquire) < target_bytes {
                    tokio::time::sleep(Duration::from_millis(10)).await;
                }
                wait_for_idle().await;
            }
        };

        // This read will begin immediately
        let first_fut = file_scheduler.submit_single(0..5, 0);
        // This read should also begin immediately
        let second_fut = file_scheduler.submit_single(0..5, 0);
        // This read will be throttled
        let third_fut = file_scheduler.submit_single(0..3, 0);
        // Two tasks (third_fut and unit test)
        wait_for_bytes_read_and_idle(10).await;

        assert_eq!(first_fut.await.unwrap().len(), 5);
        // One task (unit test)
        wait_for_bytes_read_and_idle(13).await;

        // 2 bytes are ready but 5 bytes requested, read will be blocked
        let fourth_fut = file_scheduler.submit_single(0..5, 0);
        wait_for_bytes_read_and_idle(13).await;

        // Out of order completion is ok, will unblock backpressure
        assert_eq!(third_fut.await.unwrap().len(), 3);
        wait_for_bytes_read_and_idle(18).await;

        assert_eq!(second_fut.await.unwrap().len(), 5);
        // At this point there are 5 bytes available in backpressure queue
        // Now we issue multi-read that can be partially fulfilled, it will read some bytes but
        // not all of them. (using large range gap to ensure request not coalesced)
        //
        // I'm actually not sure this behavior is great.  It's possible that we should just
        // block until we can fulfill the entire request.
        let fifth_fut = file_scheduler.submit_request(vec![0..3, 90000..90007], 0);
        wait_for_bytes_read_and_idle(21).await;

        // Fifth future should eventually finish due to deadlock prevention
        let fifth_bytes = tokio::time::timeout(Duration::from_secs(10), fifth_fut)
            .await
            .unwrap();
        assert_eq!(
            fifth_bytes.unwrap().iter().map(|b| b.len()).sum::<usize>(),
            10
        );

        // And now let's just make sure that we can read the rest of the data
        assert_eq!(fourth_fut.await.unwrap().len(), 5);
        wait_for_bytes_read_and_idle(28).await;

        // Ensure deadlock prevention timeout can be disabled
        let config = SchedulerConfig {
            io_buffer_size_bytes: 10,
        };

        let scan_scheduler = ScanScheduler::new(obj_store, config);
        let file_scheduler = scan_scheduler
            .open_file(&Path::parse("foo").unwrap(), &CachedFileSize::new(100000))
            .await
            .unwrap();

        let first_fut = file_scheduler.submit_single(0..10, 0);
        let second_fut = file_scheduler.submit_single(0..10, 0);

        std::thread::sleep(Duration::from_millis(100));
        assert_eq!(first_fut.await.unwrap().len(), 10);
        assert_eq!(second_fut.await.unwrap().len(), 10);
    }

    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn stress_backpressure() {
        // This test ensures that the backpressure mechanism works correctly with
        // regards to priority.  In other words, as long as all requests are consumed
        // in priority order then the backpressure mechanism should not deadlock
        let some_path = Path::parse("foo").unwrap();
        let obj_store = Arc::new(ObjectStore::memory());
        obj_store
            .put(&some_path, vec![0; 100000].as_slice())
            .await
            .unwrap();

        // Only one request will be allowed in
        let config = SchedulerConfig {
            io_buffer_size_bytes: 1,
        };
        let scan_scheduler = ScanScheduler::new(obj_store.clone(), config);
        let file_scheduler = scan_scheduler
            .open_file(&some_path, &CachedFileSize::unknown())
            .await
            .unwrap();

        let mut futs = Vec::with_capacity(10000);
        for idx in 0..10000 {
            futs.push(file_scheduler.submit_single(idx..idx + 1, idx));
        }

        for fut in futs {
            fut.await.unwrap();
        }
    }
}