lance_io/scheduler.rs

// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

use bytes::Bytes;
use futures::channel::oneshot;
use futures::{FutureExt, TryFutureExt};
use object_store::path::Path;
use snafu::location;
use std::collections::BinaryHeap;
use std::fmt::Debug;
use std::future::Future;
use std::ops::Range;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Instant;
use tokio::sync::{Notify, Semaphore, SemaphorePermit};

use lance_core::{Error, Result};

use crate::object_store::ObjectStore;
use crate::traits::Reader;

// Don't log backpressure warnings until at least this many seconds have passed
const BACKPRESSURE_MIN: u64 = 5;
// Don't log backpressure warnings more than once / minute
const BACKPRESSURE_DEBOUNCE: u64 = 60;

// Global counter of how many IOPS we have issued
static IOPS_COUNTER: AtomicU64 = AtomicU64::new(0);
// Global counter of how many bytes were read by the scheduler
static BYTES_READ_COUNTER: AtomicU64 = AtomicU64::new(0);

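/// Returns the total number of read IOPs issued by I/O schedulers in this process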
pub fn iops_counter() -> u64 {
    IOPS_COUNTER.load(Ordering::Acquire)
}

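/// Returns the total number of bytes read by I/O schedulers in this process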
pub fn bytes_read_counter() -> u64 {
    BYTES_READ_COUNTER.load(Ordering::Acquire)
}

// There are two structures that control the I/O scheduler concurrency.  First,
// we have a hard limit on the number of IOPS that can be issued concurrently.
// This limit is process-wide.
//
// Second, we try to limit how many I/O requests can be buffered in memory without
// being consumed by a decoder of some kind.  This limit is per-scheduler.  We cannot
// make this limit process-wide without introducing deadlock (because the decoder for
// file 0 might be waiting on IOPS blocked by a queue filled with requests for file 1,
// and vice-versa).
//
// There is also a per-scan limit on the number of IOPS that can be issued concurrently.
//
// The process-wide limit exists for users that need a hard limit on the number of parallel
// IOPS, e.g. due to port availability limits or to prevent multiple scans from saturating
// the network.  (Note: a process-wide limit of X will not necessarily limit the number of
// open TCP connections to exactly X.  The underlying object store may open more connections
// anyway.)
//
// However, it can be too restrictive in some cases, e.g. when some scans are reading from
// cloud storage and other scans are reading from local disk.  In these cases users don't
// need to set a process-wide limit and can rely on the per-scan limits.
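//
// For example (an illustrative scenario, not a measurement): with
// LANCE_PROCESS_IO_THREADS_LIMIT=64, four concurrent scans in one process share a hard
// cap of 64 in-flight requests, while each scan is still throttled individually by the
// io_buffer_size_bytes backpressure limit enforced by its own IoQueue.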

// The IopsQuota enforces the first of the above limits: it is the per-process hard cap
// on the number of IOPS that can be issued concurrently.
//
// The per-scan limits are enforced by IoQueue
struct IopsQuota {
    // An Option is used here to avoid mutex overhead if no limit is set
    iops_avail: Option<Semaphore>,
}

/// A reservation on the global IOPS quota
///
/// When the reservation is dropped, the IOPS quota is released unless
/// [`Self::forget`] is called.
struct IopsReservation<'a> {
    value: Option<SemaphorePermit<'a>>,
}

impl IopsReservation<'_> {
    // Forget the reservation, so it won't be released on drop
    fn forget(&mut self) {
        if let Some(value) = self.value.take() {
            value.forget();
        }
    }
}

impl IopsQuota {
    // By default, there is no process-wide limit on IOPS
    //
    // However, the user can request one by setting the environment variable
    // LANCE_PROCESS_IO_THREADS_LIMIT to a positive integer.
    fn new() -> Self {
        let initial_capacity = std::env::var("LANCE_PROCESS_IO_THREADS_LIMIT")
            .map(|s| {
                let limit = s
                    .parse::<i32>()
                    .expect("LANCE_PROCESS_IO_THREADS_LIMIT must be a positive integer");
                if limit <= 0 {
                    panic!("LANCE_PROCESS_IO_THREADS_LIMIT must be a positive integer.  To disable the limit, unset the environment variable");
                }
                limit
            })
            // The default (-1) does not apply any limit
            .unwrap_or(-1);
        let iops_avail = if initial_capacity < 0 {
            None
        } else {
            Some(Semaphore::new(initial_capacity as usize))
        };
        Self { iops_avail }
    }

    // Release a reservation on the global IOPS quota
    fn release(&self) {
        if let Some(iops_avail) = self.iops_avail.as_ref() {
            iops_avail.add_permits(1);
        }
    }

    // Acquire a reservation on the global IOPS quota
    async fn acquire(&self) -> IopsReservation {
        if let Some(iops_avail) = self.iops_avail.as_ref() {
            IopsReservation {
                value: Some(iops_avail.acquire().await.unwrap()),
            }
        } else {
            IopsReservation { value: None }
        }
    }
}

lazy_static::lazy_static! {
    static ref IOPS_QUOTA: IopsQuota = IopsQuota::new();
}

// We want to allow requests that have a lower priority value (i.e. are more urgent) than
// any currently in-flight request.  This helps avoid potential deadlocks related to
// backpressure.  Unfortunately, it is quite expensive to keep track of which priorities
// are in-flight.
//
// TODO: At some point it would be nice if we could optimize this away but
// in_flight should remain relatively small (generally less than 256 items)
// and has not shown itself to be a bottleneck yet.
struct PrioritiesInFlight {
    in_flight: Vec<u128>,
}

impl PrioritiesInFlight {
    fn new(capacity: u32) -> Self {
        Self {
            in_flight: Vec::with_capacity(capacity as usize * 2),
        }
    }

    fn min_in_flight(&self) -> u128 {
        self.in_flight.first().copied().unwrap_or(u128::MAX)
    }

    fn push(&mut self, prio: u128) {
        let pos = match self.in_flight.binary_search(&prio) {
            Ok(pos) => pos,
            Err(pos) => pos,
        };
        self.in_flight.insert(pos, prio);
    }

    fn remove(&mut self, prio: u128) {
        if let Ok(pos) = self.in_flight.binary_search(&prio) {
            self.in_flight.remove(pos);
        } else {
            unreachable!();
        }
    }
}

struct IoQueueState {
    // Number of IOPS we can issue concurrently before pausing I/O
    iops_avail: u32,
    // Number of bytes we are allowed to buffer in memory before pausing I/O
    //
    // This can dip below 0 due to I/O prioritization
    bytes_avail: i64,
    // Pending I/O requests
    pending_requests: BinaryHeap<IoTask>,
    // Priorities of in-flight requests
    priorities_in_flight: PrioritiesInFlight,
    // Set when the scheduler is finished to notify the I/O loop to shut down
    // once all outstanding requests have been completed.
    done_scheduling: bool,
    // Time when the scheduler started
    start: Instant,
    // Last time we warned about backpressure
    last_warn: AtomicU64,
}

impl IoQueueState {
    fn new(io_capacity: u32, io_buffer_size: u64) -> Self {
        Self {
            iops_avail: io_capacity,
            bytes_avail: io_buffer_size as i64,
            pending_requests: BinaryHeap::new(),
            priorities_in_flight: PrioritiesInFlight::new(io_capacity),
            done_scheduling: false,
            start: Instant::now(),
            last_warn: AtomicU64::from(0),
        }
    }

    fn finished(&self) -> bool {
        self.done_scheduling && self.pending_requests.is_empty()
    }

    fn warn_if_needed(&self) {
        let seconds_elapsed = self.start.elapsed().as_secs();
        let last_warn = self.last_warn.load(Ordering::Acquire);
        let since_last_warn = seconds_elapsed - last_warn;
        if (last_warn == 0
            && seconds_elapsed > BACKPRESSURE_MIN
            && seconds_elapsed < BACKPRESSURE_DEBOUNCE)
            || since_last_warn > BACKPRESSURE_DEBOUNCE
        {
            tracing::event!(tracing::Level::DEBUG, "Backpressure throttle exceeded");
            log::debug!("Backpressure throttle is full, I/O will pause until buffer is drained.  Max I/O bandwidth will not be achieved because CPU is falling behind");
            self.last_warn
                .store(seconds_elapsed.max(1), Ordering::Release);
        }
    }

    fn can_deliver(&self, task: &IoTask) -> bool {
        if self.iops_avail == 0 {
            false
        } else if task.priority <= self.priorities_in_flight.min_in_flight() {
            true
        } else if task.num_bytes() as i64 > self.bytes_avail {
            self.warn_if_needed();
            false
        } else {
            true
        }
    }

    fn next_task(&mut self) -> Option<IoTask> {
        let task = self.pending_requests.peek()?;
        if self.can_deliver(task) {
            self.priorities_in_flight.push(task.priority);
            self.iops_avail -= 1;
            self.bytes_avail -= task.num_bytes() as i64;
            if self.bytes_avail < 0 {
                // This can happen when we admit special priority requests
                log::debug!(
                    "Backpressure throttle temporarily exceeded by {} bytes due to priority I/O",
                    -self.bytes_avail
                );
            }
            Some(self.pending_requests.pop().unwrap())
        } else {
            None
        }
    }
}

// This is modeled after the MPSC queue described here: https://docs.rs/tokio/latest/tokio/sync/struct.Notify.html
//
// However, it only needs to be SPSC since there is only one "scheduler thread"
// and one I/O loop.
struct IoQueue {
    // Queue state
    state: Mutex<IoQueueState>,
    // Used to signal new I/O requests have arrived that might potentially be runnable
    notify: Notify,
}

impl IoQueue {
    fn new(io_capacity: u32, io_buffer_size: u64) -> Self {
        Self {
            state: Mutex::new(IoQueueState::new(io_capacity, io_buffer_size)),
            notify: Notify::new(),
        }
    }

    fn push(&self, task: IoTask) {
        log::trace!(
            "Inserting I/O request for {} bytes with priority ({},{}) into I/O queue",
            task.num_bytes(),
            task.priority >> 64,
            task.priority & 0xFFFFFFFFFFFFFFFF
        );
        let mut state = self.state.lock().unwrap();
        state.pending_requests.push(task);
        drop(state);

        self.notify.notify_one();
    }

    async fn pop(&self) -> Option<IoTask> {
        loop {
            {
                // First, grab a reservation on the global IOPS quota
                // If we then get a task to run, transfer the reservation
                // to the task.  Otherwise, the reservation will be released
                // when iop_res is dropped.
                let mut iop_res = IOPS_QUOTA.acquire().await;
                // Next, try to grab a task from the queue
                let mut state = self.state.lock().unwrap();
                if let Some(task) = state.next_task() {
                    // Task successfully acquired, we will release the global
                    // reservation after the task has run.
                    iop_res.forget();
                    return Some(task);
                }

                if state.finished() {
                    return None;
                }
            }

            self.notify.notified().await;
        }
    }

    fn on_iop_complete(&self) {
        let mut state = self.state.lock().unwrap();
        state.iops_avail += 1;
        drop(state);

        self.notify.notify_one();
    }

    fn on_bytes_consumed(&self, bytes: u64, priority: u128, num_reqs: usize) {
        let mut state = self.state.lock().unwrap();
        state.bytes_avail += bytes as i64;
        for _ in 0..num_reqs {
            state.priorities_in_flight.remove(priority);
        }
        drop(state);

        self.notify.notify_one();
    }

    fn close(&self) {
        let mut state = self.state.lock().unwrap();
        state.done_scheduling = true;
        drop(state);

        self.notify.notify_one();
    }
}

// There is one instance of MutableBatch shared by all the I/O operations
// that make up a single request.  When all the I/O operations complete
// then the MutableBatch goes out of scope and the batch request is considered
// complete
struct MutableBatch<F: FnOnce(Response) + Send> {
    when_done: Option<F>,
    data_buffers: Vec<Bytes>,
    num_bytes: u64,
    priority: u128,
    num_reqs: usize,
    err: Option<Box<dyn std::error::Error + Send + Sync + 'static>>,
}

impl<F: FnOnce(Response) + Send> MutableBatch<F> {
    fn new(when_done: F, num_data_buffers: u32, priority: u128, num_reqs: usize) -> Self {
        Self {
            when_done: Some(when_done),
            data_buffers: vec![Bytes::default(); num_data_buffers as usize],
            num_bytes: 0,
            priority,
            num_reqs,
            err: None,
        }
    }
}

// Rather than keep track of when all the I/O requests are finished so that we
// can deliver the batch of data, we let Rust do that for us: when all I/Os are
// done the MutableBatch goes out of scope and we know we have all the
// data.
impl<F: FnOnce(Response) + Send> Drop for MutableBatch<F> {
    fn drop(&mut self) {
        // If we have an error, return that.  Otherwise return the data
        let result = if self.err.is_some() {
            Err(Error::Wrapped {
                error: self.err.take().unwrap(),
                location: location!(),
            })
        } else {
            let mut data = Vec::new();
            std::mem::swap(&mut data, &mut self.data_buffers);
            Ok(data)
        };
        // We don't really care if no one is around to receive it, just let
        // the result go out of scope and get cleaned up
        let response = Response {
            data: result,
            num_bytes: self.num_bytes,
            priority: self.priority,
            num_reqs: self.num_reqs,
        };
        (self.when_done.take().unwrap())(response);
    }
}

struct DataChunk {
    task_idx: usize,
    num_bytes: u64,
    data: Result<Bytes>,
}

trait DataSink: Send {
    fn deliver_data(&mut self, data: DataChunk);
}

impl<F: FnOnce(Response) + Send> DataSink for MutableBatch<F> {
    // Called by worker tasks to add data to the MutableBatch
    fn deliver_data(&mut self, data: DataChunk) {
        self.num_bytes += data.num_bytes;
        match data.data {
            Ok(data_bytes) => {
                self.data_buffers[data.task_idx] = data_bytes;
            }
            Err(err) => {
                // This keeps the original error, if present
                self.err.get_or_insert(Box::new(err));
            }
        }
    }
}

struct IoTask {
    reader: Arc<dyn Reader>,
    to_read: Range<u64>,
    when_done: Box<dyn FnOnce(Result<Bytes>) + Send>,
    priority: u128,
}

impl Eq for IoTask {}

impl PartialEq for IoTask {
    fn eq(&self, other: &Self) -> bool {
        self.priority == other.priority
    }
}

impl PartialOrd for IoTask {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for IoTask {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        // This is intentionally inverted.  We want a min-heap
        other.priority.cmp(&self.priority)
    }
}

impl IoTask {
    fn num_bytes(&self) -> u64 {
        self.to_read.end - self.to_read.start
    }

    async fn run(self) {
        let bytes = if self.to_read.start == self.to_read.end {
            Ok(Bytes::new())
        } else {
            let bytes_fut = self
                .reader
                .get_range(self.to_read.start as usize..self.to_read.end as usize);
            IOPS_COUNTER.fetch_add(1, Ordering::Release);
            BYTES_READ_COUNTER.fetch_add(self.num_bytes(), Ordering::Release);
            bytes_fut.await.map_err(Error::from)
        };
        IOPS_QUOTA.release();
        (self.when_done)(bytes);
    }
}

// Every time a scheduler starts up it launches a task to run the I/O loop.  This loop
// repeats endlessly until the scheduler is destroyed.
async fn run_io_loop(tasks: Arc<IoQueue>) {
    // Pop the next runnable task off the queue and spawn it, repeating until
    // we are done
    loop {
        let next_task = tasks.pop().await;
        match next_task {
            Some(task) => {
                tokio::spawn(task.run());
            }
            None => {
                // The sender has been dropped, we are done
                return;
            }
        }
    }
}

#[derive(Debug)]
struct StatsCollector {
    iops: AtomicU64,
    requests: AtomicU64,
    bytes_read: AtomicU64,
}

impl StatsCollector {
    fn new() -> Self {
        Self {
            iops: AtomicU64::new(0),
            requests: AtomicU64::new(0),
            bytes_read: AtomicU64::new(0),
        }
    }

    fn iops(&self) -> u64 {
        self.iops.load(Ordering::Relaxed)
    }

    fn bytes_read(&self) -> u64 {
        self.bytes_read.load(Ordering::Relaxed)
    }

    fn requests(&self) -> u64 {
        self.requests.load(Ordering::Relaxed)
    }

    fn record_request(&self, request: &[Range<u64>]) {
        self.requests.fetch_add(1, Ordering::Relaxed);
        self.iops.fetch_add(request.len() as u64, Ordering::Relaxed);
        self.bytes_read.fetch_add(
            request.iter().map(|r| r.end - r.start).sum::<u64>(),
            Ordering::Relaxed,
        );
    }
}

pub struct ScanStats {
    pub iops: u64,
    pub requests: u64,
    pub bytes_read: u64,
}

impl ScanStats {
    fn new(stats: &StatsCollector) -> Self {
        Self {
            iops: stats.iops(),
            requests: stats.requests(),
            bytes_read: stats.bytes_read(),
        }
    }
}

/// An I/O scheduler which wraps an ObjectStore and throttles the amount of
/// parallel I/O that can be run.
///
/// TODO: This will also add coalescing
pub struct ScanScheduler {
    object_store: Arc<ObjectStore>,
    io_queue: Arc<IoQueue>,
    stats: Arc<StatsCollector>,
}

impl Debug for ScanScheduler {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ScanScheduler")
            .field("object_store", &self.object_store)
            .finish()
    }
}

struct Response {
    data: Result<Vec<Bytes>>,
    priority: u128,
    num_reqs: usize,
    num_bytes: u64,
}

#[derive(Debug, Clone, Copy)]
pub struct SchedulerConfig {
    /// The number of bytes that can be buffered in memory without yet being consumed.
    /// This controls backpressure.  If data is not processed quickly enough then this
    /// buffer will fill up and the I/O loop will pause until the buffer is drained.
    pub io_buffer_size_bytes: u64,
}

impl SchedulerConfig {
    /// Big enough for unit testing
    pub fn default_for_testing() -> Self {
        Self {
            io_buffer_size_bytes: 256 * 1024 * 1024,
        }
    }

    /// Configuration that should generally maximize bandwidth (not trying to save RAM
    /// at all).  We assume a max page size of 32MiB and then allow 32MiB per I/O thread
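    ///
    /// A rough sketch of the arithmetic (assuming, purely for illustration, that
    /// `io_parallelism()` reports 16 threads):
    ///
    /// ```ignore
    /// let config = SchedulerConfig::max_bandwidth(&store);
    /// // 32 MiB per I/O thread
    /// assert_eq!(config.io_buffer_size_bytes, 32 * 1024 * 1024 * 16);
    /// ```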
    pub fn max_bandwidth(store: &ObjectStore) -> Self {
        Self {
            io_buffer_size_bytes: 32 * 1024 * 1024 * store.io_parallelism() as u64,
        }
    }
}

impl ScanScheduler {
    /// Create a new scheduler with the given I/O capacity
    ///
    /// # Arguments
    ///
    /// * object_store - the store to wrap
    /// * config - configuration settings for the scheduler
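    ///
    /// A minimal usage sketch (not compiled as a doctest; assumes a local store and
    /// a tokio runtime):
    ///
    /// ```ignore
    /// let store = Arc::new(ObjectStore::local());
    /// let config = SchedulerConfig::default_for_testing();
    /// let scheduler = ScanScheduler::new(store, config);
    /// ```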
    pub fn new(object_store: Arc<ObjectStore>, config: SchedulerConfig) -> Arc<Self> {
        let io_capacity = object_store.io_parallelism();
        let io_queue = Arc::new(IoQueue::new(
            io_capacity as u32,
            config.io_buffer_size_bytes,
        ));
        let scheduler = Self {
            object_store,
            io_queue: io_queue.clone(),
            stats: Arc::new(StatsCollector::new()),
        };
        tokio::task::spawn(async move { run_io_loop(io_queue).await });
        Arc::new(scheduler)
    }

    /// Open a file for reading
    ///
    /// # Arguments
    ///
    /// * path - the path to the file to open
    /// * base_priority - the base priority for I/O requests submitted to this file scheduler;
    ///   this will determine the upper 64 bits of priority (the lower 64 bits
    ///   come from `submit_request` and `submit_single`)
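    ///
    /// For example (sketch only, not compiled as a doctest; `path_a` / `path_b` are
    /// illustrative): because the base priority occupies the upper 64 bits, every request
    /// made through `second` below is ordered after every request made through `first`,
    /// regardless of the per-request priority:
    ///
    /// ```ignore
    /// let first = scheduler.open_file_with_priority(&path_a, 0).await?;
    /// let second = scheduler.open_file_with_priority(&path_b, 1).await?;
    /// ```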
    pub async fn open_file_with_priority(
        self: &Arc<Self>,
        path: &Path,
        base_priority: u64,
    ) -> Result<FileScheduler> {
        let reader = self.object_store.open(path).await?;
        let block_size = self.object_store.block_size() as u64;
        Ok(FileScheduler {
            reader: reader.into(),
            block_size,
            root: self.clone(),
            base_priority,
        })
    }

    /// Open a file with a default priority of 0
    ///
    /// See [`Self::open_file_with_priority`] for more information on the priority
    pub async fn open_file(self: &Arc<Self>, path: &Path) -> Result<FileScheduler> {
        self.open_file_with_priority(path, 0).await
    }

    fn do_submit_request(
        &self,
        reader: Arc<dyn Reader>,
        request: Vec<Range<u64>>,
        tx: oneshot::Sender<Response>,
        priority: u128,
    ) {
        let num_iops = request.len() as u32;

        let when_all_io_done = move |bytes_and_permits| {
            // We don't care if the receiver has given up so discard the result
            let _ = tx.send(bytes_and_permits);
        };

        let dest = Arc::new(Mutex::new(Box::new(MutableBatch::new(
            when_all_io_done,
            num_iops,
            priority,
            request.len(),
        ))));

        for (task_idx, iop) in request.into_iter().enumerate() {
            let dest = dest.clone();
            let io_queue = self.io_queue.clone();
            let num_bytes = iop.end - iop.start;
            let task = IoTask {
                reader: reader.clone(),
                to_read: iop,
                priority,
                when_done: Box::new(move |data| {
                    io_queue.on_iop_complete();
                    let mut dest = dest.lock().unwrap();
                    let chunk = DataChunk {
                        data,
                        task_idx,
                        num_bytes,
                    };
                    dest.deliver_data(chunk);
                }),
            };
            self.io_queue.push(task);
        }
    }

    fn submit_request(
        &self,
        reader: Arc<dyn Reader>,
        request: Vec<Range<u64>>,
        priority: u128,
    ) -> impl Future<Output = Result<Vec<Bytes>>> + Send {
        let (tx, rx) = oneshot::channel::<Response>();

        self.do_submit_request(reader, request, tx, priority);

        let io_queue = self.io_queue.clone();

        rx.map(move |wrapped_rsp| {
            // Right now, it isn't possible for I/O to be cancelled so a cancel error should
            // not occur
            let rsp = wrapped_rsp.unwrap();
            io_queue.on_bytes_consumed(rsp.num_bytes, rsp.priority, rsp.num_reqs);
            rsp.data
        })
    }

    pub fn stats(&self) -> ScanStats {
        ScanStats::new(self.stats.as_ref())
    }
}

impl Drop for ScanScheduler {
    fn drop(&mut self) {
        self.io_queue.close();
    }
}

/// A throttled file reader
#[derive(Clone, Debug)]
pub struct FileScheduler {
    reader: Arc<dyn Reader>,
    root: Arc<ScanScheduler>,
    block_size: u64,
    base_priority: u64,
}

fn is_close_together(range1: &Range<u64>, range2: &Range<u64>, block_size: u64) -> bool {
    // Note that range1.end <= range2.start is possible (e.g. when decoding string arrays)
    range2.start <= (range1.end + block_size)
}

fn is_overlapping(range1: &Range<u64>, range2: &Range<u64>) -> bool {
    range1.start < range2.end && range2.start < range1.end
}
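
// For example (illustrative numbers only): with a block size of 4096 bytes, the ranges
// 0..100 and 2000..2100 are "close together" (the gap is smaller than a block) and will
// be coalesced by `submit_request` below into a single read of 0..2100, while 0..100 and
// 10_000..10_100 will be issued as two separate IOPs.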

impl FileScheduler {
    /// Submit a batch of I/O requests to the reader
    ///
    /// The requests will be queued in a FIFO manner and, when all requests
    /// have been fulfilled, the returned future will be completed.
    ///
    /// Each request has a given priority.  If the I/O loop is full then requests
    /// will be buffered and requests with the *lowest* priority will be released
    /// from the buffer first.
    ///
    /// Each request has a backpressure ID which controls which backpressure throttle
    /// is applied to the request.  Requests made to the same backpressure throttle
    /// will be throttled together.
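    ///
    /// A rough usage sketch (not compiled as a doctest):
    ///
    /// ```ignore
    /// // Nearby ranges may be coalesced into fewer IOPs internally, but the returned
    /// // buffers always line up one-to-one with the requested ranges.
    /// let buffers = file_scheduler.submit_request(vec![0..100, 200..300], 0).await?;
    /// assert_eq!(buffers.len(), 2);
    /// assert_eq!(buffers[1].len(), 100);
    /// ```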
    pub fn submit_request(
        &self,
        request: Vec<Range<u64>>,
        priority: u64,
    ) -> impl Future<Output = Result<Vec<Bytes>>> + Send {
        self.root.stats.record_request(&request);

        // The final priority is a combination of the row offset and the file number
        let priority = ((self.base_priority as u128) << 64) + priority as u128;

        let mut updated_requests = Vec::with_capacity(request.len());

        if !request.is_empty() {
            let mut curr_interval = request[0].clone();

            for req in request.iter().skip(1) {
                if is_close_together(&curr_interval, req, self.block_size) {
                    curr_interval.end = curr_interval.end.max(req.end);
                } else {
                    updated_requests.push(curr_interval);
                    curr_interval = req.clone();
                }
            }

            updated_requests.push(curr_interval);
        }

        let bytes_vec_fut =
            self.root
                .submit_request(self.reader.clone(), updated_requests.clone(), priority);

        let mut updated_index = 0;
        let mut final_bytes = Vec::with_capacity(request.len());

        async move {
            let bytes_vec = bytes_vec_fut.await?;

            let mut orig_index = 0;
            while (updated_index < updated_requests.len()) && (orig_index < request.len()) {
                let updated_range = &updated_requests[updated_index];
                let orig_range = &request[orig_index];
                let byte_offset = updated_range.start as usize;

                if is_overlapping(updated_range, orig_range) {
                    // The ranges are expressed as offsets into the entire file, but we need
                    // to slice into a subset of the bytes in a particular entry of bytes_vec,
                    // so rescale them relative to the start of the coalesced range
                    let start = orig_range.start as usize - byte_offset;
                    let end = orig_range.end as usize - byte_offset;

                    let sliced_range = bytes_vec[updated_index].slice(start..end);
                    final_bytes.push(sliced_range);
                    orig_index += 1;
                } else {
                    updated_index += 1;
                }
            }

            Ok(final_bytes)
        }
    }

    pub fn with_priority(&self, priority: u64) -> Self {
        Self {
            reader: self.reader.clone(),
            root: self.root.clone(),
            block_size: self.block_size,
            base_priority: priority,
        }
    }

    /// Submit a single IOP to the reader
    ///
    /// If you have multiple IOPS to perform then [`Self::submit_request`] is going
    /// to be more efficient.
    ///
    /// See [`Self::submit_request`] for more information on the priority and backpressure.
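    ///
    /// Sketch (not compiled as a doctest):
    ///
    /// ```ignore
    /// // Read the first 8 bytes of the file as a single IOP
    /// let header: Bytes = file_scheduler.submit_single(0..8, 0).await?;
    /// ```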
    pub fn submit_single(
        &self,
        range: Range<u64>,
        priority: u64,
    ) -> impl Future<Output = Result<Bytes>> + Send {
        self.submit_request(vec![range], priority)
            .map_ok(|vec_bytes| vec_bytes.into_iter().next().unwrap())
    }

    /// Provides access to the underlying reader
    ///
    /// Do not use this for reading data as it will bypass any I/O scheduling!
    /// This is mainly exposed to allow metadata operations (e.g. size, block_size)
    /// which either aren't IOPS or which we don't throttle
    pub fn reader(&self) -> &Arc<dyn Reader> {
        &self.reader
    }
}

#[cfg(test)]
mod tests {
    use std::{collections::VecDeque, time::Duration};

    use futures::poll;
    use rand::RngCore;
    use tempfile::tempdir;

    use object_store::{memory::InMemory, GetRange, ObjectStore as OSObjectStore};
    use tokio::{runtime::Handle, time::timeout};
    use url::Url;

    use crate::{object_store::DEFAULT_DOWNLOAD_RETRY_COUNT, testing::MockObjectStore};

    use super::*;

    #[tokio::test]
    async fn test_full_seq_read() {
        let tmpdir = tempdir().unwrap();
        let tmp_path = tmpdir.path().to_str().unwrap();
        let tmp_path = Path::parse(tmp_path).unwrap();
        let tmp_file = tmp_path.child("foo.file");

        let obj_store = Arc::new(ObjectStore::local());

        // Write 1MiB of data
        const DATA_SIZE: u64 = 1024 * 1024;
        let mut some_data = vec![0; DATA_SIZE as usize];
        rand::thread_rng().fill_bytes(&mut some_data);
        obj_store.put(&tmp_file, &some_data).await.unwrap();

        let config = SchedulerConfig::default_for_testing();

        let scheduler = ScanScheduler::new(obj_store, config);

        let file_scheduler = scheduler.open_file(&tmp_file).await.unwrap();

        // Read it back 4KiB at a time
        const READ_SIZE: u64 = 4 * 1024;
        let mut reqs = VecDeque::new();
        let mut offset = 0;
        while offset < DATA_SIZE {
            reqs.push_back(
                #[allow(clippy::single_range_in_vec_init)]
                file_scheduler
                    .submit_request(vec![offset..offset + READ_SIZE], 0)
                    .await
                    .unwrap(),
            );
            offset += READ_SIZE;
        }

        offset = 0;
        // Note: we should get parallel I/O even though we are consuming serially
        while offset < DATA_SIZE {
            let data = reqs.pop_front().unwrap();
            let actual = &data[0];
            let expected = &some_data[offset as usize..(offset + READ_SIZE) as usize];
            assert_eq!(expected, actual);
            offset += READ_SIZE;
        }
    }

    #[tokio::test]
    async fn test_priority() {
        let some_path = Path::parse("foo").unwrap();
        let base_store = Arc::new(InMemory::new());
        base_store
            .put(&some_path, vec![0; 1000].into())
            .await
            .unwrap();

        let semaphore = Arc::new(tokio::sync::Semaphore::new(0));
        let mut obj_store = MockObjectStore::default();
        let semaphore_copy = semaphore.clone();
        obj_store
            .expect_get_opts()
            .returning(move |location, options| {
                let semaphore = semaphore.clone();
                let base_store = base_store.clone();
                let location = location.clone();
                async move {
                    semaphore.acquire().await.unwrap().forget();
                    base_store.get_opts(&location, options).await
                }
                .boxed()
            });
        let obj_store = Arc::new(ObjectStore::new(
            Arc::new(obj_store),
            Url::parse("mem://").unwrap(),
            None,
            None,
            false,
            false,
            1,
            DEFAULT_DOWNLOAD_RETRY_COUNT,
        ));

        let config = SchedulerConfig {
            io_buffer_size_bytes: 1024 * 1024,
        };

        let scan_scheduler = ScanScheduler::new(obj_store, config);

        let file_scheduler = scan_scheduler
            .open_file(&Path::parse("foo").unwrap())
            .await
            .unwrap();

        // Issue a request, priority doesn't matter, it will be submitted
        // immediately (it will go pending)
        // Note: the timeout is to prevent a deadlock if the test fails.
        let first_fut = timeout(
            Duration::from_secs(10),
            file_scheduler.submit_single(0..10, 0),
        )
        .boxed();

        // Issue another low priority request (it will go in queue)
        let mut second_fut = timeout(
            Duration::from_secs(10),
            file_scheduler.submit_single(0..20, 100),
        )
        .boxed();

        // Issue a high priority request (it will go in queue and should bump
        // the other queued request down)
        let mut third_fut = timeout(
            Duration::from_secs(10),
            file_scheduler.submit_single(0..30, 0),
        )
        .boxed();

        // Finish one file, should be the in-flight first request
        semaphore_copy.add_permits(1);
        assert!(first_fut.await.unwrap().unwrap().len() == 10);
        // Other requests should not be finished
        assert!(poll!(&mut second_fut).is_pending());
        assert!(poll!(&mut third_fut).is_pending());

        // Next should be high priority request
        semaphore_copy.add_permits(1);
        assert!(third_fut.await.unwrap().unwrap().len() == 30);
        assert!(poll!(&mut second_fut).is_pending());

        // Finally, the low priority request
        semaphore_copy.add_permits(1);
        assert!(second_fut.await.unwrap().unwrap().len() == 20);
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_backpressure() {
        let some_path = Path::parse("foo").unwrap();
        let base_store = Arc::new(InMemory::new());
        base_store
            .put(&some_path, vec![0; 100000].into())
            .await
            .unwrap();

        let bytes_read = Arc::new(AtomicU64::from(0));
        let mut obj_store = MockObjectStore::default();
        let bytes_read_copy = bytes_read.clone();
        // Wraps the obj_store to keep track of how many bytes have been read
        obj_store
            .expect_get_opts()
            .returning(move |location, options| {
                let range = options.range.as_ref().unwrap();
                let num_bytes = match range {
                    GetRange::Bounded(bounded) => bounded.end - bounded.start,
                    _ => panic!(),
                };
                bytes_read_copy.fetch_add(num_bytes as u64, Ordering::Release);
                let location = location.clone();
                let base_store = base_store.clone();
                async move { base_store.get_opts(&location, options).await }.boxed()
            });
        let obj_store = Arc::new(ObjectStore::new(
            Arc::new(obj_store),
            Url::parse("mem://").unwrap(),
            None,
            None,
            false,
            false,
            1,
            DEFAULT_DOWNLOAD_RETRY_COUNT,
        ));

        let config = SchedulerConfig {
            io_buffer_size_bytes: 10,
        };

        let scan_scheduler = ScanScheduler::new(obj_store.clone(), config);

        let file_scheduler = scan_scheduler
            .open_file(&Path::parse("foo").unwrap())
            .await
            .unwrap();

        let wait_for_idle = || async move {
            let handle = Handle::current();
            while handle.metrics().num_alive_tasks() != 1 {
                tokio::time::sleep(Duration::from_millis(10)).await;
            }
        };
        let wait_for_bytes_read_and_idle = |target_bytes: u64| {
            // We need to move `target_bytes` but don't want to move `bytes_read`
            let bytes_read = &bytes_read;
            async move {
                let bytes_read_copy = bytes_read.clone();
                while bytes_read_copy.load(Ordering::Acquire) < target_bytes {
                    tokio::time::sleep(Duration::from_millis(10)).await;
                }
                wait_for_idle().await;
            }
        };

        // This read will begin immediately
        let first_fut = file_scheduler.submit_single(0..5, 0);
        // This read should also begin immediately
        let second_fut = file_scheduler.submit_single(0..5, 0);
        // This read will be throttled
        let third_fut = file_scheduler.submit_single(0..3, 0);
        // Two tasks (third_fut and unit test)
        wait_for_bytes_read_and_idle(10).await;

        assert_eq!(first_fut.await.unwrap().len(), 5);
        // One task (unit test)
        wait_for_bytes_read_and_idle(13).await;

        // 2 bytes are ready but 5 bytes requested, read will be blocked
        let fourth_fut = file_scheduler.submit_single(0..5, 0);
        wait_for_bytes_read_and_idle(13).await;

        // Out of order completion is ok, will unblock backpressure
        assert_eq!(third_fut.await.unwrap().len(), 3);
        wait_for_bytes_read_and_idle(18).await;

        assert_eq!(second_fut.await.unwrap().len(), 5);
        // At this point there are 5 bytes available in backpressure queue
        // Now we issue multi-read that can be partially fulfilled, it will read some bytes but
        // not all of them. (using large range gap to ensure request not coalesced)
        //
        // I'm actually not sure this behavior is great.  It's possible that we should just
        // block until we can fulfill the entire request.
        let fifth_fut = file_scheduler.submit_request(vec![0..3, 90000..90007], 0);
        wait_for_bytes_read_and_idle(21).await;

        // Fifth future should eventually finish due to deadlock prevention
        let fifth_bytes = tokio::time::timeout(Duration::from_secs(10), fifth_fut)
            .await
            .unwrap();
        assert_eq!(
            fifth_bytes.unwrap().iter().map(|b| b.len()).sum::<usize>(),
            10
        );

        // And now let's just make sure that we can read the rest of the data
        assert_eq!(fourth_fut.await.unwrap().len(), 5);
        wait_for_bytes_read_and_idle(28).await;

        // Ensure deadlock prevention timeout can be disabled
        let config = SchedulerConfig {
            io_buffer_size_bytes: 10,
        };

        let scan_scheduler = ScanScheduler::new(obj_store, config);
        let file_scheduler = scan_scheduler
            .open_file(&Path::parse("foo").unwrap())
            .await
            .unwrap();

        let first_fut = file_scheduler.submit_single(0..10, 0);
        let second_fut = file_scheduler.submit_single(0..10, 0);

        std::thread::sleep(Duration::from_millis(100));
        assert_eq!(first_fut.await.unwrap().len(), 10);
        assert_eq!(second_fut.await.unwrap().len(), 10);
    }

    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn stress_backpressure() {
        // This test ensures that the backpressure mechanism works correctly with
        // regards to priority.  In other words, as long as all requests are consumed
        // in priority order then the backpressure mechanism should not deadlock
        let some_path = Path::parse("foo").unwrap();
        let obj_store = Arc::new(ObjectStore::memory());
        obj_store
            .put(&some_path, vec![0; 100000].as_slice())
            .await
            .unwrap();

        // Only one request will be allowed in
        let config = SchedulerConfig {
            io_buffer_size_bytes: 1,
        };
        let scan_scheduler = ScanScheduler::new(obj_store.clone(), config);
        let file_scheduler = scan_scheduler.open_file(&some_path).await.unwrap();

        let mut futs = Vec::with_capacity(10000);
        for idx in 0..10000 {
            futs.push(file_scheduler.submit_single(idx..idx + 1, idx));
        }

        for fut in futs {
            fut.await.unwrap();
        }
    }
}