lance_io/scheduler.rs

// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

use bytes::Bytes;
use futures::channel::oneshot;
use futures::{FutureExt, TryFutureExt};
use object_store::path::Path;
use snafu::location;
use std::collections::BinaryHeap;
use std::fmt::Debug;
use std::future::Future;
use std::num::NonZero;
use std::ops::Range;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Instant;
use tokio::sync::{Notify, Semaphore, SemaphorePermit};

use lance_core::{Error, Result};

use crate::object_store::ObjectStore;
use crate::traits::Reader;
use crate::utils::CachedFileSize;

// Don't log backpressure warnings until at least this many seconds have passed
const BACKPRESSURE_MIN: u64 = 5;
// Don't log backpressure warnings more than once / minute
const BACKPRESSURE_DEBOUNCE: u64 = 60;

// Global counter of how many IOPS we have issued
static IOPS_COUNTER: AtomicU64 = AtomicU64::new(0);
// Global counter of how many bytes were read by the scheduler
static BYTES_READ_COUNTER: AtomicU64 = AtomicU64::new(0);
// By default, we limit the number of IOPS across the entire process to 128
//
// In theory this is enough for ~10GBps on S3 following the guidelines to issue
// 1 IOP per 80MBps.  In practice, I have noticed slightly better performance going
// up to 256.
//
// However, non-S3 stores (e.g. GCS, Azure) can suffer significantly from too many
// concurrent IOPS.  For safety, we set the default to 128 and let the user override
// this if needed.
//
// Note: this only limits things that run through the scheduler.  It does not limit
// IOPS from other sources like writing or commits.
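//
// As a rough worked example (using the S3 guideline above): 128 concurrent IOPS at
// ~80MBps per IOP gives 128 * 80MBps ≈ 10GBps of theoretical throughput.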
static DEFAULT_PROCESS_IOPS_LIMIT: i32 = 128;

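/// Returns the total number of IOPS issued by the scheduler across the entire process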
pub fn iops_counter() -> u64 {
    IOPS_COUNTER.load(Ordering::Acquire)
}

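/// Returns the total number of bytes read by the scheduler across the entire process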
pub fn bytes_read_counter() -> u64 {
    BYTES_READ_COUNTER.load(Ordering::Acquire)
}

// There are two structures that control the I/O scheduler concurrency.  First,
// we have a hard limit on the number of IOPS that can be issued concurrently.
// This limit is process-wide.
//
// Second, we try to limit how many I/O requests can be buffered in memory without
// being consumed by a decoder of some kind.  This limit is per-scheduler.  We cannot
// make this limit process-wide without introducing deadlock (because the decoder for
// file 0 might be waiting on IOPS blocked by a queue filled with requests for file 1,
// and vice-versa).
//
// There is also a per-scan limit on the number of IOPS that can be issued concurrently.
//
// The process-wide limit exists for cases where users need a hard limit on the number of
// parallel IOPS, e.g. due to port availability limits or to prevent multiple scans from
// saturating the network.  (Note: a process-wide limit of X will not necessarily limit the
// number of open TCP connections to exactly X.  The underlying object store may open more
// connections anyways.)
//
// However, it can be too restrictive in some cases, e.g. when some scans are reading from
// cloud storage and other scans are reading from local disk.  In these cases users don't
// need to set a process-wide limit and can rely on the per-scan limits.

// The IopsQuota enforces the first of the above limits: it is the per-process hard cap
// on the number of IOPS that can be issued concurrently.
//
// The per-scan limits are enforced by IoQueue
struct IopsQuota {
    // An Option is used here to avoid mutex overhead if no limit is set
    iops_avail: Option<Semaphore>,
}

/// A reservation on the global IOPS quota
///
/// When the reservation is dropped, the IOPS quota is released unless
/// [`Self::forget`] is called.
struct IopsReservation<'a> {
    value: Option<SemaphorePermit<'a>>,
}

impl IopsReservation<'_> {
    // Forget the reservation, so it won't be released on drop
    fn forget(&mut self) {
        if let Some(value) = self.value.take() {
            value.forget();
        }
    }
}

impl IopsQuota {
    // By default, we throttle the number of scan IOPS across the entire process
    //
    // However, the user can disable this by setting the environment variable
    // LANCE_PROCESS_IO_THREADS_LIMIT to zero (or a negative integer).
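    //
    // For example (the application name here is illustrative):
    //   LANCE_PROCESS_IO_THREADS_LIMIT=256 ./my-app   # raise the cap to 256
    //   LANCE_PROCESS_IO_THREADS_LIMIT=0 ./my-app     # disable the process-wide cap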
    fn new() -> Self {
        let initial_capacity = std::env::var("LANCE_PROCESS_IO_THREADS_LIMIT")
            .map(|s| {
                s.parse::<i32>().unwrap_or_else(|_| {
                    log::warn!("Ignoring invalid LANCE_PROCESS_IO_THREADS_LIMIT: {}", s);
                    DEFAULT_PROCESS_IOPS_LIMIT
                })
            })
            .unwrap_or(DEFAULT_PROCESS_IOPS_LIMIT);
        let iops_avail = if initial_capacity <= 0 {
            None
        } else {
            Some(Semaphore::new(initial_capacity as usize))
        };
        Self { iops_avail }
    }

    // Release a reservation back to the global IOPS quota
    fn release(&self) {
        if let Some(iops_avail) = self.iops_avail.as_ref() {
            iops_avail.add_permits(1);
        }
    }

    // Acquire a reservation on the global IOPS quota
    async fn acquire(&self) -> IopsReservation {
        if let Some(iops_avail) = self.iops_avail.as_ref() {
            IopsReservation {
                value: Some(iops_avail.acquire().await.unwrap()),
            }
        } else {
            IopsReservation { value: None }
        }
    }
}

lazy_static::lazy_static! {
    static ref IOPS_QUOTA: IopsQuota = IopsQuota::new();
}

// We always want to admit requests that have a lower (i.e. more urgent) priority
// value than any currently in-flight request.  This helps avoid potential deadlocks
// related to backpressure.  Unfortunately, it is quite expensive to
// keep track of which priorities are in-flight.
//
// TODO: At some point it would be nice if we could optimize this away, but
// in_flight should remain relatively small (generally less than 256 items)
// and has not shown itself to be a bottleneck yet.
struct PrioritiesInFlight {
    in_flight: Vec<u128>,
}

impl PrioritiesInFlight {
    fn new(capacity: u32) -> Self {
        Self {
            in_flight: Vec::with_capacity(capacity as usize * 2),
        }
    }

    fn min_in_flight(&self) -> u128 {
        self.in_flight.first().copied().unwrap_or(u128::MAX)
    }

    fn push(&mut self, prio: u128) {
        let pos = match self.in_flight.binary_search(&prio) {
            Ok(pos) => pos,
            Err(pos) => pos,
        };
        self.in_flight.insert(pos, prio);
    }

    fn remove(&mut self, prio: u128) {
        if let Ok(pos) = self.in_flight.binary_search(&prio) {
            self.in_flight.remove(pos);
        } else {
            unreachable!();
        }
    }
}

struct IoQueueState {
    // Number of IOPS we can issue concurrently before pausing I/O
    iops_avail: u32,
    // Number of bytes we are allowed to buffer in memory before pausing I/O
    //
    // This can dip below 0 due to I/O prioritization
    bytes_avail: i64,
    // Pending I/O requests
    pending_requests: BinaryHeap<IoTask>,
    // Priorities of in-flight requests
    priorities_in_flight: PrioritiesInFlight,
    // Set when the scheduler is finished to notify the I/O loop to shut down
    // once all outstanding requests have been completed.
    done_scheduling: bool,
    // Time when the scheduler started
    start: Instant,
    // Last time we warned about backpressure
    last_warn: AtomicU64,
}

impl IoQueueState {
    fn new(io_capacity: u32, io_buffer_size: u64) -> Self {
        Self {
            iops_avail: io_capacity,
            bytes_avail: io_buffer_size as i64,
            pending_requests: BinaryHeap::new(),
            priorities_in_flight: PrioritiesInFlight::new(io_capacity),
            done_scheduling: false,
            start: Instant::now(),
            last_warn: AtomicU64::from(0),
        }
    }

    fn finished(&self) -> bool {
        self.done_scheduling && self.pending_requests.is_empty()
    }

    fn warn_if_needed(&self) {
        let seconds_elapsed = self.start.elapsed().as_secs();
        let last_warn = self.last_warn.load(Ordering::Acquire);
        let since_last_warn = seconds_elapsed - last_warn;
        if (last_warn == 0
            && seconds_elapsed > BACKPRESSURE_MIN
            && seconds_elapsed < BACKPRESSURE_DEBOUNCE)
            || since_last_warn > BACKPRESSURE_DEBOUNCE
        {
            tracing::event!(tracing::Level::DEBUG, "Backpressure throttle exceeded");
            log::debug!("Backpressure throttle is full, I/O will pause until buffer is drained.  Max I/O bandwidth will not be achieved because CPU is falling behind");
            self.last_warn
                .store(seconds_elapsed.max(1), Ordering::Release);
        }
    }

    fn can_deliver(&self, task: &IoTask) -> bool {
        if self.iops_avail == 0 {
            false
        } else if task.priority <= self.priorities_in_flight.min_in_flight() {
            true
        } else if task.num_bytes() as i64 > self.bytes_avail {
            self.warn_if_needed();
            false
        } else {
            true
        }
    }

    fn next_task(&mut self) -> Option<IoTask> {
        let task = self.pending_requests.peek()?;
        if self.can_deliver(task) {
            self.priorities_in_flight.push(task.priority);
            self.iops_avail -= 1;
            self.bytes_avail -= task.num_bytes() as i64;
            if self.bytes_avail < 0 {
                // This can happen when we admit special priority requests
                log::debug!(
                    "Backpressure throttle temporarily exceeded by {} bytes due to priority I/O",
                    -self.bytes_avail
                );
            }
            Some(self.pending_requests.pop().unwrap())
        } else {
            None
        }
    }
}

// This is modeled after the MPSC queue described here: https://docs.rs/tokio/latest/tokio/sync/struct.Notify.html
//
// However, it only needs to be SPSC since there is only one "scheduler thread"
// and one I/O loop.
struct IoQueue {
    // Queue state
    state: Mutex<IoQueueState>,
    // Used to signal new I/O requests have arrived that might potentially be runnable
    notify: Notify,
}

impl IoQueue {
    fn new(io_capacity: u32, io_buffer_size: u64) -> Self {
        Self {
            state: Mutex::new(IoQueueState::new(io_capacity, io_buffer_size)),
            notify: Notify::new(),
        }
    }

    fn push(&self, task: IoTask) {
        log::trace!(
            "Inserting I/O request for {} bytes with priority ({},{}) into I/O queue",
            task.num_bytes(),
            task.priority >> 64,
            task.priority & 0xFFFFFFFFFFFFFFFF
        );
        let mut state = self.state.lock().unwrap();
        state.pending_requests.push(task);
        drop(state);

        self.notify.notify_one();
    }

    async fn pop(&self) -> Option<IoTask> {
        loop {
            {
                // First, grab a reservation on the global IOPS quota
                // If we then get a task to run, transfer the reservation
                // to the task.  Otherwise, the reservation will be released
                // when iop_res is dropped.
                let mut iop_res = IOPS_QUOTA.acquire().await;
                // Next, try to grab a task from the queue
                let mut state = self.state.lock().unwrap();
                if let Some(task) = state.next_task() {
                    // Task successfully acquired; we will release the global
                    // reservation after the task has run.
                    iop_res.forget();
                    return Some(task);
                }

                if state.finished() {
                    return None;
                }
            }

            self.notify.notified().await;
        }
    }

    fn on_iop_complete(&self) {
        let mut state = self.state.lock().unwrap();
        state.iops_avail += 1;
        drop(state);

        self.notify.notify_one();
    }

    fn on_bytes_consumed(&self, bytes: u64, priority: u128, num_reqs: usize) {
        let mut state = self.state.lock().unwrap();
        state.bytes_avail += bytes as i64;
        for _ in 0..num_reqs {
            state.priorities_in_flight.remove(priority);
        }
        drop(state);

        self.notify.notify_one();
    }

    fn close(&self) {
        let mut state = self.state.lock().unwrap();
        state.done_scheduling = true;
        drop(state);

        self.notify.notify_one();
    }
}

// There is one instance of MutableBatch shared by all the I/O operations
// that make up a single request.  When all the I/O operations complete
// then the MutableBatch goes out of scope and the batch request is considered
// complete.
struct MutableBatch<F: FnOnce(Response) + Send> {
    when_done: Option<F>,
    data_buffers: Vec<Bytes>,
    num_bytes: u64,
    priority: u128,
    num_reqs: usize,
    err: Option<Box<dyn std::error::Error + Send + Sync + 'static>>,
}

impl<F: FnOnce(Response) + Send> MutableBatch<F> {
    fn new(when_done: F, num_data_buffers: u32, priority: u128, num_reqs: usize) -> Self {
        Self {
            when_done: Some(when_done),
            data_buffers: vec![Bytes::default(); num_data_buffers as usize],
            num_bytes: 0,
            priority,
            num_reqs,
            err: None,
        }
    }
}

// Rather than keeping track of when all the I/O requests are finished so that we
// can deliver the batch of data, we let Rust do that for us.  When all I/Os are
// done then the MutableBatch will go out of scope and we know we have all the
// data.
impl<F: FnOnce(Response) + Send> Drop for MutableBatch<F> {
    fn drop(&mut self) {
        // If we have an error, return that.  Otherwise return the data
        let result = if self.err.is_some() {
            Err(Error::Wrapped {
                error: self.err.take().unwrap(),
                location: location!(),
            })
        } else {
            let mut data = Vec::new();
            std::mem::swap(&mut data, &mut self.data_buffers);
            Ok(data)
        };
        // We don't really care if no one is around to receive it, just let
        // the result go out of scope and get cleaned up
        let response = Response {
            data: result,
            num_bytes: self.num_bytes,
            priority: self.priority,
            num_reqs: self.num_reqs,
        };
        (self.when_done.take().unwrap())(response);
    }
}

struct DataChunk {
    task_idx: usize,
    num_bytes: u64,
    data: Result<Bytes>,
}

trait DataSink: Send {
    fn deliver_data(&mut self, data: DataChunk);
}

impl<F: FnOnce(Response) + Send> DataSink for MutableBatch<F> {
    // Called by worker tasks to add data to the MutableBatch
    fn deliver_data(&mut self, data: DataChunk) {
        self.num_bytes += data.num_bytes;
        match data.data {
            Ok(data_bytes) => {
                self.data_buffers[data.task_idx] = data_bytes;
            }
            Err(err) => {
                // This keeps the original error, if present
                self.err.get_or_insert(Box::new(err));
            }
        }
    }
}

struct IoTask {
    reader: Arc<dyn Reader>,
    to_read: Range<u64>,
    when_done: Box<dyn FnOnce(Result<Bytes>) + Send>,
    priority: u128,
}

impl Eq for IoTask {}

impl PartialEq for IoTask {
    fn eq(&self, other: &Self) -> bool {
        self.priority == other.priority
    }
}

impl PartialOrd for IoTask {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for IoTask {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        // This is intentionally inverted.  We want a min-heap
        other.priority.cmp(&self.priority)
    }
}

impl IoTask {
    fn num_bytes(&self) -> u64 {
        self.to_read.end - self.to_read.start
    }

    async fn run(self) {
        let bytes = if self.to_read.start == self.to_read.end {
            Ok(Bytes::new())
        } else {
            let bytes_fut = self
                .reader
                .get_range(self.to_read.start as usize..self.to_read.end as usize);
            IOPS_COUNTER.fetch_add(1, Ordering::Release);
            BYTES_READ_COUNTER.fetch_add(self.num_bytes(), Ordering::Release);
            bytes_fut.await.map_err(Error::from)
        };
        IOPS_QUOTA.release();
        (self.when_done)(bytes);
    }
}

// Every time a scheduler starts up it launches a task to run the I/O loop.  This loop
// runs until the scheduler is destroyed.
async fn run_io_loop(tasks: Arc<IoQueue>) {
    // Pop the next runnable task off the queue and spawn it, until we are done
    loop {
        let next_task = tasks.pop().await;
        match next_task {
            Some(task) => {
                tokio::spawn(task.run());
            }
            None => {
                // The sender has been dropped, we are done
                return;
            }
        }
    }
}

#[derive(Debug)]
struct StatsCollector {
    iops: AtomicU64,
    requests: AtomicU64,
    bytes_read: AtomicU64,
}

impl StatsCollector {
    fn new() -> Self {
        Self {
            iops: AtomicU64::new(0),
            requests: AtomicU64::new(0),
            bytes_read: AtomicU64::new(0),
        }
    }

    fn iops(&self) -> u64 {
        self.iops.load(Ordering::Relaxed)
    }

    fn bytes_read(&self) -> u64 {
        self.bytes_read.load(Ordering::Relaxed)
    }

    fn requests(&self) -> u64 {
        self.requests.load(Ordering::Relaxed)
    }

    fn record_request(&self, request: &[Range<u64>]) {
        self.requests.fetch_add(1, Ordering::Relaxed);
        self.iops.fetch_add(request.len() as u64, Ordering::Relaxed);
        self.bytes_read.fetch_add(
            request.iter().map(|r| r.end - r.start).sum::<u64>(),
            Ordering::Relaxed,
        );
    }
}

pub struct ScanStats {
    pub iops: u64,
    pub requests: u64,
    pub bytes_read: u64,
}

impl ScanStats {
    fn new(stats: &StatsCollector) -> Self {
        Self {
            iops: stats.iops(),
            requests: stats.requests(),
            bytes_read: stats.bytes_read(),
        }
    }
}

/// An I/O scheduler which wraps an ObjectStore and throttles the amount of
/// parallel I/O that can be run.
///
/// TODO: This will also add coalescing
pub struct ScanScheduler {
    object_store: Arc<ObjectStore>,
    io_queue: Arc<IoQueue>,
    stats: Arc<StatsCollector>,
}

impl Debug for ScanScheduler {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ScanScheduler")
            .field("object_store", &self.object_store)
            .finish()
    }
}

struct Response {
    data: Result<Vec<Bytes>>,
    priority: u128,
    num_reqs: usize,
    num_bytes: u64,
}

#[derive(Debug, Clone, Copy)]
pub struct SchedulerConfig {
    /// The # of bytes that can be read into memory but not yet consumed by the caller.
    /// This controls back pressure.  If data is not processed quickly enough then this
    /// buffer will fill up and the I/O loop will pause until the buffer is drained.
    pub io_buffer_size_bytes: u64,
}

impl SchedulerConfig {
    /// Big enough for unit testing
    pub fn default_for_testing() -> Self {
        Self {
            io_buffer_size_bytes: 256 * 1024 * 1024,
        }
    }

    /// Configuration that should generally maximize bandwidth (not trying to save RAM
    /// at all).  We assume a max page size of 32MiB and then allow 32MiB per I/O thread
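    ///
    /// A minimal usage sketch (assuming `store` is an `Arc<ObjectStore>`):
    ///
    /// ```ignore
    /// let config = SchedulerConfig::max_bandwidth(&store);
    /// let scheduler = ScanScheduler::new(store, config);
    /// ```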
    pub fn max_bandwidth(store: &ObjectStore) -> Self {
        Self {
            io_buffer_size_bytes: 32 * 1024 * 1024 * store.io_parallelism() as u64,
        }
    }
}

impl ScanScheduler {
    /// Create a new scheduler with the given configuration
    ///
    /// # Arguments
    ///
    /// * object_store - the store to wrap
    /// * config - configuration settings for the scheduler
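    ///
    /// A minimal usage sketch (assuming `object_store` is an `Arc<ObjectStore>` built elsewhere):
    ///
    /// ```ignore
    /// let config = SchedulerConfig::default_for_testing();
    /// let scheduler = ScanScheduler::new(object_store, config);
    /// ```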
    pub fn new(object_store: Arc<ObjectStore>, config: SchedulerConfig) -> Arc<Self> {
        let io_capacity = object_store.io_parallelism();
        let io_queue = Arc::new(IoQueue::new(
            io_capacity as u32,
            config.io_buffer_size_bytes,
        ));
        let scheduler = Self {
            object_store,
            io_queue: io_queue.clone(),
            stats: Arc::new(StatsCollector::new()),
        };
        tokio::task::spawn(async move { run_io_loop(io_queue).await });
        Arc::new(scheduler)
    }

    /// Open a file for reading
    ///
    /// # Arguments
    ///
    /// * path - the path to the file to open
    /// * base_priority - the base priority for I/O requests submitted to this file scheduler;
    ///   this will determine the upper 64 bits of priority (the lower 64 bits
    ///   come from `submit_request` and `submit_single`)
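    ///
    /// A minimal usage sketch (the path and priority values are illustrative):
    ///
    /// ```ignore
    /// let file_scheduler = scheduler
    ///     .open_file_with_priority(&path, 1, &CachedFileSize::unknown())
    ///     .await?;
    /// ```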
    pub async fn open_file_with_priority(
        self: &Arc<Self>,
        path: &Path,
        base_priority: u64,
        file_size_bytes: &CachedFileSize,
    ) -> Result<FileScheduler> {
        let file_size_bytes = if let Some(size) = file_size_bytes.get() {
            u64::from(size) as usize
        } else {
            let size = self.object_store.size(path).await?;
            if let Some(size) = NonZero::new(size as u64) {
                file_size_bytes.set(size);
            }
            size
        };
        let reader = self
            .object_store
            .open_with_size(path, file_size_bytes)
            .await?;
        let block_size = self.object_store.block_size() as u64;
        let max_iop_size = self.object_store.max_iop_size();
        Ok(FileScheduler {
            reader: reader.into(),
            block_size,
            root: self.clone(),
            base_priority,
            max_iop_size,
        })
    }

    /// Open a file with a default priority of 0
    ///
    /// See [`Self::open_file_with_priority`] for more information on the priority
    pub async fn open_file(
        self: &Arc<Self>,
        path: &Path,
        file_size_bytes: &CachedFileSize,
    ) -> Result<FileScheduler> {
        self.open_file_with_priority(path, 0, file_size_bytes).await
    }

    fn do_submit_request(
        &self,
        reader: Arc<dyn Reader>,
        request: Vec<Range<u64>>,
        tx: oneshot::Sender<Response>,
        priority: u128,
    ) {
        let num_iops = request.len() as u32;

        let when_all_io_done = move |bytes_and_permits| {
            // We don't care if the receiver has given up so discard the result
            let _ = tx.send(bytes_and_permits);
        };

        let dest = Arc::new(Mutex::new(Box::new(MutableBatch::new(
            when_all_io_done,
            num_iops,
            priority,
            request.len(),
        ))));

        for (task_idx, iop) in request.into_iter().enumerate() {
            let dest = dest.clone();
            let io_queue = self.io_queue.clone();
            let num_bytes = iop.end - iop.start;
            let task = IoTask {
                reader: reader.clone(),
                to_read: iop,
                priority,
                when_done: Box::new(move |data| {
                    io_queue.on_iop_complete();
                    let mut dest = dest.lock().unwrap();
                    let chunk = DataChunk {
                        data,
                        task_idx,
                        num_bytes,
                    };
                    dest.deliver_data(chunk);
                }),
            };
            self.io_queue.push(task);
        }
    }

    fn submit_request(
        &self,
        reader: Arc<dyn Reader>,
        request: Vec<Range<u64>>,
        priority: u128,
    ) -> impl Future<Output = Result<Vec<Bytes>>> + Send {
        let (tx, rx) = oneshot::channel::<Response>();

        self.do_submit_request(reader, request, tx, priority);

        let io_queue = self.io_queue.clone();

        rx.map(move |wrapped_rsp| {
            // Right now, it isn't possible for I/O to be cancelled so a cancel error should
            // not occur
            let rsp = wrapped_rsp.unwrap();
            io_queue.on_bytes_consumed(rsp.num_bytes, rsp.priority, rsp.num_reqs);
            rsp.data
        })
    }

    pub fn stats(&self) -> ScanStats {
        ScanStats::new(self.stats.as_ref())
    }
}

impl Drop for ScanScheduler {
    fn drop(&mut self) {
        self.io_queue.close();
    }
}

/// A throttled file reader
#[derive(Clone, Debug)]
pub struct FileScheduler {
    reader: Arc<dyn Reader>,
    root: Arc<ScanScheduler>,
    block_size: u64,
    base_priority: u64,
    max_iop_size: u64,
}

fn is_close_together(range1: &Range<u64>, range2: &Range<u64>, block_size: u64) -> bool {
    // Note that range1.end <= range2.start is possible (e.g. when decoding string arrays)
    range2.start <= (range1.end + block_size)
}

fn is_overlapping(range1: &Range<u64>, range2: &Range<u64>) -> bool {
    range1.start < range2.end && range2.start < range1.end
}

impl FileScheduler {
    /// Submit a batch of I/O requests to the reader
    ///
    /// The requests will be queued in a FIFO manner and, when all requests
    /// have been fulfilled, the returned future will be completed.
    ///
    /// Each request has a given priority.  If the I/O loop is full then requests
    /// will be buffered and requests with the *lowest* priority value (i.e. the most
    /// urgent requests) will be released from the buffer first.
    ///
    /// Backpressure is applied per scheduler.  All requests submitted through the same
    /// [`ScanScheduler`] share a single backpressure throttle and will be throttled
    /// together.
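    ///
    /// A minimal usage sketch (ranges and priority are illustrative).  Nearby ranges may be
    /// coalesced into fewer IOPS, but one buffer is still returned per requested range:
    ///
    /// ```ignore
    /// let buffers = file_scheduler
    ///     .submit_request(vec![0..1024, 2048..4096], 0)
    ///     .await?;
    /// assert_eq!(buffers.len(), 2);
    /// ```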
    pub fn submit_request(
        &self,
        request: Vec<Range<u64>>,
        priority: u64,
    ) -> impl Future<Output = Result<Vec<Bytes>>> + Send {
        // The final priority is a combination of the row offset and the file number
        let priority = ((self.base_priority as u128) << 64) + priority as u128;
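        // For example, base_priority = 1 and priority = 100 yields (1 << 64) + 100, so the
        // file number dominates and the row offset breaks ties within a file.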

        let mut merged_requests = Vec::with_capacity(request.len());

        if !request.is_empty() {
            let mut curr_interval = request[0].clone();

            for req in request.iter().skip(1) {
                if is_close_together(&curr_interval, req, self.block_size) {
                    curr_interval.end = curr_interval.end.max(req.end);
                } else {
                    merged_requests.push(curr_interval);
                    curr_interval = req.clone();
                }
            }

            merged_requests.push(curr_interval);
        }

        let mut updated_requests = Vec::with_capacity(merged_requests.len());
        for req in merged_requests {
            if req.is_empty() {
                updated_requests.push(req);
            } else {
                let num_requests = (req.end - req.start).div_ceil(self.max_iop_size);
                let bytes_per_request = (req.end - req.start) / num_requests;
                for i in 0..num_requests {
                    let start = req.start + i * bytes_per_request;
                    let end = if i == num_requests - 1 {
                        // Last request is a bit bigger due to rounding
                        req.end
                    } else {
                        start + bytes_per_request
                    };
                    updated_requests.push(start..end);
                }
            }
        }

        self.root.stats.record_request(&updated_requests);

        let bytes_vec_fut =
            self.root
                .submit_request(self.reader.clone(), updated_requests.clone(), priority);

        let mut updated_index = 0;
        let mut final_bytes = Vec::with_capacity(request.len());

        async move {
            let bytes_vec = bytes_vec_fut.await?;

            let mut orig_index = 0;
            while (updated_index < updated_requests.len()) && (orig_index < request.len()) {
                let updated_range = &updated_requests[updated_index];
                let orig_range = &request[orig_index];
                let byte_offset = updated_range.start as usize;

                if is_overlapping(updated_range, orig_range) {
                    // We need to undo the coalescing and splitting done earlier
                    let start = orig_range.start as usize - byte_offset;
                    if orig_range.end <= updated_range.end {
                        // The original range is fully contained in the updated range, can do
                        // zero-copy slice
                        let end = orig_range.end as usize - byte_offset;
                        final_bytes.push(bytes_vec[updated_index].slice(start..end));
                    } else {
                        // The original read was split into multiple requests, need to copy
                        // back into a single buffer
                        let orig_size = orig_range.end - orig_range.start;
                        let mut merged_bytes = Vec::with_capacity(orig_size as usize);
                        merged_bytes.extend_from_slice(&bytes_vec[updated_index].slice(start..));
                        let mut copy_offset = merged_bytes.len() as u64;
                        while copy_offset < orig_size {
                            updated_index += 1;
                            let next_range = &updated_requests[updated_index];
                            let bytes_to_take =
                                (orig_size - copy_offset).min(next_range.end - next_range.start);
                            merged_bytes.extend_from_slice(
                                &bytes_vec[updated_index].slice(0..bytes_to_take as usize),
                            );
                            copy_offset += bytes_to_take;
                        }
                        final_bytes.push(Bytes::from(merged_bytes));
                    }
                    orig_index += 1;
                } else {
                    updated_index += 1;
                }
            }

            Ok(final_bytes)
        }
    }

    pub fn with_priority(&self, priority: u64) -> Self {
        Self {
            reader: self.reader.clone(),
            root: self.root.clone(),
            block_size: self.block_size,
            max_iop_size: self.max_iop_size,
            base_priority: priority,
        }
    }

    /// Submit a single IOP to the reader
    ///
    /// If you have multiple IOPS to perform then [`Self::submit_request`] is going
    /// to be more efficient.
    ///
    /// See [`Self::submit_request`] for more information on the priority and backpressure.
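    ///
    /// A minimal usage sketch (range and priority are illustrative):
    ///
    /// ```ignore
    /// let bytes = file_scheduler.submit_single(0..4096, 0).await?;
    /// ```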
    pub fn submit_single(
        &self,
        range: Range<u64>,
        priority: u64,
    ) -> impl Future<Output = Result<Bytes>> + Send {
        self.submit_request(vec![range], priority)
            .map_ok(|vec_bytes| vec_bytes.into_iter().next().unwrap())
    }

    /// Provides access to the underlying reader
    ///
    /// Do not use this for reading data as it will bypass any I/O scheduling!
    /// This is mainly exposed to allow metadata operations (e.g. size, block_size)
    /// which either aren't IOPS or which we don't throttle
    pub fn reader(&self) -> &Arc<dyn Reader> {
        &self.reader
    }
}

#[cfg(test)]
mod tests {
    use std::{collections::VecDeque, time::Duration};

    use futures::poll;
    use rand::RngCore;
    use tempfile::tempdir;

    use object_store::{memory::InMemory, GetRange, ObjectStore as OSObjectStore};
    use tokio::{runtime::Handle, time::timeout};
    use url::Url;

    use crate::{
        object_store::{DEFAULT_DOWNLOAD_RETRY_COUNT, DEFAULT_MAX_IOP_SIZE},
        testing::MockObjectStore,
    };

    use super::*;

    #[tokio::test]
    async fn test_full_seq_read() {
        let tmpdir = tempdir().unwrap();
        let tmp_path = tmpdir.path().to_str().unwrap();
        let tmp_path = Path::parse(tmp_path).unwrap();
        let tmp_file = tmp_path.child("foo.file");

        let obj_store = Arc::new(ObjectStore::local());

        // Write 1MiB of data
        const DATA_SIZE: u64 = 1024 * 1024;
        let mut some_data = vec![0; DATA_SIZE as usize];
        rand::thread_rng().fill_bytes(&mut some_data);
        obj_store.put(&tmp_file, &some_data).await.unwrap();

        let config = SchedulerConfig::default_for_testing();

        let scheduler = ScanScheduler::new(obj_store, config);

        let file_scheduler = scheduler
            .open_file(&tmp_file, &CachedFileSize::unknown())
            .await
            .unwrap();

        // Read it back 4KiB at a time
        const READ_SIZE: u64 = 4 * 1024;
        let mut reqs = VecDeque::new();
        let mut offset = 0;
        while offset < DATA_SIZE {
            reqs.push_back(
                #[allow(clippy::single_range_in_vec_init)]
                file_scheduler
                    .submit_request(vec![offset..offset + READ_SIZE], 0)
                    .await
                    .unwrap(),
            );
            offset += READ_SIZE;
        }

        offset = 0;
        // Note: we should get parallel I/O even though we are consuming serially
        while offset < DATA_SIZE {
            let data = reqs.pop_front().unwrap();
            let actual = &data[0];
            let expected = &some_data[offset as usize..(offset + READ_SIZE) as usize];
            assert_eq!(expected, actual);
            offset += READ_SIZE;
        }
    }

    #[tokio::test]
    async fn test_split_coalesce() {
        let tmpdir = tempdir().unwrap();
        let tmp_path = tmpdir.path().to_str().unwrap();
        let tmp_path = Path::parse(tmp_path).unwrap();
        let tmp_file = tmp_path.child("foo.file");

        let obj_store = Arc::new(ObjectStore::local());

        // Write 75MiB of data
        const DATA_SIZE: u64 = 75 * 1024 * 1024;
        let mut some_data = vec![0; DATA_SIZE as usize];
        rand::thread_rng().fill_bytes(&mut some_data);
        obj_store.put(&tmp_file, &some_data).await.unwrap();

        let config = SchedulerConfig::default_for_testing();

        let scheduler = ScanScheduler::new(obj_store, config);

        let file_scheduler = scheduler
            .open_file(&tmp_file, &CachedFileSize::unknown())
            .await
            .unwrap();

        // These 3 requests should be coalesced into a single I/O because they are within 4KiB
        // of each other
        let req =
            file_scheduler.submit_request(vec![50_000..51_000, 52_000..53_000, 54_000..55_000], 0);

        let bytes = req.await.unwrap();

        assert_eq!(bytes[0], &some_data[50_000..51_000]);
        assert_eq!(bytes[1], &some_data[52_000..53_000]);
        assert_eq!(bytes[2], &some_data[54_000..55_000]);

        assert_eq!(1, scheduler.stats().iops);

        // This should be split into 5 requests because it is so large
        let req = file_scheduler.submit_request(vec![0..DATA_SIZE], 0);
        let bytes = req.await.unwrap();
        assert!(bytes[0] == some_data, "data is not the same");

        assert_eq!(6, scheduler.stats().iops);

        // None of these requests are bigger than the max IOP size but they will be coalesced into
        // one IOP that is bigger and then split back into 2 requests that don't quite align with the original
        // ranges.
        let chunk_size = *DEFAULT_MAX_IOP_SIZE;
        let req = file_scheduler.submit_request(
            vec![
                10..chunk_size,
                chunk_size + 10..(chunk_size * 2) - 20,
                chunk_size * 2..(chunk_size * 2) + 10,
            ],
            0,
        );

        let bytes = req.await.unwrap();
        let chunk_size = chunk_size as usize;
        assert!(
            bytes[0] == some_data[10..chunk_size],
            "data is not the same"
        );
        assert!(
            bytes[1] == some_data[chunk_size + 10..(chunk_size * 2) - 20],
            "data is not the same"
        );
        assert!(
            bytes[2] == some_data[chunk_size * 2..(chunk_size * 2) + 10],
            "data is not the same"
        );
        assert_eq!(8, scheduler.stats().iops);

        let reads = (0..44)
            .map(|i| (i * 1_000_000..(i + 1) * 1_000_000))
            .collect::<Vec<_>>();
        let req = file_scheduler.submit_request(reads, 0);
        let bytes = req.await.unwrap();
        for (i, bytes) in bytes.iter().enumerate() {
            assert!(
                bytes == &some_data[i * 1_000_000..(i + 1) * 1_000_000],
                "data is not the same"
            );
        }
        assert_eq!(11, scheduler.stats().iops);
    }

    #[tokio::test]
    async fn test_priority() {
        let some_path = Path::parse("foo").unwrap();
        let base_store = Arc::new(InMemory::new());
        base_store
            .put(&some_path, vec![0; 1000].into())
            .await
            .unwrap();

        let semaphore = Arc::new(tokio::sync::Semaphore::new(0));
        let mut obj_store = MockObjectStore::default();
        let semaphore_copy = semaphore.clone();
        obj_store
            .expect_get_opts()
            .returning(move |location, options| {
                let semaphore = semaphore.clone();
                let base_store = base_store.clone();
                let location = location.clone();
                async move {
                    semaphore.acquire().await.unwrap().forget();
                    base_store.get_opts(&location, options).await
                }
                .boxed()
            });
        let obj_store = Arc::new(ObjectStore::new(
            Arc::new(obj_store),
            Url::parse("mem://").unwrap(),
            Some(500),
            None,
            false,
            false,
            1,
            DEFAULT_DOWNLOAD_RETRY_COUNT,
        ));

        let config = SchedulerConfig {
            io_buffer_size_bytes: 1024 * 1024,
        };

        let scan_scheduler = ScanScheduler::new(obj_store, config);

        let file_scheduler = scan_scheduler
            .open_file(&Path::parse("foo").unwrap(), &CachedFileSize::new(1000))
            .await
            .unwrap();

        // Issue a request, priority doesn't matter, it will be submitted
        // immediately (it will go pending)
        // Note: the timeout is to prevent a deadlock if the test fails.
        let first_fut = timeout(
            Duration::from_secs(10),
            file_scheduler.submit_single(0..10, 0),
        )
        .boxed();

        // Issue another low priority request (it will go in queue)
        let mut second_fut = timeout(
            Duration::from_secs(10),
            file_scheduler.submit_single(0..20, 100),
        )
        .boxed();

        // Issue a high priority request (it will go in queue and should bump
        // the other queued request down)
        let mut third_fut = timeout(
            Duration::from_secs(10),
            file_scheduler.submit_single(0..30, 0),
        )
        .boxed();

        // Finish one file, should be the in-flight first request
        semaphore_copy.add_permits(1);
        assert!(first_fut.await.unwrap().unwrap().len() == 10);
        // Other requests should not be finished
        assert!(poll!(&mut second_fut).is_pending());
        assert!(poll!(&mut third_fut).is_pending());

        // Next should be high priority request
        semaphore_copy.add_permits(1);
        assert!(third_fut.await.unwrap().unwrap().len() == 30);
        assert!(poll!(&mut second_fut).is_pending());

        // Finally, the low priority request
        semaphore_copy.add_permits(1);
        assert!(second_fut.await.unwrap().unwrap().len() == 20);
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_backpressure() {
        let some_path = Path::parse("foo").unwrap();
        let base_store = Arc::new(InMemory::new());
        base_store
            .put(&some_path, vec![0; 100000].into())
            .await
            .unwrap();

        let bytes_read = Arc::new(AtomicU64::from(0));
        let mut obj_store = MockObjectStore::default();
        let bytes_read_copy = bytes_read.clone();
        // Wrap the obj_store to keep track of how many bytes have been read
        obj_store
            .expect_get_opts()
            .returning(move |location, options| {
                let range = options.range.as_ref().unwrap();
                let num_bytes = match range {
                    GetRange::Bounded(bounded) => bounded.end - bounded.start,
                    _ => panic!(),
                };
                bytes_read_copy.fetch_add(num_bytes as u64, Ordering::Release);
                let location = location.clone();
                let base_store = base_store.clone();
                async move { base_store.get_opts(&location, options).await }.boxed()
            });
        let obj_store = Arc::new(ObjectStore::new(
            Arc::new(obj_store),
            Url::parse("mem://").unwrap(),
            Some(500),
            None,
            false,
            false,
            1,
            DEFAULT_DOWNLOAD_RETRY_COUNT,
        ));

        let config = SchedulerConfig {
            io_buffer_size_bytes: 10,
        };

        let scan_scheduler = ScanScheduler::new(obj_store.clone(), config);

        let file_scheduler = scan_scheduler
            .open_file(&Path::parse("foo").unwrap(), &CachedFileSize::new(100000))
            .await
            .unwrap();

        let wait_for_idle = || async move {
            let handle = Handle::current();
            while handle.metrics().num_alive_tasks() != 1 {
                tokio::time::sleep(Duration::from_millis(10)).await;
            }
        };
        let wait_for_bytes_read_and_idle = |target_bytes: u64| {
            // We need to move `target_bytes` but don't want to move `bytes_read`
            let bytes_read = &bytes_read;
            async move {
                let bytes_read_copy = bytes_read.clone();
                while bytes_read_copy.load(Ordering::Acquire) < target_bytes {
                    tokio::time::sleep(Duration::from_millis(10)).await;
                }
                wait_for_idle().await;
            }
        };

        // This read will begin immediately
        let first_fut = file_scheduler.submit_single(0..5, 0);
        // This read should also begin immediately
        let second_fut = file_scheduler.submit_single(0..5, 0);
        // This read will be throttled
        let third_fut = file_scheduler.submit_single(0..3, 0);
        // Two tasks (third_fut and unit test)
        wait_for_bytes_read_and_idle(10).await;

        assert_eq!(first_fut.await.unwrap().len(), 5);
        // One task (unit test)
        wait_for_bytes_read_and_idle(13).await;

        // 2 bytes are ready but 5 bytes requested, read will be blocked
        let fourth_fut = file_scheduler.submit_single(0..5, 0);
        wait_for_bytes_read_and_idle(13).await;

        // Out of order completion is ok, will unblock backpressure
        assert_eq!(third_fut.await.unwrap().len(), 3);
        wait_for_bytes_read_and_idle(18).await;

        assert_eq!(second_fut.await.unwrap().len(), 5);
        // At this point there are 5 bytes available in backpressure queue
        // Now we issue a multi-read that can be partially fulfilled, it will read some bytes but
        // not all of them. (using large range gap to ensure request not coalesced)
        //
        // I'm actually not sure this behavior is great.  It's possible that we should just
        // block until we can fulfill the entire request.
        let fifth_fut = file_scheduler.submit_request(vec![0..3, 90000..90007], 0);
        wait_for_bytes_read_and_idle(21).await;

        // Fifth future should eventually finish due to deadlock prevention
        let fifth_bytes = tokio::time::timeout(Duration::from_secs(10), fifth_fut)
            .await
            .unwrap();
        assert_eq!(
            fifth_bytes.unwrap().iter().map(|b| b.len()).sum::<usize>(),
            10
        );

        // And now let's just make sure that we can read the rest of the data
        assert_eq!(fourth_fut.await.unwrap().len(), 5);
        wait_for_bytes_read_and_idle(28).await;

        // Ensure deadlock prevention timeout can be disabled
        let config = SchedulerConfig {
            io_buffer_size_bytes: 10,
        };

        let scan_scheduler = ScanScheduler::new(obj_store, config);
        let file_scheduler = scan_scheduler
            .open_file(&Path::parse("foo").unwrap(), &CachedFileSize::new(100000))
            .await
            .unwrap();

        let first_fut = file_scheduler.submit_single(0..10, 0);
        let second_fut = file_scheduler.submit_single(0..10, 0);

        std::thread::sleep(Duration::from_millis(100));
        assert_eq!(first_fut.await.unwrap().len(), 10);
        assert_eq!(second_fut.await.unwrap().len(), 10);
    }

    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn stress_backpressure() {
        // This test ensures that the backpressure mechanism works correctly with
        // regard to priority.  In other words, as long as all requests are consumed
        // in priority order then the backpressure mechanism should not deadlock
        let some_path = Path::parse("foo").unwrap();
        let obj_store = Arc::new(ObjectStore::memory());
        obj_store
            .put(&some_path, vec![0; 100000].as_slice())
            .await
            .unwrap();

        // Only one request will be allowed in
        let config = SchedulerConfig {
            io_buffer_size_bytes: 1,
        };
        let scan_scheduler = ScanScheduler::new(obj_store.clone(), config);
        let file_scheduler = scan_scheduler
            .open_file(&some_path, &CachedFileSize::unknown())
            .await
            .unwrap();

        let mut futs = Vec::with_capacity(10000);
        for idx in 0..10000 {
            futs.push(file_scheduler.submit_single(idx..idx + 1, idx));
        }

        for fut in futs {
            fut.await.unwrap();
        }
    }
}