lance_io/scheduler.rs

// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

use bytes::Bytes;
use futures::channel::oneshot;
use futures::{FutureExt, TryFutureExt};
use object_store::path::Path;
use snafu::location;
use std::collections::BinaryHeap;
use std::fmt::Debug;
use std::future::Future;
use std::num::NonZero;
use std::ops::Range;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Instant;
use tokio::sync::{Notify, Semaphore, SemaphorePermit};

use lance_core::{Error, Result};

use crate::object_store::ObjectStore;
use crate::traits::Reader;
use crate::utils::CachedFileSize;

// Don't log backpressure warnings until at least this many seconds have passed
const BACKPRESSURE_MIN: u64 = 5;
// Don't log backpressure warnings more than once / minute
const BACKPRESSURE_DEBOUNCE: u64 = 60;

// Global counter of how many IOPS we have issued
static IOPS_COUNTER: AtomicU64 = AtomicU64::new(0);
// Global counter of how many bytes were read by the scheduler
static BYTES_READ_COUNTER: AtomicU64 = AtomicU64::new(0);

pub fn iops_counter() -> u64 {
    IOPS_COUNTER.load(Ordering::Acquire)
}

pub fn bytes_read_counter() -> u64 {
    BYTES_READ_COUNTER.load(Ordering::Acquire)
}

// There are two structures that control the I/O scheduler concurrency.  First,
// we have a hard limit on the number of IOPS that can be issued concurrently.
// This limit is process-wide.
//
// Second, we try to limit how many I/O requests can be buffered in memory without
// being consumed by a decoder of some kind.  This limit is per-scheduler.  We cannot
// make this limit process-wide without introducing deadlock (the decoder for
// file 0 might be waiting on IOPS blocked by a queue filled with requests for file 1,
// and vice versa).
//
// There is also a per-scan limit on the number of IOPS that can be issued concurrently.
//
// The process-wide limit exists for cases where users need a hard limit on the number of
// parallel IOPS, e.g. due to port availability limits or to prevent multiple scans from
// saturating the network.  (Note: a process-wide limit of X will not necessarily limit the
// number of open TCP connections to exactly X.  The underlying object store may open more
// connections anyway.)
//
// However, the process-wide limit can be too restrictive in some cases, e.g. when some
// scans are reading from cloud storage and other scans are reading from local disk.  In
// these cases users don't need to set a process-wide limit and can rely on the per-scan
// limits.

// The IopsQuota enforces the first of the above limits: it is the per-process hard cap
// on the number of IOPS that can be issued concurrently.
//
// The per-scan limits are enforced by IoQueue.
struct IopsQuota {
    // An Option is used here to avoid mutex overhead if no limit is set
    iops_avail: Option<Semaphore>,
}

/// A reservation on the global IOPS quota
///
/// When the reservation is dropped, the IOPS quota is released unless
/// [`Self::forget`] is called.
struct IopsReservation<'a> {
    value: Option<SemaphorePermit<'a>>,
}

impl IopsReservation<'_> {
    // Forget the reservation, so it won't be released on drop
    fn forget(&mut self) {
        if let Some(value) = self.value.take() {
            value.forget();
        }
    }
}

impl IopsQuota {
    // By default, there is no process-wide limit on IOPS
    //
    // However, the user can request one by setting the environment variable
    // LANCE_PROCESS_IO_THREADS_LIMIT to a positive integer.
    fn new() -> Self {
        let initial_capacity = std::env::var("LANCE_PROCESS_IO_THREADS_LIMIT")
            .map(|s| {
                let limit = s
                    .parse::<i32>()
                    .expect("LANCE_PROCESS_IO_THREADS_LIMIT must be a positive integer");
                if limit <= 0 {
                    panic!("LANCE_PROCESS_IO_THREADS_LIMIT must be a positive integer.  To disable the limit, unset the environment variable");
                }
                limit
            })
            // The default (-1) does not apply any limit
            .unwrap_or(-1);
        let iops_avail = if initial_capacity < 0 {
            None
        } else {
            Some(Semaphore::new(initial_capacity as usize))
        };
        Self { iops_avail }
    }

    // Release a reservation back to the global IOPS quota
    fn release(&self) {
        if let Some(iops_avail) = self.iops_avail.as_ref() {
            iops_avail.add_permits(1);
        }
    }

    // Acquire a reservation on the global IOPS quota
    async fn acquire(&self) -> IopsReservation {
        if let Some(iops_avail) = self.iops_avail.as_ref() {
            IopsReservation {
                value: Some(iops_avail.acquire().await.unwrap()),
            }
        } else {
            IopsReservation { value: None }
        }
    }
}

lazy_static::lazy_static! {
    static ref IOPS_QUOTA: IopsQuota = IopsQuota::new();
}
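
// Illustrative sketch (the value is arbitrary, not a default): the process-wide cap is
// opt-in and is read once, the first time IOPS_QUOTA is touched, so the variable must be
// set before the process starts doing I/O, e.g.
//
//   LANCE_PROCESS_IO_THREADS_LIMIT=64 ./my_app        (hypothetical binary name)
//
// Leaving the variable unset (the default) disables the process-wide cap entirely.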

// We want to allow requests that have a lower priority than any
// currently in-flight request.  This helps avoid potential deadlocks
// related to backpressure.  Unfortunately, it is quite expensive to
// keep track of which priorities are in-flight.
//
// TODO: At some point it would be nice if we can optimize this away but
// in_flight should remain relatively small (generally less than 256 items)
// and has not shown itself to be a bottleneck yet.
struct PrioritiesInFlight {
    in_flight: Vec<u128>,
}

impl PrioritiesInFlight {
    fn new(capacity: u32) -> Self {
        Self {
            in_flight: Vec::with_capacity(capacity as usize * 2),
        }
    }

    fn min_in_flight(&self) -> u128 {
        self.in_flight.first().copied().unwrap_or(u128::MAX)
    }

    fn push(&mut self, prio: u128) {
        let pos = match self.in_flight.binary_search(&prio) {
            Ok(pos) => pos,
            Err(pos) => pos,
        };
        self.in_flight.insert(pos, prio);
    }

    fn remove(&mut self, prio: u128) {
        if let Ok(pos) = self.in_flight.binary_search(&prio) {
            self.in_flight.remove(pos);
        } else {
            unreachable!();
        }
    }
}
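
// For example (illustrative values): pushing priorities 7, 3, and then 7 again keeps
// `in_flight` sorted as [3, 7, 7]; `min_in_flight()` then returns 3, and `remove(7)`
// drops a single 7, leaving [3, 7].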

struct IoQueueState {
    // Number of IOPS we can issue concurrently before pausing I/O
    iops_avail: u32,
    // Number of bytes we are allowed to buffer in memory before pausing I/O
    //
    // This can dip below 0 due to I/O prioritization
    bytes_avail: i64,
    // Pending I/O requests
    pending_requests: BinaryHeap<IoTask>,
    // Priorities of in-flight requests
    priorities_in_flight: PrioritiesInFlight,
    // Set when the scheduler is finished to notify the I/O loop to shut down
    // once all outstanding requests have been completed.
    done_scheduling: bool,
    // Time when the scheduler started
    start: Instant,
    // Last time we warned about backpressure
    last_warn: AtomicU64,
}

impl IoQueueState {
    fn new(io_capacity: u32, io_buffer_size: u64) -> Self {
        Self {
            iops_avail: io_capacity,
            bytes_avail: io_buffer_size as i64,
            pending_requests: BinaryHeap::new(),
            priorities_in_flight: PrioritiesInFlight::new(io_capacity),
            done_scheduling: false,
            start: Instant::now(),
            last_warn: AtomicU64::from(0),
        }
    }

    fn finished(&self) -> bool {
        self.done_scheduling && self.pending_requests.is_empty()
    }

    fn warn_if_needed(&self) {
        let seconds_elapsed = self.start.elapsed().as_secs();
        let last_warn = self.last_warn.load(Ordering::Acquire);
        let since_last_warn = seconds_elapsed - last_warn;
        if (last_warn == 0
            && seconds_elapsed > BACKPRESSURE_MIN
            && seconds_elapsed < BACKPRESSURE_DEBOUNCE)
            || since_last_warn > BACKPRESSURE_DEBOUNCE
        {
            tracing::event!(tracing::Level::DEBUG, "Backpressure throttle exceeded");
            log::debug!("Backpressure throttle is full, I/O will pause until buffer is drained.  Max I/O bandwidth will not be achieved because CPU is falling behind");
            self.last_warn
                .store(seconds_elapsed.max(1), Ordering::Release);
        }
    }

    fn can_deliver(&self, task: &IoTask) -> bool {
        if self.iops_avail == 0 {
            false
        } else if task.priority <= self.priorities_in_flight.min_in_flight() {
            true
        } else if task.num_bytes() as i64 > self.bytes_avail {
            self.warn_if_needed();
            false
        } else {
            true
        }
    }
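
    // For example (illustrative numbers): with iops_avail = 2, bytes_avail = 1_000, and a
    // minimum in-flight priority of 10, a task with priority 5 that wants 4_000 bytes is
    // still delivered (priority rule), pushing bytes_avail down to -3_000, while a task
    // with priority 20 that wants 4_000 bytes is held back until bytes are consumed.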

    fn next_task(&mut self) -> Option<IoTask> {
        let task = self.pending_requests.peek()?;
        if self.can_deliver(task) {
            self.priorities_in_flight.push(task.priority);
            self.iops_avail -= 1;
            self.bytes_avail -= task.num_bytes() as i64;
            if self.bytes_avail < 0 {
                // This can happen when we admit special priority requests
                log::debug!(
                    "Backpressure throttle temporarily exceeded by {} bytes due to priority I/O",
                    -self.bytes_avail
                );
            }
            Some(self.pending_requests.pop().unwrap())
        } else {
            None
        }
    }
}

// This is modeled after the MPSC queue described here: https://docs.rs/tokio/latest/tokio/sync/struct.Notify.html
//
// However, it only needs to be SPSC since there is only one "scheduler thread"
// and one I/O loop.
struct IoQueue {
    // Queue state
    state: Mutex<IoQueueState>,
    // Used to signal new I/O requests have arrived that might potentially be runnable
    notify: Notify,
}

impl IoQueue {
    fn new(io_capacity: u32, io_buffer_size: u64) -> Self {
        Self {
            state: Mutex::new(IoQueueState::new(io_capacity, io_buffer_size)),
            notify: Notify::new(),
        }
    }

    fn push(&self, task: IoTask) {
        log::trace!(
            "Inserting I/O request for {} bytes with priority ({},{}) into I/O queue",
            task.num_bytes(),
            task.priority >> 64,
            task.priority & 0xFFFFFFFFFFFFFFFF
        );
        let mut state = self.state.lock().unwrap();
        state.pending_requests.push(task);
        drop(state);

        self.notify.notify_one();
    }

    async fn pop(&self) -> Option<IoTask> {
        loop {
            {
                // First, grab a reservation on the global IOPS quota.
                // If we then get a task to run, transfer the reservation
                // to the task.  Otherwise, the reservation will be released
                // when iop_res is dropped.
                let mut iop_res = IOPS_QUOTA.acquire().await;
                // Next, try to grab a runnable task from the queue
                let mut state = self.state.lock().unwrap();
                if let Some(task) = state.next_task() {
                    // Task successfully acquired; we will release the global
                    // reservation after the task has run.
                    iop_res.forget();
                    return Some(task);
                }

                if state.finished() {
                    return None;
                }
            }

            self.notify.notified().await;
        }
    }

    fn on_iop_complete(&self) {
        let mut state = self.state.lock().unwrap();
        state.iops_avail += 1;
        drop(state);

        self.notify.notify_one();
    }

    fn on_bytes_consumed(&self, bytes: u64, priority: u128, num_reqs: usize) {
        let mut state = self.state.lock().unwrap();
        state.bytes_avail += bytes as i64;
        for _ in 0..num_reqs {
            state.priorities_in_flight.remove(priority);
        }
        drop(state);

        self.notify.notify_one();
    }

    fn close(&self) {
        let mut state = self.state.lock().unwrap();
        state.done_scheduling = true;
        drop(state);

        self.notify.notify_one();
    }
}

// There is one instance of MutableBatch shared by all the I/O operations
// that make up a single request.  When all the I/O operations complete,
// the MutableBatch goes out of scope and the batch request is considered
// complete.
struct MutableBatch<F: FnOnce(Response) + Send> {
    when_done: Option<F>,
    data_buffers: Vec<Bytes>,
    num_bytes: u64,
    priority: u128,
    num_reqs: usize,
    err: Option<Box<dyn std::error::Error + Send + Sync + 'static>>,
}

impl<F: FnOnce(Response) + Send> MutableBatch<F> {
    fn new(when_done: F, num_data_buffers: u32, priority: u128, num_reqs: usize) -> Self {
        Self {
            when_done: Some(when_done),
            data_buffers: vec![Bytes::default(); num_data_buffers as usize],
            num_bytes: 0,
            priority,
            num_reqs,
            err: None,
        }
    }
}

// Rather than keeping track of when all the I/O requests are finished so that we
// can deliver the batch of data, we let Rust do that for us.  When all I/Os are
// done, the MutableBatch will go out of scope and we know we have all the
// data.
impl<F: FnOnce(Response) + Send> Drop for MutableBatch<F> {
    fn drop(&mut self) {
        // If we have an error, return that.  Otherwise return the data
        let result = if self.err.is_some() {
            Err(Error::Wrapped {
                error: self.err.take().unwrap(),
                location: location!(),
            })
        } else {
            let mut data = Vec::new();
            std::mem::swap(&mut data, &mut self.data_buffers);
            Ok(data)
        };
        // We don't really care if no one is around to receive it, just let
        // the result go out of scope and get cleaned up
        let response = Response {
            data: result,
            num_bytes: self.num_bytes,
            priority: self.priority,
            num_reqs: self.num_reqs,
        };
        (self.when_done.take().unwrap())(response);
    }
}

struct DataChunk {
    task_idx: usize,
    num_bytes: u64,
    data: Result<Bytes>,
}

trait DataSink: Send {
    fn deliver_data(&mut self, data: DataChunk);
}

impl<F: FnOnce(Response) + Send> DataSink for MutableBatch<F> {
    // Called by worker tasks to add data to the MutableBatch
    fn deliver_data(&mut self, data: DataChunk) {
        self.num_bytes += data.num_bytes;
        match data.data {
            Ok(data_bytes) => {
                self.data_buffers[data.task_idx] = data_bytes;
            }
            Err(err) => {
                // This keeps the original error, if present
                self.err.get_or_insert(Box::new(err));
            }
        }
    }
}

struct IoTask {
    reader: Arc<dyn Reader>,
    to_read: Range<u64>,
    when_done: Box<dyn FnOnce(Result<Bytes>) + Send>,
    priority: u128,
}

impl Eq for IoTask {}

impl PartialEq for IoTask {
    fn eq(&self, other: &Self) -> bool {
        self.priority == other.priority
    }
}

impl PartialOrd for IoTask {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for IoTask {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        // This is intentionally inverted.  We want a min-heap
        other.priority.cmp(&self.priority)
    }
}
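
// For example (illustrative priorities): if tasks with priority values 7 and 3 are both in
// the BinaryHeap, the inverted ordering makes the priority-3 task compare as the "greatest"
// element, so the max-heap pops it first, i.e. lower priority values are served first.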

impl IoTask {
    fn num_bytes(&self) -> u64 {
        self.to_read.end - self.to_read.start
    }

    async fn run(self) {
        let bytes = if self.to_read.start == self.to_read.end {
            Ok(Bytes::new())
        } else {
            let bytes_fut = self
                .reader
                .get_range(self.to_read.start as usize..self.to_read.end as usize);
            IOPS_COUNTER.fetch_add(1, Ordering::Release);
            BYTES_READ_COUNTER.fetch_add(self.num_bytes(), Ordering::Release);
            bytes_fut.await.map_err(Error::from)
        };
        IOPS_QUOTA.release();
        (self.when_done)(bytes);
    }
}

// Every time a scheduler starts up it launches a task to run the I/O loop.  This loop
// runs until the scheduler is destroyed.
async fn run_io_loop(tasks: Arc<IoQueue>) {
    // Pop runnable tasks off the queue and spawn them until the queue is
    // closed and drained
    loop {
        let next_task = tasks.pop().await;
        match next_task {
            Some(task) => {
                tokio::spawn(task.run());
            }
            None => {
                // The queue has been closed (the scheduler was dropped) and drained, we are done
                return;
            }
        }
    }
}

#[derive(Debug)]
struct StatsCollector {
    iops: AtomicU64,
    requests: AtomicU64,
    bytes_read: AtomicU64,
}

impl StatsCollector {
    fn new() -> Self {
        Self {
            iops: AtomicU64::new(0),
            requests: AtomicU64::new(0),
            bytes_read: AtomicU64::new(0),
        }
    }

    fn iops(&self) -> u64 {
        self.iops.load(Ordering::Relaxed)
    }

    fn bytes_read(&self) -> u64 {
        self.bytes_read.load(Ordering::Relaxed)
    }

    fn requests(&self) -> u64 {
        self.requests.load(Ordering::Relaxed)
    }

    fn record_request(&self, request: &[Range<u64>]) {
        self.requests.fetch_add(1, Ordering::Relaxed);
        self.iops.fetch_add(request.len() as u64, Ordering::Relaxed);
        self.bytes_read.fetch_add(
            request.iter().map(|r| r.end - r.start).sum::<u64>(),
            Ordering::Relaxed,
        );
    }
}

pub struct ScanStats {
    pub iops: u64,
    pub requests: u64,
    pub bytes_read: u64,
}

impl ScanStats {
    fn new(stats: &StatsCollector) -> Self {
        Self {
            iops: stats.iops(),
            requests: stats.requests(),
            bytes_read: stats.bytes_read(),
        }
    }
}

/// An I/O scheduler which wraps an ObjectStore and throttles the amount of
/// parallel I/O that can be run.
///
/// TODO: This will also add coalescing
pub struct ScanScheduler {
    object_store: Arc<ObjectStore>,
    io_queue: Arc<IoQueue>,
    stats: Arc<StatsCollector>,
}

impl Debug for ScanScheduler {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ScanScheduler")
            .field("object_store", &self.object_store)
            .finish()
    }
}

struct Response {
    data: Result<Vec<Bytes>>,
    priority: u128,
    num_reqs: usize,
    num_bytes: u64,
}

#[derive(Debug, Clone, Copy)]
pub struct SchedulerConfig {
    /// The number of bytes that can be buffered in memory before they are consumed
    /// (e.g. by a decoder).  This controls back pressure.  If data is not processed
    /// quickly enough then this buffer will fill up and the I/O loop will pause until
    /// the buffer is drained.
    pub io_buffer_size_bytes: u64,
}

impl SchedulerConfig {
    /// A buffer size big enough for unit testing
    pub fn default_for_testing() -> Self {
        Self {
            io_buffer_size_bytes: 256 * 1024 * 1024,
        }
    }

    /// Configuration that should generally maximize bandwidth (not trying to save RAM
    /// at all).  We assume a max page size of 32MiB and then allow 32MiB per I/O thread.
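    ///
    /// For example, assuming a store that reports an `io_parallelism()` of 8 (an arbitrary
    /// illustration, not a documented default), the resulting buffer size would be:
    ///
    /// ```text
    /// io_buffer_size_bytes = 32 MiB * 8 = 256 MiB
    /// ```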
    pub fn max_bandwidth(store: &ObjectStore) -> Self {
        Self {
            io_buffer_size_bytes: 32 * 1024 * 1024 * store.io_parallelism() as u64,
        }
    }
}

impl ScanScheduler {
    /// Create a new scheduler with the given I/O capacity
    ///
    /// # Arguments
    ///
    /// * object_store - the store to wrap
    /// * config - configuration settings for the scheduler
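    ///
    /// # Example
    ///
    /// A minimal usage sketch (the buffer size is illustrative, not a recommended value):
    ///
    /// ```ignore
    /// let store = Arc::new(ObjectStore::local());
    /// let config = SchedulerConfig {
    ///     io_buffer_size_bytes: 128 * 1024 * 1024,
    /// };
    /// let scheduler = ScanScheduler::new(store, config);
    /// ```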
    pub fn new(object_store: Arc<ObjectStore>, config: SchedulerConfig) -> Arc<Self> {
        let io_capacity = object_store.io_parallelism();
        let io_queue = Arc::new(IoQueue::new(
            io_capacity as u32,
            config.io_buffer_size_bytes,
        ));
        let scheduler = Self {
            object_store,
            io_queue: io_queue.clone(),
            stats: Arc::new(StatsCollector::new()),
        };
        tokio::task::spawn(async move { run_io_loop(io_queue).await });
        Arc::new(scheduler)
    }

    /// Open a file for reading
    ///
    /// # Arguments
    ///
    /// * path - the path to the file to open
    /// * base_priority - the base priority for I/O requests submitted to this file scheduler;
    ///   this will determine the upper 64 bits of priority (the lower 64 bits
    ///   come from `submit_request` and `submit_single`)
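    ///
    /// # Example
    ///
    /// A sketch of how the base priority combines with per-request priorities (the paths,
    /// sizes, and priority values are illustrative):
    ///
    /// ```ignore
    /// // Requests from file0 (base priority 0) are favored over requests from file1
    /// // (base priority 1) because the base priority forms the upper 64 bits.
    /// let file0 = scheduler.open_file_with_priority(&path0, 0, &size0).await?;
    /// let file1 = scheduler.open_file_with_priority(&path1, 1, &size1).await?;
    /// // Final priority = (base_priority << 64) + request_priority
    /// let bytes = file0.submit_single(0..1024, 42).await?;
    /// ```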
    pub async fn open_file_with_priority(
        self: &Arc<Self>,
        path: &Path,
        base_priority: u64,
        file_size_bytes: &CachedFileSize,
    ) -> Result<FileScheduler> {
        let file_size_bytes = if let Some(size) = file_size_bytes.get() {
            u64::from(size) as usize
        } else {
            let size = self.object_store.size(path).await?;
            if let Some(size) = NonZero::new(size as u64) {
                file_size_bytes.set(size);
            }
            size
        };
        let reader = self
            .object_store
            .open_with_size(path, file_size_bytes)
            .await?;
        let block_size = self.object_store.block_size() as u64;
        let max_iop_size = self.object_store.max_iop_size();
        Ok(FileScheduler {
            reader: reader.into(),
            block_size,
            root: self.clone(),
            base_priority,
            max_iop_size,
        })
    }

    /// Open a file with a default priority of 0
    ///
    /// See [`Self::open_file_with_priority`] for more information on the priority
    pub async fn open_file(
        self: &Arc<Self>,
        path: &Path,
        file_size_bytes: &CachedFileSize,
    ) -> Result<FileScheduler> {
        self.open_file_with_priority(path, 0, file_size_bytes).await
    }

    fn do_submit_request(
        &self,
        reader: Arc<dyn Reader>,
        request: Vec<Range<u64>>,
        tx: oneshot::Sender<Response>,
        priority: u128,
    ) {
        let num_iops = request.len() as u32;

        let when_all_io_done = move |bytes_and_permits| {
            // We don't care if the receiver has given up so discard the result
            let _ = tx.send(bytes_and_permits);
        };

        let dest = Arc::new(Mutex::new(Box::new(MutableBatch::new(
            when_all_io_done,
            num_iops,
            priority,
            request.len(),
        ))));

        for (task_idx, iop) in request.into_iter().enumerate() {
            let dest = dest.clone();
            let io_queue = self.io_queue.clone();
            let num_bytes = iop.end - iop.start;
            let task = IoTask {
                reader: reader.clone(),
                to_read: iop,
                priority,
                when_done: Box::new(move |data| {
                    io_queue.on_iop_complete();
                    let mut dest = dest.lock().unwrap();
                    let chunk = DataChunk {
                        data,
                        task_idx,
                        num_bytes,
                    };
                    dest.deliver_data(chunk);
                }),
            };
            self.io_queue.push(task);
        }
    }

    fn submit_request(
        &self,
        reader: Arc<dyn Reader>,
        request: Vec<Range<u64>>,
        priority: u128,
    ) -> impl Future<Output = Result<Vec<Bytes>>> + Send {
        let (tx, rx) = oneshot::channel::<Response>();

        self.do_submit_request(reader, request, tx, priority);

        let io_queue = self.io_queue.clone();

        rx.map(move |wrapped_rsp| {
            // Right now, it isn't possible for I/O to be cancelled so a cancel error should
            // not occur
            let rsp = wrapped_rsp.unwrap();
            io_queue.on_bytes_consumed(rsp.num_bytes, rsp.priority, rsp.num_reqs);
            rsp.data
        })
    }

    pub fn stats(&self) -> ScanStats {
        ScanStats::new(self.stats.as_ref())
    }
}

impl Drop for ScanScheduler {
    fn drop(&mut self) {
        self.io_queue.close();
    }
}

/// A throttled file reader
#[derive(Clone, Debug)]
pub struct FileScheduler {
    reader: Arc<dyn Reader>,
    root: Arc<ScanScheduler>,
    block_size: u64,
    base_priority: u64,
    max_iop_size: u64,
}

fn is_close_together(range1: &Range<u64>, range2: &Range<u64>, block_size: u64) -> bool {
    // Note that range1.end <= range2.start is possible (e.g. when decoding string arrays)
    range2.start <= (range1.end + block_size)
}

fn is_overlapping(range1: &Range<u64>, range2: &Range<u64>) -> bool {
    range1.start < range2.end && range2.start < range1.end
}
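
// For example (illustrative numbers, assuming a 4096-byte block size):
//   is_close_together(&(0..1_000), &(4_000..5_000), 4_096) == true    (gap of 3_000 bytes)
//   is_close_together(&(0..1_000), &(10_000..11_000), 4_096) == false (gap of 9_000 bytes)
//   is_overlapping(&(0..1_000), &(500..1_500)) == true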

impl FileScheduler {
    /// Submit a batch of I/O requests to the reader
    ///
    /// The requests will be queued in a FIFO manner and, when all requests
    /// have been fulfilled, the returned future will be completed.
    ///
    /// Each request has a given priority.  If the I/O loop is full then requests
    /// will be buffered and requests with the *lowest* priority will be released
    /// from the buffer first.
    ///
    /// Each request has a backpressure ID which controls which backpressure throttle
    /// is applied to the request.  Requests made to the same backpressure throttle
    /// will be throttled together.
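    ///
    /// # Example
    ///
    /// A minimal sketch (ranges and priority are illustrative).  The returned buffers line
    /// up one-to-one with the requested ranges, even though the scheduler may coalesce or
    /// split the underlying reads:
    ///
    /// ```ignore
    /// let ranges = vec![0..1_024, 8_192..9_216];
    /// let buffers = file_scheduler.submit_request(ranges, 0).await?;
    /// assert_eq!(buffers.len(), 2);
    /// assert_eq!(buffers[0].len(), 1_024);
    /// ```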
    pub fn submit_request(
        &self,
        request: Vec<Range<u64>>,
        priority: u64,
    ) -> impl Future<Output = Result<Vec<Bytes>>> + Send {
        // The final priority is a combination of the row offset and the file number
        let priority = ((self.base_priority as u128) << 64) + priority as u128;

        let mut merged_requests = Vec::with_capacity(request.len());

        if !request.is_empty() {
            let mut curr_interval = request[0].clone();

            for req in request.iter().skip(1) {
                if is_close_together(&curr_interval, req, self.block_size) {
                    curr_interval.end = curr_interval.end.max(req.end);
                } else {
                    merged_requests.push(curr_interval);
                    curr_interval = req.clone();
                }
            }

            merged_requests.push(curr_interval);
        }

        let mut updated_requests = Vec::with_capacity(merged_requests.len());
        for req in merged_requests {
            if req.is_empty() {
                updated_requests.push(req);
            } else {
                let num_requests = (req.end - req.start).div_ceil(self.max_iop_size);
                let bytes_per_request = (req.end - req.start) / num_requests;
                for i in 0..num_requests {
                    let start = req.start + i * bytes_per_request;
                    let end = if i == num_requests - 1 {
                        // Last request is a bit bigger due to rounding
                        req.end
                    } else {
                        start + bytes_per_request
                    };
                    updated_requests.push(start..end);
                }
            }
        }

        self.root.stats.record_request(&updated_requests);

        let bytes_vec_fut =
            self.root
                .submit_request(self.reader.clone(), updated_requests.clone(), priority);

        let mut updated_index = 0;
        let mut final_bytes = Vec::with_capacity(request.len());

        async move {
            let bytes_vec = bytes_vec_fut.await?;

            let mut orig_index = 0;
            while (updated_index < updated_requests.len()) && (orig_index < request.len()) {
                let updated_range = &updated_requests[updated_index];
                let orig_range = &request[orig_index];
                let byte_offset = updated_range.start as usize;

                if is_overlapping(updated_range, orig_range) {
                    // We need to undo the coalescing and splitting done earlier
                    let start = orig_range.start as usize - byte_offset;
                    if orig_range.end <= updated_range.end {
                        // The original range is fully contained in the updated range, can do
                        // zero-copy slice
                        let end = orig_range.end as usize - byte_offset;
                        final_bytes.push(bytes_vec[updated_index].slice(start..end));
                    } else {
                        // The original read was split into multiple requests, need to copy
                        // back into a single buffer
                        let orig_size = orig_range.end - orig_range.start;
                        let mut merged_bytes = Vec::with_capacity(orig_size as usize);
                        merged_bytes.extend_from_slice(&bytes_vec[updated_index].slice(start..));
                        let mut copy_offset = merged_bytes.len() as u64;
                        while copy_offset < orig_size {
                            updated_index += 1;
                            let next_range = &updated_requests[updated_index];
                            let bytes_to_take =
                                (orig_size - copy_offset).min(next_range.end - next_range.start);
                            merged_bytes.extend_from_slice(
                                &bytes_vec[updated_index].slice(0..bytes_to_take as usize),
                            );
                            copy_offset += bytes_to_take;
                        }
                        final_bytes.push(Bytes::from(merged_bytes));
                    }
                    orig_index += 1;
                } else {
                    updated_index += 1;
                }
            }

            Ok(final_bytes)
        }
    }

    pub fn with_priority(&self, priority: u64) -> Self {
        Self {
            reader: self.reader.clone(),
            root: self.root.clone(),
            block_size: self.block_size,
            max_iop_size: self.max_iop_size,
            base_priority: priority,
        }
    }

    /// Submit a single IOP to the reader
    ///
    /// If you have multiple IOPS to perform then [`Self::submit_request`] is going
    /// to be more efficient.
    ///
    /// See [`Self::submit_request`] for more information on the priority and backpressure.
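    ///
    /// # Example
    ///
    /// A minimal sketch (the range and priority are illustrative):
    ///
    /// ```ignore
    /// let bytes = file_scheduler.submit_single(0..100, 0).await?;
    /// assert_eq!(bytes.len(), 100);
    /// ```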
    pub fn submit_single(
        &self,
        range: Range<u64>,
        priority: u64,
    ) -> impl Future<Output = Result<Bytes>> + Send {
        self.submit_request(vec![range], priority)
            .map_ok(|vec_bytes| vec_bytes.into_iter().next().unwrap())
    }

    /// Provides access to the underlying reader
    ///
    /// Do not use this for reading data as it will bypass any I/O scheduling!
    /// This is mainly exposed to allow metadata operations (e.g. size, block_size)
    /// which either aren't IOPS or which we don't throttle
    pub fn reader(&self) -> &Arc<dyn Reader> {
        &self.reader
    }
}

#[cfg(test)]
mod tests {
    use std::{collections::VecDeque, time::Duration};

    use futures::poll;
    use rand::RngCore;
    use tempfile::tempdir;

    use object_store::{memory::InMemory, GetRange, ObjectStore as OSObjectStore};
    use tokio::{runtime::Handle, time::timeout};
    use url::Url;

    use crate::{
        object_store::{DEFAULT_DOWNLOAD_RETRY_COUNT, DEFAULT_MAX_IOP_SIZE},
        testing::MockObjectStore,
    };

    use super::*;

    #[tokio::test]
    async fn test_full_seq_read() {
        let tmpdir = tempdir().unwrap();
        let tmp_path = tmpdir.path().to_str().unwrap();
        let tmp_path = Path::parse(tmp_path).unwrap();
        let tmp_file = tmp_path.child("foo.file");

        let obj_store = Arc::new(ObjectStore::local());

        // Write 1MiB of data
        const DATA_SIZE: u64 = 1024 * 1024;
        let mut some_data = vec![0; DATA_SIZE as usize];
        rand::thread_rng().fill_bytes(&mut some_data);
        obj_store.put(&tmp_file, &some_data).await.unwrap();

        let config = SchedulerConfig::default_for_testing();

        let scheduler = ScanScheduler::new(obj_store, config);

        let file_scheduler = scheduler
            .open_file(&tmp_file, &CachedFileSize::unknown())
            .await
            .unwrap();

        // Read it back 4KiB at a time
        const READ_SIZE: u64 = 4 * 1024;
        let mut reqs = VecDeque::new();
        let mut offset = 0;
        while offset < DATA_SIZE {
            reqs.push_back(
                #[allow(clippy::single_range_in_vec_init)]
                file_scheduler
                    .submit_request(vec![offset..offset + READ_SIZE], 0)
                    .await
                    .unwrap(),
            );
            offset += READ_SIZE;
        }

        offset = 0;
        // Note: we should get parallel I/O even though we are consuming serially
        while offset < DATA_SIZE {
            let data = reqs.pop_front().unwrap();
            let actual = &data[0];
            let expected = &some_data[offset as usize..(offset + READ_SIZE) as usize];
            assert_eq!(expected, actual);
            offset += READ_SIZE;
        }
    }

    #[tokio::test]
    async fn test_split_coalesce() {
        let tmpdir = tempdir().unwrap();
        let tmp_path = tmpdir.path().to_str().unwrap();
        let tmp_path = Path::parse(tmp_path).unwrap();
        let tmp_file = tmp_path.child("foo.file");

        let obj_store = Arc::new(ObjectStore::local());

        // Write 75MiB of data
        const DATA_SIZE: u64 = 75 * 1024 * 1024;
        let mut some_data = vec![0; DATA_SIZE as usize];
        rand::thread_rng().fill_bytes(&mut some_data);
        obj_store.put(&tmp_file, &some_data).await.unwrap();

        let config = SchedulerConfig::default_for_testing();

        let scheduler = ScanScheduler::new(obj_store, config);

        let file_scheduler = scheduler
            .open_file(&tmp_file, &CachedFileSize::unknown())
            .await
            .unwrap();

        // These 3 requests should be coalesced into a single I/O because they are within 4KiB
        // of each other
        let req =
            file_scheduler.submit_request(vec![50_000..51_000, 52_000..53_000, 54_000..55_000], 0);

        let bytes = req.await.unwrap();

        assert_eq!(bytes[0], &some_data[50_000..51_000]);
        assert_eq!(bytes[1], &some_data[52_000..53_000]);
        assert_eq!(bytes[2], &some_data[54_000..55_000]);

        assert_eq!(1, scheduler.stats().iops);

        // This should be split into 5 requests because it is so large
        let req = file_scheduler.submit_request(vec![0..DATA_SIZE], 0);
        let bytes = req.await.unwrap();
        assert!(bytes[0] == some_data, "data is not the same");

        assert_eq!(6, scheduler.stats().iops);

        // None of these requests are bigger than the max IOP size but they will be coalesced into
        // one IOP that is bigger and then split back into 2 requests that don't quite align with the original
        // ranges.
        let chunk_size = *DEFAULT_MAX_IOP_SIZE;
        let req = file_scheduler.submit_request(
            vec![
                10..chunk_size,
                chunk_size + 10..(chunk_size * 2) - 20,
                chunk_size * 2..(chunk_size * 2) + 10,
            ],
            0,
        );

        let bytes = req.await.unwrap();
        let chunk_size = chunk_size as usize;
        assert!(
            bytes[0] == some_data[10..chunk_size],
            "data is not the same"
        );
        assert!(
            bytes[1] == some_data[chunk_size + 10..(chunk_size * 2) - 20],
            "data is not the same"
        );
        assert!(
            bytes[2] == some_data[chunk_size * 2..(chunk_size * 2) + 10],
            "data is not the same"
        );
        assert_eq!(8, scheduler.stats().iops);

        let reads = (0..44)
            .map(|i| (i * 1_000_000..(i + 1) * 1_000_000))
            .collect::<Vec<_>>();
        let req = file_scheduler.submit_request(reads, 0);
        let bytes = req.await.unwrap();
        for (i, bytes) in bytes.iter().enumerate() {
            assert!(
                bytes == &some_data[i * 1_000_000..(i + 1) * 1_000_000],
                "data is not the same"
            );
        }
        assert_eq!(11, scheduler.stats().iops);
    }

    #[tokio::test]
    async fn test_priority() {
        let some_path = Path::parse("foo").unwrap();
        let base_store = Arc::new(InMemory::new());
        base_store
            .put(&some_path, vec![0; 1000].into())
            .await
            .unwrap();

        let semaphore = Arc::new(tokio::sync::Semaphore::new(0));
        let mut obj_store = MockObjectStore::default();
        let semaphore_copy = semaphore.clone();
        obj_store
            .expect_get_opts()
            .returning(move |location, options| {
                let semaphore = semaphore.clone();
                let base_store = base_store.clone();
                let location = location.clone();
                async move {
                    semaphore.acquire().await.unwrap().forget();
                    base_store.get_opts(&location, options).await
                }
                .boxed()
            });
        let obj_store = Arc::new(ObjectStore::new(
            Arc::new(obj_store),
            Url::parse("mem://").unwrap(),
            Some(500),
            None,
            false,
            false,
            1,
            DEFAULT_DOWNLOAD_RETRY_COUNT,
        ));

        let config = SchedulerConfig {
            io_buffer_size_bytes: 1024 * 1024,
        };

        let scan_scheduler = ScanScheduler::new(obj_store, config);

        let file_scheduler = scan_scheduler
            .open_file(&Path::parse("foo").unwrap(), &CachedFileSize::new(1000))
            .await
            .unwrap();

        // Issue a request, priority doesn't matter, it will be submitted
        // immediately (it will go pending)
        // Note: the timeout is to prevent a deadlock if the test fails.
        let first_fut = timeout(
            Duration::from_secs(10),
            file_scheduler.submit_single(0..10, 0),
        )
        .boxed();

        // Issue another low priority request (it will go in queue)
        let mut second_fut = timeout(
            Duration::from_secs(10),
            file_scheduler.submit_single(0..20, 100),
        )
        .boxed();

        // Issue a high priority request (it will go in queue and should bump
        // the other queued request down)
        let mut third_fut = timeout(
            Duration::from_secs(10),
            file_scheduler.submit_single(0..30, 0),
        )
        .boxed();

        // Finish one file, should be the in-flight first request
        semaphore_copy.add_permits(1);
        assert!(first_fut.await.unwrap().unwrap().len() == 10);
        // Other requests should not be finished
        assert!(poll!(&mut second_fut).is_pending());
        assert!(poll!(&mut third_fut).is_pending());

        // Next should be high priority request
        semaphore_copy.add_permits(1);
        assert!(third_fut.await.unwrap().unwrap().len() == 30);
        assert!(poll!(&mut second_fut).is_pending());

        // Finally, the low priority request
        semaphore_copy.add_permits(1);
        assert!(second_fut.await.unwrap().unwrap().len() == 20);
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_backpressure() {
        let some_path = Path::parse("foo").unwrap();
        let base_store = Arc::new(InMemory::new());
        base_store
            .put(&some_path, vec![0; 100000].into())
            .await
            .unwrap();

        let bytes_read = Arc::new(AtomicU64::from(0));
        let mut obj_store = MockObjectStore::default();
        let bytes_read_copy = bytes_read.clone();
        // Wraps the obj_store to keep track of how many bytes have been read
        obj_store
            .expect_get_opts()
            .returning(move |location, options| {
                let range = options.range.as_ref().unwrap();
                let num_bytes = match range {
                    GetRange::Bounded(bounded) => bounded.end - bounded.start,
                    _ => panic!(),
                };
                bytes_read_copy.fetch_add(num_bytes as u64, Ordering::Release);
                let location = location.clone();
                let base_store = base_store.clone();
                async move { base_store.get_opts(&location, options).await }.boxed()
            });
        let obj_store = Arc::new(ObjectStore::new(
            Arc::new(obj_store),
            Url::parse("mem://").unwrap(),
            Some(500),
            None,
            false,
            false,
            1,
            DEFAULT_DOWNLOAD_RETRY_COUNT,
        ));

        let config = SchedulerConfig {
            io_buffer_size_bytes: 10,
        };

        let scan_scheduler = ScanScheduler::new(obj_store.clone(), config);

        let file_scheduler = scan_scheduler
            .open_file(&Path::parse("foo").unwrap(), &CachedFileSize::new(100000))
            .await
            .unwrap();

        let wait_for_idle = || async move {
            let handle = Handle::current();
            while handle.metrics().num_alive_tasks() != 1 {
                tokio::time::sleep(Duration::from_millis(10)).await;
            }
        };
        let wait_for_bytes_read_and_idle = |target_bytes: u64| {
            // We need to move `target_bytes` but don't want to move `bytes_read`
            let bytes_read = &bytes_read;
            async move {
                let bytes_read_copy = bytes_read.clone();
                while bytes_read_copy.load(Ordering::Acquire) < target_bytes {
                    tokio::time::sleep(Duration::from_millis(10)).await;
                }
                wait_for_idle().await;
            }
        };

        // This read will begin immediately
        let first_fut = file_scheduler.submit_single(0..5, 0);
        // This read should also begin immediately
        let second_fut = file_scheduler.submit_single(0..5, 0);
        // This read will be throttled
        let third_fut = file_scheduler.submit_single(0..3, 0);
        // Two tasks (third_fut and unit test)
        wait_for_bytes_read_and_idle(10).await;

        assert_eq!(first_fut.await.unwrap().len(), 5);
        // One task (unit test)
        wait_for_bytes_read_and_idle(13).await;

        // 2 bytes are ready but 5 bytes requested, read will be blocked
        let fourth_fut = file_scheduler.submit_single(0..5, 0);
        wait_for_bytes_read_and_idle(13).await;

        // Out of order completion is ok, will unblock backpressure
        assert_eq!(third_fut.await.unwrap().len(), 3);
        wait_for_bytes_read_and_idle(18).await;

        assert_eq!(second_fut.await.unwrap().len(), 5);
        // At this point there are 5 bytes available in backpressure queue
        // Now we issue multi-read that can be partially fulfilled, it will read some bytes but
        // not all of them. (using large range gap to ensure request not coalesced)
        //
        // I'm actually not sure this behavior is great.  It's possible that we should just
        // block until we can fulfill the entire request.
        let fifth_fut = file_scheduler.submit_request(vec![0..3, 90000..90007], 0);
        wait_for_bytes_read_and_idle(21).await;

        // Fifth future should eventually finish due to deadlock prevention
        let fifth_bytes = tokio::time::timeout(Duration::from_secs(10), fifth_fut)
            .await
            .unwrap();
        assert_eq!(
            fifth_bytes.unwrap().iter().map(|b| b.len()).sum::<usize>(),
            10
        );

        // And now let's just make sure that we can read the rest of the data
        assert_eq!(fourth_fut.await.unwrap().len(), 5);
        wait_for_bytes_read_and_idle(28).await;

        // Ensure deadlock prevention timeout can be disabled
        let config = SchedulerConfig {
            io_buffer_size_bytes: 10,
        };

        let scan_scheduler = ScanScheduler::new(obj_store, config);
        let file_scheduler = scan_scheduler
            .open_file(&Path::parse("foo").unwrap(), &CachedFileSize::new(100000))
            .await
            .unwrap();

        let first_fut = file_scheduler.submit_single(0..10, 0);
        let second_fut = file_scheduler.submit_single(0..10, 0);

        std::thread::sleep(Duration::from_millis(100));
        assert_eq!(first_fut.await.unwrap().len(), 10);
        assert_eq!(second_fut.await.unwrap().len(), 10);
    }

    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn stress_backpressure() {
        // This test ensures that the backpressure mechanism works correctly with
        // regards to priority.  In other words, as long as all requests are consumed
        // in priority order then the backpressure mechanism should not deadlock
        let some_path = Path::parse("foo").unwrap();
        let obj_store = Arc::new(ObjectStore::memory());
        obj_store
            .put(&some_path, vec![0; 100000].as_slice())
            .await
            .unwrap();

        // Only one request will be allowed in
        let config = SchedulerConfig {
            io_buffer_size_bytes: 1,
        };
        let scan_scheduler = ScanScheduler::new(obj_store.clone(), config);
        let file_scheduler = scan_scheduler
            .open_file(&some_path, &CachedFileSize::unknown())
            .await
            .unwrap();

        let mut futs = Vec::with_capacity(10000);
        for idx in 0..10000 {
            futs.push(file_scheduler.submit_single(idx..idx + 1, idx));
        }

        for fut in futs {
            fut.await.unwrap();
        }
    }
}