lance_io/scheduler.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use bytes::Bytes;
5use futures::channel::oneshot;
6use futures::{FutureExt, TryFutureExt};
7use object_store::path::Path;
8use snafu::location;
9use std::collections::BinaryHeap;
10use std::fmt::Debug;
11use std::future::Future;
12use std::num::NonZero;
13use std::ops::Range;
14use std::sync::atomic::{AtomicU64, Ordering};
15use std::sync::{Arc, Mutex};
16use std::time::Instant;
17use tokio::sync::{Notify, Semaphore, SemaphorePermit};
18
19use lance_core::{Error, Result};
20
21use crate::object_store::ObjectStore;
22use crate::traits::Reader;
23use crate::utils::CachedFileSize;
24
25// Don't log backpressure warnings until at least this many seconds have passed
26const BACKPRESSURE_MIN: u64 = 5;
27// Don't log backpressure warnings more than once / minute
28const BACKPRESSURE_DEBOUNCE: u64 = 60;
29
30// Global counter of how many IOPS we have issued
31static IOPS_COUNTER: AtomicU64 = AtomicU64::new(0);
32// Global counter of how many bytes were read by the scheduler
33static BYTES_READ_COUNTER: AtomicU64 = AtomicU64::new(0);
34// By default, we limit the number of IOPS across the entire process to 128
35//
36// In theory this is enough for ~10GBps on S3 following the guidelines to issue
37// 1 IOP per 80MBps.  In practice, I have noticed slightly better performance going
38// up to 256.
39//
40// However, non-S3 stores (e.g. GCS, Azure) can suffer significantly from too many
41// concurrent IOPS.  For safety, we set the default to 128 and let the user override
42// this if needed.
43//
44// Note: this only limits things that run through the scheduler.  It does not limit
45// IOPS from other sources like writing or commits.
46static DEFAULT_PROCESS_IOPS_LIMIT: i32 = 128;
47
48pub fn iops_counter() -> u64 {
49    IOPS_COUNTER.load(Ordering::Acquire)
50}
51
52pub fn bytes_read_counter() -> u64 {
53    BYTES_READ_COUNTER.load(Ordering::Acquire)
54}
55
56// There are two structures that control the I/O scheduler concurrency.  First,
57// we have a hard limit on the number of IOPS that can be issued concurrently.
58// This limit is process-wide.
59//
60// Second, we try and limit how many I/O requests can be buffered in memory without
61// being consumed by a decoder of some kind.  This limit is per-scheduler.  We cannot
62// make this limit process wide without introducing deadlock (because the decoder for
// file 0 might be waiting on IOPS blocked by a queue filled with requests for file 1,
// and vice versa).
65//
66// There is also a per-scan limit on the number of IOPS that can be issued concurrently.
67//
// The process-wide limit exists for cases where users need a hard limit on the number of parallel
69// IOPS, e.g. due to port availability limits or to prevent multiple scans from saturating
70// the network.  (Note: a process-wide limit of X will not necessarily limit the number of
71// open TCP connections to exactly X.  The underlying object store may open more connections
72// anyways)
73//
// However, it can be too restrictive in some cases, e.g. when some scans are reading from
// cloud storage and other scans are reading from local disk.  In these cases users don't
// need to set a process-wide limit and can rely on the per-scan limits.
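//
// As a rough sketch (values are illustrative, not recommendations): the process-wide
// cap is configured once via the LANCE_PROCESS_IO_THREADS_LIMIT environment variable,
// while the per-scheduler buffer is chosen when each scheduler is created, e.g.
//
//   let config = SchedulerConfig { io_buffer_size_bytes: 32 * 1024 * 1024 };
//   let scheduler = ScanScheduler::new(object_store, config);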
77
// The IopsQuota enforces the first of the above limits: it is the per-process hard cap
79// on the number of IOPS that can be issued concurrently.
80//
81// The per-scan limits are enforced by IoQueue
82struct IopsQuota {
83    // An Option is used here to avoid mutex overhead if no limit is set
84    iops_avail: Option<Semaphore>,
85}
86
87/// A reservation on the global IOPS quota
88///
89/// When the reservation is dropped, the IOPS quota is released unless
90/// [`Self::forget`] is called.
91struct IopsReservation<'a> {
92    value: Option<SemaphorePermit<'a>>,
93}
94
95impl IopsReservation<'_> {
96    // Forget the reservation, so it won't be released on drop
97    fn forget(&mut self) {
98        if let Some(value) = self.value.take() {
99            value.forget();
100        }
101    }
102}
103
104impl IopsQuota {
105    // By default, we throttle the number of scan IOPS across the entire process
106    //
107    // However, the user can disable this by setting the environment variable
108    // LANCE_PROCESS_IO_THREADS_LIMIT to zero (or a negative integer).
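    //
    // Illustrative values (matching the parsing below; not recommendations):
    //   unset            -> DEFAULT_PROCESS_IOPS_LIMIT (128)
    //   0 or negative    -> no process-wide cap
    //   e.g. 256         -> at most 256 scheduled IOPS in flight at once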
109    fn new() -> Self {
110        let initial_capacity = std::env::var("LANCE_PROCESS_IO_THREADS_LIMIT")
111            .map(|s| {
112                s.parse::<i32>().unwrap_or_else(|_| {
113                    log::warn!("Ignoring invalid LANCE_PROCESS_IO_THREADS_LIMIT: {}", s);
114                    DEFAULT_PROCESS_IOPS_LIMIT
115                })
116            })
117            .unwrap_or(DEFAULT_PROCESS_IOPS_LIMIT);
118        let iops_avail = if initial_capacity <= 0 {
119            None
120        } else {
121            Some(Semaphore::new(initial_capacity as usize))
122        };
123        Self { iops_avail }
124    }
125
    // Release a reservation back to the global IOPS quota
127    fn release(&self) {
128        if let Some(iops_avail) = self.iops_avail.as_ref() {
129            iops_avail.add_permits(1);
130        }
131    }
132
133    // Acquire a reservation on the global IOPS quota
134    async fn acquire(&self) -> IopsReservation<'_> {
135        if let Some(iops_avail) = self.iops_avail.as_ref() {
136            IopsReservation {
137                value: Some(iops_avail.acquire().await.unwrap()),
138            }
139        } else {
140            IopsReservation { value: None }
141        }
142    }
143}
144
145static IOPS_QUOTA: std::sync::LazyLock<IopsQuota> = std::sync::LazyLock::new(IopsQuota::new);
146
// We want to allow requests that have a lower priority value (i.e. are more urgent) than any
148// currently in-flight request.  This helps avoid potential deadlocks
149// related to backpressure.  Unfortunately, it is quite expensive to
150// keep track of which priorities are in-flight.
151//
152// TODO: At some point it would be nice if we can optimize this away but
153// in_flight should remain relatively small (generally less than 256 items)
154// and has not shown itself to be a bottleneck yet.
155struct PrioritiesInFlight {
156    in_flight: Vec<u128>,
157}
158
159impl PrioritiesInFlight {
160    fn new(capacity: u32) -> Self {
161        Self {
162            in_flight: Vec::with_capacity(capacity as usize * 2),
163        }
164    }
165
166    fn min_in_flight(&self) -> u128 {
167        self.in_flight.first().copied().unwrap_or(u128::MAX)
168    }
169
170    fn push(&mut self, prio: u128) {
171        let pos = match self.in_flight.binary_search(&prio) {
172            Ok(pos) => pos,
173            Err(pos) => pos,
174        };
175        self.in_flight.insert(pos, prio);
176    }
177
178    fn remove(&mut self, prio: u128) {
179        if let Ok(pos) = self.in_flight.binary_search(&prio) {
180            self.in_flight.remove(pos);
181        } else {
182            unreachable!();
183        }
184    }
185}
186
187struct IoQueueState {
188    // Number of IOPS we can issue concurrently before pausing I/O
189    iops_avail: u32,
190    // Number of bytes we are allowed to buffer in memory before pausing I/O
191    //
192    // This can dip below 0 due to I/O prioritization
193    bytes_avail: i64,
194    // Pending I/O requests
195    pending_requests: BinaryHeap<IoTask>,
196    // Priorities of in-flight requests
197    priorities_in_flight: PrioritiesInFlight,
198    // Set when the scheduler is finished to notify the I/O loop to shut down
199    // once all outstanding requests have been completed.
200    done_scheduling: bool,
201    // Time when the scheduler started
202    start: Instant,
203    // Last time we warned about backpressure
204    last_warn: AtomicU64,
205}
206
207impl IoQueueState {
208    fn new(io_capacity: u32, io_buffer_size: u64) -> Self {
209        Self {
210            iops_avail: io_capacity,
211            bytes_avail: io_buffer_size as i64,
212            pending_requests: BinaryHeap::new(),
213            priorities_in_flight: PrioritiesInFlight::new(io_capacity),
214            done_scheduling: false,
215            start: Instant::now(),
216            last_warn: AtomicU64::from(0),
217        }
218    }
219
220    fn finished(&self) -> bool {
221        self.done_scheduling && self.pending_requests.is_empty()
222    }
223
224    fn warn_if_needed(&self) {
225        let seconds_elapsed = self.start.elapsed().as_secs();
226        let last_warn = self.last_warn.load(Ordering::Acquire);
227        let since_last_warn = seconds_elapsed - last_warn;
228        if (last_warn == 0
229            && seconds_elapsed > BACKPRESSURE_MIN
230            && seconds_elapsed < BACKPRESSURE_DEBOUNCE)
231            || since_last_warn > BACKPRESSURE_DEBOUNCE
232        {
233            tracing::event!(tracing::Level::DEBUG, "Backpressure throttle exceeded");
234            log::debug!("Backpressure throttle is full, I/O will pause until buffer is drained.  Max I/O bandwidth will not be achieved because CPU is falling behind");
235            self.last_warn
236                .store(seconds_elapsed.max(1), Ordering::Release);
237        }
238    }
239
240    fn can_deliver(&self, task: &IoTask) -> bool {
241        if self.iops_avail == 0 {
242            false
243        } else if task.priority <= self.priorities_in_flight.min_in_flight() {
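            // Always admit a request that is at least as urgent as everything already
            // in flight (its priority value is no greater than the minimum in flight),
            // even if that temporarily exceeds the buffer limit.  This prevents the
            // backpressure throttle from deadlocking a consumer that drains results
            // in priority order.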
244            true
245        } else if task.num_bytes() as i64 > self.bytes_avail {
246            self.warn_if_needed();
247            false
248        } else {
249            true
250        }
251    }
252
253    fn next_task(&mut self) -> Option<IoTask> {
254        let task = self.pending_requests.peek()?;
255        if self.can_deliver(task) {
256            self.priorities_in_flight.push(task.priority);
257            self.iops_avail -= 1;
258            self.bytes_avail -= task.num_bytes() as i64;
259            if self.bytes_avail < 0 {
260                // This can happen when we admit special priority requests
261                log::debug!(
262                    "Backpressure throttle temporarily exceeded by {} bytes due to priority I/O",
263                    -self.bytes_avail
264                );
265            }
266            Some(self.pending_requests.pop().unwrap())
267        } else {
268            None
269        }
270    }
271}
272
273// This is modeled after the MPSC queue described here: https://docs.rs/tokio/latest/tokio/sync/struct.Notify.html
274//
275// However, it only needs to be SPSC since there is only one "scheduler thread"
276// and one I/O loop.
277struct IoQueue {
278    // Queue state
279    state: Mutex<IoQueueState>,
280    // Used to signal new I/O requests have arrived that might potentially be runnable
281    notify: Notify,
282}
283
284impl IoQueue {
285    fn new(io_capacity: u32, io_buffer_size: u64) -> Self {
286        Self {
287            state: Mutex::new(IoQueueState::new(io_capacity, io_buffer_size)),
288            notify: Notify::new(),
289        }
290    }
291
292    fn push(&self, task: IoTask) {
293        log::trace!(
294            "Inserting I/O request for {} bytes with priority ({},{}) into I/O queue",
295            task.num_bytes(),
296            task.priority >> 64,
297            task.priority & 0xFFFFFFFFFFFFFFFF
298        );
299        let mut state = self.state.lock().unwrap();
300        state.pending_requests.push(task);
301        drop(state);
302
303        self.notify.notify_one();
304    }
305
306    async fn pop(&self) -> Option<IoTask> {
307        loop {
308            {
309                // First, grab a reservation on the global IOPS quota
310                // If we then get a task to run, transfer the reservation
311                // to the task.  Otherwise, the reservation will be released
312                // when iop_res is dropped.
313                let mut iop_res = IOPS_QUOTA.acquire().await;
                // Next, try to grab a task from the queue
315                let mut state = self.state.lock().unwrap();
316                if let Some(task) = state.next_task() {
                    // Reservation successfully acquired.  We will release the
                    // global reservation after the task has run.
319                    iop_res.forget();
320                    return Some(task);
321                }
322
323                if state.finished() {
324                    return None;
325                }
326            }
327
328            self.notify.notified().await;
329        }
330    }
331
332    fn on_iop_complete(&self) {
333        let mut state = self.state.lock().unwrap();
334        state.iops_avail += 1;
335        drop(state);
336
337        self.notify.notify_one();
338    }
339
340    fn on_bytes_consumed(&self, bytes: u64, priority: u128, num_reqs: usize) {
341        let mut state = self.state.lock().unwrap();
342        state.bytes_avail += bytes as i64;
343        for _ in 0..num_reqs {
344            state.priorities_in_flight.remove(priority);
345        }
346        drop(state);
347
348        self.notify.notify_one();
349    }
350
351    fn close(&self) {
352        let mut state = self.state.lock().unwrap();
353        state.done_scheduling = true;
354        drop(state);
355
356        self.notify.notify_one();
357    }
358}
359
360// There is one instance of MutableBatch shared by all the I/O operations
361// that make up a single request.  When all the I/O operations complete
362// then the MutableBatch goes out of scope and the batch request is considered
363// complete
364struct MutableBatch<F: FnOnce(Response) + Send> {
365    when_done: Option<F>,
366    data_buffers: Vec<Bytes>,
367    num_bytes: u64,
368    priority: u128,
369    num_reqs: usize,
370    err: Option<Box<dyn std::error::Error + Send + Sync + 'static>>,
371}
372
373impl<F: FnOnce(Response) + Send> MutableBatch<F> {
374    fn new(when_done: F, num_data_buffers: u32, priority: u128, num_reqs: usize) -> Self {
375        Self {
376            when_done: Some(when_done),
377            data_buffers: vec![Bytes::default(); num_data_buffers as usize],
378            num_bytes: 0,
379            priority,
380            num_reqs,
381            err: None,
382        }
383    }
384}
385
// Rather than keeping track of when all the I/O requests are finished so that we
// can deliver the batch of data, we let Rust do that for us.  When all I/Os are
// done, the MutableBatch will go out of scope and we know we have all the
// data.
390impl<F: FnOnce(Response) + Send> Drop for MutableBatch<F> {
391    fn drop(&mut self) {
392        // If we have an error, return that.  Otherwise return the data
393        let result = if self.err.is_some() {
394            Err(Error::Wrapped {
395                error: self.err.take().unwrap(),
396                location: location!(),
397            })
398        } else {
399            let mut data = Vec::new();
400            std::mem::swap(&mut data, &mut self.data_buffers);
401            Ok(data)
402        };
403        // We don't really care if no one is around to receive it, just let
404        // the result go out of scope and get cleaned up
405        let response = Response {
406            data: result,
407            num_bytes: self.num_bytes,
408            priority: self.priority,
409            num_reqs: self.num_reqs,
410        };
411        (self.when_done.take().unwrap())(response);
412    }
413}
414
415struct DataChunk {
416    task_idx: usize,
417    num_bytes: u64,
418    data: Result<Bytes>,
419}
420
421trait DataSink: Send {
422    fn deliver_data(&mut self, data: DataChunk);
423}
424
425impl<F: FnOnce(Response) + Send> DataSink for MutableBatch<F> {
426    // Called by worker tasks to add data to the MutableBatch
427    fn deliver_data(&mut self, data: DataChunk) {
428        self.num_bytes += data.num_bytes;
429        match data.data {
430            Ok(data_bytes) => {
431                self.data_buffers[data.task_idx] = data_bytes;
432            }
433            Err(err) => {
434                // This keeps the original error, if present
435                self.err.get_or_insert(Box::new(err));
436            }
437        }
438    }
439}
440
441struct IoTask {
442    reader: Arc<dyn Reader>,
443    to_read: Range<u64>,
444    when_done: Box<dyn FnOnce(Result<Bytes>) + Send>,
445    priority: u128,
446}
447
448impl Eq for IoTask {}
449
450impl PartialEq for IoTask {
451    fn eq(&self, other: &Self) -> bool {
452        self.priority == other.priority
453    }
454}
455
456impl PartialOrd for IoTask {
457    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
458        Some(self.cmp(other))
459    }
460}
461
462impl Ord for IoTask {
463    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
464        // This is intentionally inverted.  We want a min-heap
465        other.priority.cmp(&self.priority)
466    }
467}
468
469impl IoTask {
470    fn num_bytes(&self) -> u64 {
471        self.to_read.end - self.to_read.start
472    }
473
474    async fn run(self) {
475        let file_path = self.reader.path().as_ref();
476        let num_bytes = self.num_bytes();
477        let bytes = if self.to_read.start == self.to_read.end {
478            Ok(Bytes::new())
479        } else {
480            let bytes_fut = self
481                .reader
482                .get_range(self.to_read.start as usize..self.to_read.end as usize);
483            IOPS_COUNTER.fetch_add(1, Ordering::Release);
484            let num_bytes = self.num_bytes();
485            bytes_fut
486                .inspect(move |_| {
487                    BYTES_READ_COUNTER.fetch_add(num_bytes, Ordering::Release);
488                })
489                .await
490                .map_err(Error::from)
491        };
492        // Emit per-file I/O trace event only when tracing is enabled
493        tracing::trace!(
494            file = file_path,
495            bytes_read = num_bytes,
496            requests = 1,
497            range_start = self.to_read.start,
498            range_end = self.to_read.end,
499            "File I/O completed"
500        );
501        IOPS_QUOTA.release();
502        (self.when_done)(bytes);
503    }
504}
505
506// Every time a scheduler starts up it launches a task to run the I/O loop.  This loop
507// repeats endlessly until the scheduler is destroyed.
508async fn run_io_loop(tasks: Arc<IoQueue>) {
    // Pop the next runnable task off the queue and spawn it, continuing until
    // the queue has been closed and drained
511    loop {
512        let next_task = tasks.pop().await;
513        match next_task {
514            Some(task) => {
515                tokio::spawn(task.run());
516            }
517            None => {
                // The queue has been closed and drained, we are done
519                return;
520            }
521        }
522    }
523}
524
525#[derive(Debug)]
526struct StatsCollector {
527    iops: AtomicU64,
528    requests: AtomicU64,
529    bytes_read: AtomicU64,
530}
531
532impl StatsCollector {
533    fn new() -> Self {
534        Self {
535            iops: AtomicU64::new(0),
536            requests: AtomicU64::new(0),
537            bytes_read: AtomicU64::new(0),
538        }
539    }
540
541    fn iops(&self) -> u64 {
542        self.iops.load(Ordering::Relaxed)
543    }
544
545    fn bytes_read(&self) -> u64 {
546        self.bytes_read.load(Ordering::Relaxed)
547    }
548
549    fn requests(&self) -> u64 {
550        self.requests.load(Ordering::Relaxed)
551    }
552
553    fn record_request(&self, request: &[Range<u64>]) {
554        self.requests.fetch_add(1, Ordering::Relaxed);
555        self.iops.fetch_add(request.len() as u64, Ordering::Relaxed);
556        self.bytes_read.fetch_add(
557            request.iter().map(|r| r.end - r.start).sum::<u64>(),
558            Ordering::Relaxed,
559        );
560    }
561}
562
563pub struct ScanStats {
564    pub iops: u64,
565    pub requests: u64,
566    pub bytes_read: u64,
567}
568
569impl ScanStats {
570    fn new(stats: &StatsCollector) -> Self {
571        Self {
572            iops: stats.iops(),
573            requests: stats.requests(),
574            bytes_read: stats.bytes_read(),
575        }
576    }
577}
578
579/// An I/O scheduler which wraps an ObjectStore and throttles the amount of
580/// parallel I/O that can be run.
581///
582/// TODO: This will also add coalescing
583pub struct ScanScheduler {
584    object_store: Arc<ObjectStore>,
585    io_queue: Arc<IoQueue>,
586    stats: Arc<StatsCollector>,
587}
588
589impl Debug for ScanScheduler {
590    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
591        f.debug_struct("ScanScheduler")
592            .field("object_store", &self.object_store)
593            .finish()
594    }
595}
596
597struct Response {
598    data: Result<Vec<Bytes>>,
599    priority: u128,
600    num_reqs: usize,
601    num_bytes: u64,
602}
603
604#[derive(Debug, Clone, Copy)]
605pub struct SchedulerConfig {
    /// The # of bytes that can be buffered in memory without yet being consumed by a decoder.
607    /// This controls back pressure.  If data is not processed quickly enough then this
608    /// buffer will fill up and the I/O loop will pause until the buffer is drained.
609    pub io_buffer_size_bytes: u64,
610}
611
612impl SchedulerConfig {
613    /// Big enough for unit testing
614    pub fn default_for_testing() -> Self {
615        Self {
616            io_buffer_size_bytes: 256 * 1024 * 1024,
617        }
618    }
619
620    /// Configuration that should generally maximize bandwidth (not trying to save RAM
621    /// at all).  We assume a max page size of 32MiB and then allow 32MiB per I/O thread
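    ///
    /// For example, a store reporting an `io_parallelism()` of 8 would get an I/O
    /// buffer of 8 * 32 MiB = 256 MiB (illustrative; the actual parallelism is
    /// reported by the store).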
622    pub fn max_bandwidth(store: &ObjectStore) -> Self {
623        Self {
624            io_buffer_size_bytes: 32 * 1024 * 1024 * store.io_parallelism() as u64,
625        }
626    }
627}
628
629impl ScanScheduler {
630    /// Create a new scheduler with the given I/O capacity
631    ///
632    /// # Arguments
633    ///
634    /// * object_store - the store to wrap
635    /// * config - configuration settings for the scheduler
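    ///
    /// # Example
    ///
    /// A minimal sketch of wiring a scheduler to an object store (the store and
    /// configuration choices are illustrative):
    ///
    /// ```ignore
    /// let store = Arc::new(ObjectStore::local());
    /// let config = SchedulerConfig::max_bandwidth(&store);
    /// let scheduler = ScanScheduler::new(store, config);
    /// ```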
636    pub fn new(object_store: Arc<ObjectStore>, config: SchedulerConfig) -> Arc<Self> {
637        let io_capacity = object_store.io_parallelism();
638        let io_queue = Arc::new(IoQueue::new(
639            io_capacity as u32,
640            config.io_buffer_size_bytes,
641        ));
642        let scheduler = Self {
643            object_store,
644            io_queue: io_queue.clone(),
645            stats: Arc::new(StatsCollector::new()),
646        };
647        tokio::task::spawn(async move { run_io_loop(io_queue).await });
648        Arc::new(scheduler)
649    }
650
651    /// Open a file for reading
652    ///
653    /// # Arguments
654    ///
655    /// * path - the path to the file to open
    /// * base_priority - the base priority for I/O requests submitted to this file scheduler;
657    ///   this will determine the upper 64 bits of priority (the lower 64 bits
658    ///   come from `submit_request` and `submit_single`)
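    ///
    /// A sketch of the resulting encoding (mirroring the arithmetic in
    /// [`FileScheduler::submit_request`]):
    ///
    /// ```ignore
    /// let base_priority: u64 = 3;      // assigned to this file
    /// let request_priority: u64 = 17;  // typically derived from the row offset
    /// let effective: u128 = ((base_priority as u128) << 64) + request_priority as u128;
    /// // Requests are scheduled in ascending `effective` order: first by file, then by offset
    /// ```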
659    pub async fn open_file_with_priority(
660        self: &Arc<Self>,
661        path: &Path,
662        base_priority: u64,
663        file_size_bytes: &CachedFileSize,
664    ) -> Result<FileScheduler> {
665        let file_size_bytes = if let Some(size) = file_size_bytes.get() {
666            u64::from(size)
667        } else {
668            let size = self.object_store.size(path).await?;
669            if let Some(size) = NonZero::new(size) {
670                file_size_bytes.set(size);
671            }
672            size
673        };
674        let reader = self
675            .object_store
676            .open_with_size(path, file_size_bytes as usize)
677            .await?;
678        let block_size = self.object_store.block_size() as u64;
679        let max_iop_size = self.object_store.max_iop_size();
680        Ok(FileScheduler {
681            reader: reader.into(),
682            block_size,
683            root: self.clone(),
684            base_priority,
685            max_iop_size,
686        })
687    }
688
689    /// Open a file with a default priority of 0
690    ///
691    /// See [`Self::open_file_with_priority`] for more information on the priority
692    pub async fn open_file(
693        self: &Arc<Self>,
694        path: &Path,
695        file_size_bytes: &CachedFileSize,
696    ) -> Result<FileScheduler> {
697        self.open_file_with_priority(path, 0, file_size_bytes).await
698    }
699
700    fn do_submit_request(
701        &self,
702        reader: Arc<dyn Reader>,
703        request: Vec<Range<u64>>,
704        tx: oneshot::Sender<Response>,
705        priority: u128,
706    ) {
707        let num_iops = request.len() as u32;
708
709        let when_all_io_done = move |bytes_and_permits| {
710            // We don't care if the receiver has given up so discard the result
711            let _ = tx.send(bytes_and_permits);
712        };
713
714        let dest = Arc::new(Mutex::new(Box::new(MutableBatch::new(
715            when_all_io_done,
716            num_iops,
717            priority,
718            request.len(),
719        ))));
720
721        for (task_idx, iop) in request.into_iter().enumerate() {
722            let dest = dest.clone();
723            let io_queue = self.io_queue.clone();
724            let num_bytes = iop.end - iop.start;
725            let task = IoTask {
726                reader: reader.clone(),
727                to_read: iop,
728                priority,
729                when_done: Box::new(move |data| {
730                    io_queue.on_iop_complete();
731                    let mut dest = dest.lock().unwrap();
732                    let chunk = DataChunk {
733                        data,
734                        task_idx,
735                        num_bytes,
736                    };
737                    dest.deliver_data(chunk);
738                }),
739            };
740            self.io_queue.push(task);
741        }
742    }
743
744    fn submit_request(
745        &self,
746        reader: Arc<dyn Reader>,
747        request: Vec<Range<u64>>,
748        priority: u128,
749    ) -> impl Future<Output = Result<Vec<Bytes>>> + Send {
750        let (tx, rx) = oneshot::channel::<Response>();
751
752        self.do_submit_request(reader, request, tx, priority);
753
754        let io_queue = self.io_queue.clone();
755
756        rx.map(move |wrapped_rsp| {
757            // Right now, it isn't possible for I/O to be cancelled so a cancel error should
758            // not occur
759            let rsp = wrapped_rsp.unwrap();
760            io_queue.on_bytes_consumed(rsp.num_bytes, rsp.priority, rsp.num_reqs);
761            rsp.data
762        })
763    }
764
765    pub fn stats(&self) -> ScanStats {
766        ScanStats::new(self.stats.as_ref())
767    }
768}
769
770impl Drop for ScanScheduler {
771    fn drop(&mut self) {
772        self.io_queue.close();
773    }
774}
775
776/// A throttled file reader
777#[derive(Clone, Debug)]
778pub struct FileScheduler {
779    reader: Arc<dyn Reader>,
780    root: Arc<ScanScheduler>,
781    block_size: u64,
782    base_priority: u64,
783    max_iop_size: u64,
784}
785
786fn is_close_together(range1: &Range<u64>, range2: &Range<u64>, block_size: u64) -> bool {
787    // Note that range1.end <= range2.start is possible (e.g. when decoding string arrays)
788    range2.start <= (range1.end + block_size)
789}
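// For example, with a 4 KiB block size (as in the local-filesystem tests below), the
// ranges 0..1000 and 4000..5000 are considered close together (a 3000 byte gap) and
// may be coalesced, while 0..1000 and 10_000..11_000 are not.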
790
791fn is_overlapping(range1: &Range<u64>, range2: &Range<u64>) -> bool {
792    range1.start < range2.end && range2.start < range1.end
793}
794
795impl FileScheduler {
796    /// Submit a batch of I/O requests to the reader
797    ///
    /// The requests will be queued with the scheduler and, when all requests
    /// have been fulfilled, the returned future will be completed.
800    ///
801    /// Each request has a given priority.  If the I/O loop is full then requests
802    /// will be buffered and requests with the *lowest* priority will be released
803    /// from the buffer first.
804    ///
    /// All requests submitted through the same [`ScanScheduler`] share a single
    /// backpressure throttle and will be throttled together.
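    ///
    /// A minimal usage sketch (ranges and priority are illustrative):
    ///
    /// ```ignore
    /// // The returned Vec<Bytes> matches the order of the requested ranges, even if
    /// // the scheduler coalesces or splits the underlying I/O
    /// let bytes = file_scheduler
    ///     .submit_request(vec![0..1024, 4096..8192], /*priority=*/ 0)
    ///     .await?;
    /// assert_eq!(bytes.len(), 2);
    /// ```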
808    pub fn submit_request(
809        &self,
810        request: Vec<Range<u64>>,
811        priority: u64,
812    ) -> impl Future<Output = Result<Vec<Bytes>>> + Send {
813        // The final priority is a combination of the row offset and the file number
814        let priority = ((self.base_priority as u128) << 64) + priority as u128;
815
816        let mut merged_requests = Vec::with_capacity(request.len());
817
818        if !request.is_empty() {
819            let mut curr_interval = request[0].clone();
820
821            for req in request.iter().skip(1) {
822                if is_close_together(&curr_interval, req, self.block_size) {
823                    curr_interval.end = curr_interval.end.max(req.end);
824                } else {
825                    merged_requests.push(curr_interval);
826                    curr_interval = req.clone();
827                }
828            }
829
830            merged_requests.push(curr_interval);
831        }
832
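        // Split any coalesced range that exceeds `max_iop_size` into roughly equal
        // pieces; the final piece absorbs any rounding remainder.  (For example, in
        // the tests below a 75 MiB read ends up issued as 5 separate requests.)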
833        let mut updated_requests = Vec::with_capacity(merged_requests.len());
834        for req in merged_requests {
835            if req.is_empty() {
836                updated_requests.push(req);
837            } else {
838                let num_requests = (req.end - req.start).div_ceil(self.max_iop_size);
839                let bytes_per_request = (req.end - req.start) / num_requests;
840                for i in 0..num_requests {
841                    let start = req.start + i * bytes_per_request;
842                    let end = if i == num_requests - 1 {
843                        // Last request is a bit bigger due to rounding
844                        req.end
845                    } else {
846                        start + bytes_per_request
847                    };
848                    updated_requests.push(start..end);
849                }
850            }
851        }
852
853        self.root.stats.record_request(&updated_requests);
854
855        let bytes_vec_fut =
856            self.root
857                .submit_request(self.reader.clone(), updated_requests.clone(), priority);
858
859        let mut updated_index = 0;
860        let mut final_bytes = Vec::with_capacity(request.len());
861
862        async move {
863            let bytes_vec = bytes_vec_fut.await?;
864
865            let mut orig_index = 0;
866            while (updated_index < updated_requests.len()) && (orig_index < request.len()) {
867                let updated_range = &updated_requests[updated_index];
868                let orig_range = &request[orig_index];
869                let byte_offset = updated_range.start as usize;
870
871                if is_overlapping(updated_range, orig_range) {
872                    // We need to undo the coalescing and splitting done earlier
873                    let start = orig_range.start as usize - byte_offset;
874                    if orig_range.end <= updated_range.end {
875                        // The original range is fully contained in the updated range, can do
876                        // zero-copy slice
877                        let end = orig_range.end as usize - byte_offset;
878                        final_bytes.push(bytes_vec[updated_index].slice(start..end));
879                    } else {
880                        // The original read was split into multiple requests, need to copy
881                        // back into a single buffer
882                        let orig_size = orig_range.end - orig_range.start;
883                        let mut merged_bytes = Vec::with_capacity(orig_size as usize);
884                        merged_bytes.extend_from_slice(&bytes_vec[updated_index].slice(start..));
885                        let mut copy_offset = merged_bytes.len() as u64;
886                        while copy_offset < orig_size {
887                            updated_index += 1;
888                            let next_range = &updated_requests[updated_index];
889                            let bytes_to_take =
890                                (orig_size - copy_offset).min(next_range.end - next_range.start);
891                            merged_bytes.extend_from_slice(
892                                &bytes_vec[updated_index].slice(0..bytes_to_take as usize),
893                            );
894                            copy_offset += bytes_to_take;
895                        }
896                        final_bytes.push(Bytes::from(merged_bytes));
897                    }
898                    orig_index += 1;
899                } else {
900                    updated_index += 1;
901                }
902            }
903
904            Ok(final_bytes)
905        }
906    }
907
908    pub fn with_priority(&self, priority: u64) -> Self {
909        Self {
910            reader: self.reader.clone(),
911            root: self.root.clone(),
912            block_size: self.block_size,
913            max_iop_size: self.max_iop_size,
914            base_priority: priority,
915        }
916    }
917
918    /// Submit a single IOP to the reader
919    ///
920    /// If you have multiple IOPS to perform then [`Self::submit_request`] is going
921    /// to be more efficient.
922    ///
923    /// See [`Self::submit_request`] for more information on the priority and backpressure.
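    ///
    /// A minimal usage sketch (range and priority are illustrative):
    ///
    /// ```ignore
    /// // Read bytes 0..8 of the file at priority 0
    /// let header: Bytes = file_scheduler.submit_single(0..8, 0).await?;
    /// ```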
924    pub fn submit_single(
925        &self,
926        range: Range<u64>,
927        priority: u64,
928    ) -> impl Future<Output = Result<Bytes>> + Send {
929        self.submit_request(vec![range], priority)
930            .map_ok(|vec_bytes| vec_bytes.into_iter().next().unwrap())
931    }
932
933    /// Provides access to the underlying reader
934    ///
935    /// Do not use this for reading data as it will bypass any I/O scheduling!
    /// This is mainly exposed to allow metadata operations (e.g. size, block_size),
    /// which either aren't IOPS or which we don't throttle
938    pub fn reader(&self) -> &Arc<dyn Reader> {
939        &self.reader
940    }
941}
942
943#[cfg(test)]
944mod tests {
945    use std::{collections::VecDeque, time::Duration};
946
947    use futures::poll;
948    use lance_core::utils::tempfile::TempObjFile;
949    use rand::RngCore;
950
951    use object_store::{memory::InMemory, GetRange, ObjectStore as OSObjectStore};
952    use tokio::{runtime::Handle, time::timeout};
953    use url::Url;
954
955    use crate::{
956        object_store::{DEFAULT_DOWNLOAD_RETRY_COUNT, DEFAULT_MAX_IOP_SIZE},
957        testing::MockObjectStore,
958    };
959
960    use super::*;
961
962    #[tokio::test]
963    async fn test_full_seq_read() {
964        let tmp_file = TempObjFile::default();
965
966        let obj_store = Arc::new(ObjectStore::local());
967
968        // Write 1MiB of data
969        const DATA_SIZE: u64 = 1024 * 1024;
970        let mut some_data = vec![0; DATA_SIZE as usize];
971        rand::rng().fill_bytes(&mut some_data);
972        obj_store.put(&tmp_file, &some_data).await.unwrap();
973
974        let config = SchedulerConfig::default_for_testing();
975
976        let scheduler = ScanScheduler::new(obj_store, config);
977
978        let file_scheduler = scheduler
979            .open_file(&tmp_file, &CachedFileSize::unknown())
980            .await
981            .unwrap();
982
983        // Read it back 4KiB at a time
984        const READ_SIZE: u64 = 4 * 1024;
985        let mut reqs = VecDeque::new();
986        let mut offset = 0;
987        while offset < DATA_SIZE {
988            reqs.push_back(
989                #[allow(clippy::single_range_in_vec_init)]
990                file_scheduler
991                    .submit_request(vec![offset..offset + READ_SIZE], 0)
992                    .await
993                    .unwrap(),
994            );
995            offset += READ_SIZE;
996        }
997
998        offset = 0;
999        // Note: we should get parallel I/O even though we are consuming serially
1000        while offset < DATA_SIZE {
1001            let data = reqs.pop_front().unwrap();
1002            let actual = &data[0];
1003            let expected = &some_data[offset as usize..(offset + READ_SIZE) as usize];
1004            assert_eq!(expected, actual);
1005            offset += READ_SIZE;
1006        }
1007    }
1008
1009    #[tokio::test]
1010    async fn test_split_coalesce() {
1011        let tmp_file = TempObjFile::default();
1012
1013        let obj_store = Arc::new(ObjectStore::local());
1014
1015        // Write 75MiB of data
1016        const DATA_SIZE: u64 = 75 * 1024 * 1024;
1017        let mut some_data = vec![0; DATA_SIZE as usize];
1018        rand::rng().fill_bytes(&mut some_data);
1019        obj_store.put(&tmp_file, &some_data).await.unwrap();
1020
1021        let config = SchedulerConfig::default_for_testing();
1022
1023        let scheduler = ScanScheduler::new(obj_store, config);
1024
1025        let file_scheduler = scheduler
1026            .open_file(&tmp_file, &CachedFileSize::unknown())
1027            .await
1028            .unwrap();
1029
1030        // These 3 requests should be coalesced into a single I/O because they are within 4KiB
1031        // of each other
1032        let req =
1033            file_scheduler.submit_request(vec![50_000..51_000, 52_000..53_000, 54_000..55_000], 0);
1034
1035        let bytes = req.await.unwrap();
1036
1037        assert_eq!(bytes[0], &some_data[50_000..51_000]);
1038        assert_eq!(bytes[1], &some_data[52_000..53_000]);
1039        assert_eq!(bytes[2], &some_data[54_000..55_000]);
1040
1041        assert_eq!(1, scheduler.stats().iops);
1042
1043        // This should be split into 5 requests because it is so large
1044        let req = file_scheduler.submit_request(vec![0..DATA_SIZE], 0);
1045        let bytes = req.await.unwrap();
1046        assert!(bytes[0] == some_data, "data is not the same");
1047
1048        assert_eq!(6, scheduler.stats().iops);
1049
1050        // None of these requests are bigger than the max IOP size but they will be coalesced into
1051        // one IOP that is bigger and then split back into 2 requests that don't quite align with the original
1052        // ranges.
1053        let chunk_size = *DEFAULT_MAX_IOP_SIZE;
1054        let req = file_scheduler.submit_request(
1055            vec![
1056                10..chunk_size,
1057                chunk_size + 10..(chunk_size * 2) - 20,
1058                chunk_size * 2..(chunk_size * 2) + 10,
1059            ],
1060            0,
1061        );
1062
1063        let bytes = req.await.unwrap();
1064        let chunk_size = chunk_size as usize;
1065        assert!(
1066            bytes[0] == some_data[10..chunk_size],
1067            "data is not the same"
1068        );
1069        assert!(
1070            bytes[1] == some_data[chunk_size + 10..(chunk_size * 2) - 20],
1071            "data is not the same"
1072        );
1073        assert!(
1074            bytes[2] == some_data[chunk_size * 2..(chunk_size * 2) + 10],
1075            "data is not the same"
1076        );
1077        assert_eq!(8, scheduler.stats().iops);
1078
1079        let reads = (0..44)
1080            .map(|i| i * 1_000_000..(i + 1) * 1_000_000)
1081            .collect::<Vec<_>>();
1082        let req = file_scheduler.submit_request(reads, 0);
1083        let bytes = req.await.unwrap();
1084        for (i, bytes) in bytes.iter().enumerate() {
1085            assert!(
1086                bytes == &some_data[i * 1_000_000..(i + 1) * 1_000_000],
1087                "data is not the same"
1088            );
1089        }
1090        assert_eq!(11, scheduler.stats().iops);
1091    }
1092
1093    #[tokio::test]
1094    async fn test_priority() {
1095        let some_path = Path::parse("foo").unwrap();
1096        let base_store = Arc::new(InMemory::new());
1097        base_store
1098            .put(&some_path, vec![0; 1000].into())
1099            .await
1100            .unwrap();
1101
1102        let semaphore = Arc::new(tokio::sync::Semaphore::new(0));
1103        let mut obj_store = MockObjectStore::default();
1104        let semaphore_copy = semaphore.clone();
1105        obj_store
1106            .expect_get_opts()
1107            .returning(move |location, options| {
1108                let semaphore = semaphore.clone();
1109                let base_store = base_store.clone();
1110                let location = location.clone();
1111                async move {
1112                    semaphore.acquire().await.unwrap().forget();
1113                    base_store.get_opts(&location, options).await
1114                }
1115                .boxed()
1116            });
1117        let obj_store = Arc::new(ObjectStore::new(
1118            Arc::new(obj_store),
1119            Url::parse("mem://").unwrap(),
1120            Some(500),
1121            None,
1122            false,
1123            false,
1124            1,
1125            DEFAULT_DOWNLOAD_RETRY_COUNT,
1126            None,
1127        ));
1128
1129        let config = SchedulerConfig {
1130            io_buffer_size_bytes: 1024 * 1024,
1131        };
1132
1133        let scan_scheduler = ScanScheduler::new(obj_store, config);
1134
1135        let file_scheduler = scan_scheduler
1136            .open_file(&Path::parse("foo").unwrap(), &CachedFileSize::new(1000))
1137            .await
1138            .unwrap();
1139
1140        // Issue a request, priority doesn't matter, it will be submitted
1141        // immediately (it will go pending)
1142        // Note: the timeout is to prevent a deadlock if the test fails.
1143        let first_fut = timeout(
1144            Duration::from_secs(10),
1145            file_scheduler.submit_single(0..10, 0),
1146        )
1147        .boxed();
1148
1149        // Issue another low priority request (it will go in queue)
1150        let mut second_fut = timeout(
1151            Duration::from_secs(10),
1152            file_scheduler.submit_single(0..20, 100),
1153        )
1154        .boxed();
1155
1156        // Issue a high priority request (it will go in queue and should bump
1157        // the other queued request down)
1158        let mut third_fut = timeout(
1159            Duration::from_secs(10),
1160            file_scheduler.submit_single(0..30, 0),
1161        )
1162        .boxed();
1163
1164        // Finish one file, should be the in-flight first request
1165        semaphore_copy.add_permits(1);
1166        assert!(first_fut.await.unwrap().unwrap().len() == 10);
1167        // Other requests should not be finished
1168        assert!(poll!(&mut second_fut).is_pending());
1169        assert!(poll!(&mut third_fut).is_pending());
1170
1171        // Next should be high priority request
1172        semaphore_copy.add_permits(1);
1173        assert!(third_fut.await.unwrap().unwrap().len() == 30);
1174        assert!(poll!(&mut second_fut).is_pending());
1175
1176        // Finally, the low priority request
1177        semaphore_copy.add_permits(1);
1178        assert!(second_fut.await.unwrap().unwrap().len() == 20);
1179    }
1180
1181    #[tokio::test(flavor = "multi_thread")]
1182    async fn test_backpressure() {
1183        let some_path = Path::parse("foo").unwrap();
1184        let base_store = Arc::new(InMemory::new());
1185        base_store
1186            .put(&some_path, vec![0; 100000].into())
1187            .await
1188            .unwrap();
1189
1190        let bytes_read = Arc::new(AtomicU64::from(0));
1191        let mut obj_store = MockObjectStore::default();
1192        let bytes_read_copy = bytes_read.clone();
1193        // Wraps the obj_store to keep track of how many bytes have been read
1194        obj_store
1195            .expect_get_opts()
1196            .returning(move |location, options| {
1197                let range = options.range.as_ref().unwrap();
1198                let num_bytes = match range {
1199                    GetRange::Bounded(bounded) => bounded.end - bounded.start,
1200                    _ => panic!(),
1201                };
1202                bytes_read_copy.fetch_add(num_bytes, Ordering::Release);
1203                let location = location.clone();
1204                let base_store = base_store.clone();
1205                async move { base_store.get_opts(&location, options).await }.boxed()
1206            });
1207        let obj_store = Arc::new(ObjectStore::new(
1208            Arc::new(obj_store),
1209            Url::parse("mem://").unwrap(),
1210            Some(500),
1211            None,
1212            false,
1213            false,
1214            1,
1215            DEFAULT_DOWNLOAD_RETRY_COUNT,
1216            None,
1217        ));
1218
1219        let config = SchedulerConfig {
1220            io_buffer_size_bytes: 10,
1221        };
1222
1223        let scan_scheduler = ScanScheduler::new(obj_store.clone(), config);
1224
1225        let file_scheduler = scan_scheduler
1226            .open_file(&Path::parse("foo").unwrap(), &CachedFileSize::new(100000))
1227            .await
1228            .unwrap();
1229
1230        let wait_for_idle = || async move {
1231            let handle = Handle::current();
1232            while handle.metrics().num_alive_tasks() != 1 {
1233                tokio::time::sleep(Duration::from_millis(10)).await;
1234            }
1235        };
1236        let wait_for_bytes_read_and_idle = |target_bytes: u64| {
            // We need to move `target_bytes` but don't want to move `bytes_read`
1238            let bytes_read = &bytes_read;
1239            async move {
1240                let bytes_read_copy = bytes_read.clone();
1241                while bytes_read_copy.load(Ordering::Acquire) < target_bytes {
1242                    tokio::time::sleep(Duration::from_millis(10)).await;
1243                }
1244                wait_for_idle().await;
1245            }
1246        };
1247
1248        // This read will begin immediately
1249        let first_fut = file_scheduler.submit_single(0..5, 0);
1250        // This read should also begin immediately
1251        let second_fut = file_scheduler.submit_single(0..5, 0);
1252        // This read will be throttled
1253        let third_fut = file_scheduler.submit_single(0..3, 0);
1254        // Two tasks (third_fut and unit test)
1255        wait_for_bytes_read_and_idle(10).await;
1256
1257        assert_eq!(first_fut.await.unwrap().len(), 5);
1258        // One task (unit test)
1259        wait_for_bytes_read_and_idle(13).await;
1260
1261        // 2 bytes are ready but 5 bytes requested, read will be blocked
1262        let fourth_fut = file_scheduler.submit_single(0..5, 0);
1263        wait_for_bytes_read_and_idle(13).await;
1264
1265        // Out of order completion is ok, will unblock backpressure
1266        assert_eq!(third_fut.await.unwrap().len(), 3);
1267        wait_for_bytes_read_and_idle(18).await;
1268
1269        assert_eq!(second_fut.await.unwrap().len(), 5);
1270        // At this point there are 5 bytes available in backpressure queue
1271        // Now we issue multi-read that can be partially fulfilled, it will read some bytes but
1272        // not all of them. (using large range gap to ensure request not coalesced)
1273        //
1274        // I'm actually not sure this behavior is great.  It's possible that we should just
1275        // block until we can fulfill the entire request.
1276        let fifth_fut = file_scheduler.submit_request(vec![0..3, 90000..90007], 0);
1277        wait_for_bytes_read_and_idle(21).await;
1278
1279        // Fifth future should eventually finish due to deadlock prevention
1280        let fifth_bytes = tokio::time::timeout(Duration::from_secs(10), fifth_fut)
1281            .await
1282            .unwrap();
1283        assert_eq!(
1284            fifth_bytes.unwrap().iter().map(|b| b.len()).sum::<usize>(),
1285            10
1286        );
1287
1288        // And now let's just make sure that we can read the rest of the data
1289        assert_eq!(fourth_fut.await.unwrap().len(), 5);
1290        wait_for_bytes_read_and_idle(28).await;
1291
1292        // Ensure deadlock prevention timeout can be disabled
1293        let config = SchedulerConfig {
1294            io_buffer_size_bytes: 10,
1295        };
1296
1297        let scan_scheduler = ScanScheduler::new(obj_store, config);
1298        let file_scheduler = scan_scheduler
1299            .open_file(&Path::parse("foo").unwrap(), &CachedFileSize::new(100000))
1300            .await
1301            .unwrap();
1302
1303        let first_fut = file_scheduler.submit_single(0..10, 0);
1304        let second_fut = file_scheduler.submit_single(0..10, 0);
1305
1306        std::thread::sleep(Duration::from_millis(100));
1307        assert_eq!(first_fut.await.unwrap().len(), 10);
1308        assert_eq!(second_fut.await.unwrap().len(), 10);
1309    }
1310
1311    #[test_log::test(tokio::test(flavor = "multi_thread"))]
1312    async fn stress_backpressure() {
1313        // This test ensures that the backpressure mechanism works correctly with
1314        // regards to priority.  In other words, as long as all requests are consumed
1315        // in priority order then the backpressure mechanism should not deadlock
1316        let some_path = Path::parse("foo").unwrap();
1317        let obj_store = Arc::new(ObjectStore::memory());
1318        obj_store
1319            .put(&some_path, vec![0; 100000].as_slice())
1320            .await
1321            .unwrap();
1322
1323        // Only one request will be allowed in
1324        let config = SchedulerConfig {
1325            io_buffer_size_bytes: 1,
1326        };
1327        let scan_scheduler = ScanScheduler::new(obj_store.clone(), config);
1328        let file_scheduler = scan_scheduler
1329            .open_file(&some_path, &CachedFileSize::unknown())
1330            .await
1331            .unwrap();
1332
1333        let mut futs = Vec::with_capacity(10000);
1334        for idx in 0..10000 {
1335            futs.push(file_scheduler.submit_single(idx..idx + 1, idx));
1336        }
1337
1338        for fut in futs {
1339            fut.await.unwrap();
1340        }
1341    }
1342}