lance_io/scheduler.rs

// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

use bytes::Bytes;
use futures::channel::oneshot;
use futures::{FutureExt, TryFutureExt};
use object_store::path::Path;
use snafu::location;
use std::collections::BinaryHeap;
use std::fmt::Debug;
use std::future::Future;
use std::num::NonZero;
use std::ops::Range;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Instant;
use tokio::sync::{Notify, Semaphore, SemaphorePermit};

use lance_core::{Error, Result};

use crate::object_store::ObjectStore;
use crate::traits::Reader;
use crate::utils::CachedFileSize;

// Don't log backpressure warnings until at least this many seconds have passed
const BACKPRESSURE_MIN: u64 = 5;
// Don't log backpressure warnings more than once / minute
const BACKPRESSURE_DEBOUNCE: u64 = 60;

// Global counter of how many IOPS we have issued
static IOPS_COUNTER: AtomicU64 = AtomicU64::new(0);
// Global counter of how many bytes were read by the scheduler
static BYTES_READ_COUNTER: AtomicU64 = AtomicU64::new(0);
// By default, we limit the number of IOPS across the entire process to 128
//
// In theory this is enough for ~10GBps on S3 following the guidelines to issue
// 1 IOP per 80MBps.  In practice, I have noticed slightly better performance going
// up to 256.
//
// However, non-S3 stores (e.g. GCS, Azure) can suffer significantly from too many
// concurrent IOPS.  For safety, we set the default to 128 and let the user override
// this if needed.
//
// Note: this only limits things that run through the scheduler.  It does not limit
// IOPS from other sources like writing or commits.
static DEFAULT_PROCESS_IOPS_LIMIT: i32 = 128;

pub fn iops_counter() -> u64 {
    IOPS_COUNTER.load(Ordering::Acquire)
}

pub fn bytes_read_counter() -> u64 {
    BYTES_READ_COUNTER.load(Ordering::Acquire)
}
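
// For example (a sketch, not a guarantee of the public API): callers can measure the
// I/O issued by a block of work by diffing these process-wide counters around it:
//
//   let (iops_before, bytes_before) = (iops_counter(), bytes_read_counter());
//   // ... perform reads through a ScanScheduler ...
//   let iops_used = iops_counter() - iops_before;
//   let bytes_used = bytes_read_counter() - bytes_before;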

// There are two structures that control the I/O scheduler concurrency.  First,
// we have a hard limit on the number of IOPS that can be issued concurrently.
// This limit is process-wide.
//
// Second, we try to limit how many I/O requests can be buffered in memory without
// being consumed by a decoder of some kind.  This limit is per-scheduler.  We cannot
// make this limit process-wide without introducing deadlock (the decoder for
// file 0 might be waiting on IOPS blocked by a queue filled with requests for file 1,
// and vice versa).
//
// There is also a per-scan limit on the number of IOPS that can be issued concurrently.
//
// The process-wide limit exists for users who need a hard cap on the number of parallel
// IOPS, e.g. due to port availability limits or to prevent multiple scans from saturating
// the network.  (Note: a process-wide limit of X will not necessarily limit the number of
// open TCP connections to exactly X.  The underlying object store may open more connections
// anyway.)
//
// However, it can be too restrictive in some cases, e.g. when some scans are reading from
// cloud storage and other scans are reading from local disk.  In these cases users don't
// need to set a process-wide limit and can rely on the per-scan limits.

// The IopsQuota enforces the first of the above limits: it is the per-process hard cap
// on the number of IOPS that can be issued concurrently.
//
// The per-scan limits are enforced by IoQueue
struct IopsQuota {
    // An Option is used here to avoid locking overhead when no limit is set
    iops_avail: Option<Semaphore>,
}

/// A reservation on the global IOPS quota
///
/// When the reservation is dropped, the IOPS quota is released unless
/// [`Self::forget`] is called.
struct IopsReservation<'a> {
    value: Option<SemaphorePermit<'a>>,
}

impl IopsReservation<'_> {
    // Forget the reservation, so it won't be released on drop
    fn forget(&mut self) {
        if let Some(value) = self.value.take() {
            value.forget();
        }
    }
}

impl IopsQuota {
    // By default, we throttle the number of scan IOPS across the entire process
    //
    // However, the user can disable this by setting the environment variable
    // LANCE_PROCESS_IO_THREADS_LIMIT to zero (or a negative integer).
    fn new() -> Self {
        let initial_capacity = std::env::var("LANCE_PROCESS_IO_THREADS_LIMIT")
            .map(|s| {
                s.parse::<i32>().unwrap_or_else(|_| {
                    log::warn!("Ignoring invalid LANCE_PROCESS_IO_THREADS_LIMIT: {}", s);
                    DEFAULT_PROCESS_IOPS_LIMIT
                })
            })
            .unwrap_or(DEFAULT_PROCESS_IOPS_LIMIT);
        let iops_avail = if initial_capacity <= 0 {
            None
        } else {
            Some(Semaphore::new(initial_capacity as usize))
        };
        Self { iops_avail }
    }

    // Release a previously acquired reservation back to the global IOPS quota
    fn release(&self) {
        if let Some(iops_avail) = self.iops_avail.as_ref() {
            iops_avail.add_permits(1);
        }
    }

    // Acquire a reservation on the global IOPS quota
    async fn acquire(&self) -> IopsReservation<'_> {
        if let Some(iops_avail) = self.iops_avail.as_ref() {
            IopsReservation {
                value: Some(iops_avail.acquire().await.unwrap()),
            }
        } else {
            IopsReservation { value: None }
        }
    }
}
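
// For example (a sketch): a deployment that wants at most 64 concurrent scan reads
// across the whole process can export the environment variable before startup:
//
//   LANCE_PROCESS_IO_THREADS_LIMIT=64
//
// Setting it to 0 (or a negative value) removes the process-wide cap entirely.  The
// value is read only once, when IOPS_QUOTA below is first used.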

static IOPS_QUOTA: std::sync::LazyLock<IopsQuota> = std::sync::LazyLock::new(IopsQuota::new);

// We want to allow requests that have a lower priority than any
// currently in-flight request.  This helps avoid potential deadlocks
// related to backpressure.  Unfortunately, it is quite expensive to
// keep track of which priorities are in-flight.
//
// TODO: At some point it would be nice if we can optimize this away but
// in_flight should remain relatively small (generally less than 256 items)
// and has not shown itself to be a bottleneck yet.
struct PrioritiesInFlight {
    in_flight: Vec<u128>,
}

impl PrioritiesInFlight {
    fn new(capacity: u32) -> Self {
        Self {
            in_flight: Vec::with_capacity(capacity as usize * 2),
        }
    }

    fn min_in_flight(&self) -> u128 {
        self.in_flight.first().copied().unwrap_or(u128::MAX)
    }

    fn push(&mut self, prio: u128) {
        let pos = match self.in_flight.binary_search(&prio) {
            Ok(pos) => pos,
            Err(pos) => pos,
        };
        self.in_flight.insert(pos, prio);
    }

    fn remove(&mut self, prio: u128) {
        if let Ok(pos) = self.in_flight.binary_search(&prio) {
            self.in_flight.remove(pos);
        }
    }
}

struct IoQueueState {
    // Number of IOPS we can issue concurrently before pausing I/O
    iops_avail: u32,
    // Number of bytes we are allowed to buffer in memory before pausing I/O
    //
    // This can dip below 0 due to I/O prioritization
    bytes_avail: i64,
    // Pending I/O requests
    pending_requests: BinaryHeap<IoTask>,
    // Priorities of in-flight requests
    priorities_in_flight: PrioritiesInFlight,
    // Set when the scheduler is finished to notify the I/O loop to shut down
    // once all outstanding requests have been completed.
    done_scheduling: bool,
    // Time when the scheduler started
    start: Instant,
    // Last time we warned about backpressure
    last_warn: AtomicU64,
}

impl IoQueueState {
    fn new(io_capacity: u32, io_buffer_size: u64) -> Self {
        Self {
            iops_avail: io_capacity,
            bytes_avail: io_buffer_size as i64,
            pending_requests: BinaryHeap::new(),
            priorities_in_flight: PrioritiesInFlight::new(io_capacity),
            done_scheduling: false,
            start: Instant::now(),
            last_warn: AtomicU64::from(0),
        }
    }

    fn warn_if_needed(&self) {
        let seconds_elapsed = self.start.elapsed().as_secs();
        let last_warn = self.last_warn.load(Ordering::Acquire);
        let since_last_warn = seconds_elapsed - last_warn;
        if (last_warn == 0
            && seconds_elapsed > BACKPRESSURE_MIN
            && seconds_elapsed < BACKPRESSURE_DEBOUNCE)
            || since_last_warn > BACKPRESSURE_DEBOUNCE
        {
            tracing::event!(tracing::Level::DEBUG, "Backpressure throttle exceeded");
            log::debug!("Backpressure throttle is full, I/O will pause until buffer is drained.  Max I/O bandwidth will not be achieved because CPU is falling behind");
            self.last_warn
                .store(seconds_elapsed.max(1), Ordering::Release);
        }
    }

    fn can_deliver(&self, task: &IoTask) -> bool {
        if self.iops_avail == 0 {
            false
        } else if task.priority <= self.priorities_in_flight.min_in_flight() {
            true
        } else if task.num_bytes() as i64 > self.bytes_avail {
            self.warn_if_needed();
            false
        } else {
            true
        }
    }
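
    // For example (a sketch of the rule above): with bytes_avail == 0 but an IOPS slot
    // free, a task at priority 5 is still admitted when the lowest in-flight priority is
    // 7, because a decoder may be blocked waiting on it; a task at priority 9 would be
    // held back until some buffered bytes are consumed.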

    fn next_task(&mut self) -> Option<IoTask> {
        let task = self.pending_requests.peek()?;
        if self.can_deliver(task) {
            self.priorities_in_flight.push(task.priority);
            self.iops_avail -= 1;
            self.bytes_avail -= task.num_bytes() as i64;
            if self.bytes_avail < 0 {
                // This can happen when we admit special priority requests
                log::debug!(
                    "Backpressure throttle temporarily exceeded by {} bytes due to priority I/O",
                    -self.bytes_avail
                );
            }
            Some(self.pending_requests.pop().unwrap())
        } else {
            None
        }
    }
}

// This is modeled after the MPSC queue described here: https://docs.rs/tokio/latest/tokio/sync/struct.Notify.html
//
// However, it only needs to be SPSC since there is only one "scheduler thread"
// and one I/O loop.
struct IoQueue {
    // Queue state
    state: Mutex<IoQueueState>,
    // Used to signal new I/O requests have arrived that might potentially be runnable
    notify: Notify,
}

impl IoQueue {
    fn new(io_capacity: u32, io_buffer_size: u64) -> Self {
        Self {
            state: Mutex::new(IoQueueState::new(io_capacity, io_buffer_size)),
            notify: Notify::new(),
        }
    }

    fn push(&self, task: IoTask) {
        log::trace!(
            "Inserting I/O request for {} bytes with priority ({},{}) into I/O queue",
            task.num_bytes(),
            task.priority >> 64,
            task.priority & 0xFFFFFFFFFFFFFFFF
        );
        let mut state = self.state.lock().unwrap();
        state.pending_requests.push(task);
        drop(state);

        self.notify.notify_one();
    }

    async fn pop(&self) -> Option<IoTask> {
        loop {
            {
                // First, grab a reservation on the global IOPS quota
                // If we then get a task to run, transfer the reservation
                // to the task.  Otherwise, the reservation will be released
                // when iop_res is dropped.
                let mut iop_res = IOPS_QUOTA.acquire().await;
                // Next, try and grab a reservation from the queue
                let mut state = self.state.lock().unwrap();
                if let Some(task) = state.next_task() {
                    // Reservation successfully acquired; we will release the global
                    // reservation after the task has run.
                    iop_res.forget();
                    return Some(task);
                }

                if state.done_scheduling {
                    return None;
                }
            }

            self.notify.notified().await;
        }
    }

    fn on_iop_complete(&self) {
        let mut state = self.state.lock().unwrap();
        state.iops_avail += 1;
        drop(state);

        self.notify.notify_one();
    }

    fn on_bytes_consumed(&self, bytes: u64, priority: u128, num_reqs: usize) {
        let mut state = self.state.lock().unwrap();
        state.bytes_avail += bytes as i64;
        for _ in 0..num_reqs {
            state.priorities_in_flight.remove(priority);
        }
        drop(state);

        self.notify.notify_one();
    }

    fn close(&self) {
        let mut state = self.state.lock().unwrap();
        state.done_scheduling = true;
        let pending_requests = std::mem::take(&mut state.pending_requests);
        drop(state);
        for request in pending_requests {
            request.cancel();
        }

        self.notify.notify_one();
    }
}

// There is one instance of MutableBatch shared by all the I/O operations
// that make up a single request.  When all the I/O operations complete
// then the MutableBatch goes out of scope and the batch request is considered
// complete
struct MutableBatch<F: FnOnce(Response) + Send> {
    when_done: Option<F>,
    data_buffers: Vec<Bytes>,
    num_bytes: u64,
    priority: u128,
    num_reqs: usize,
    err: Option<Box<dyn std::error::Error + Send + Sync + 'static>>,
}

impl<F: FnOnce(Response) + Send> MutableBatch<F> {
    fn new(when_done: F, num_data_buffers: u32, priority: u128, num_reqs: usize) -> Self {
        Self {
            when_done: Some(when_done),
            data_buffers: vec![Bytes::default(); num_data_buffers as usize],
            num_bytes: 0,
            priority,
            num_reqs,
            err: None,
        }
    }
}

// Rather than keep track of when all the I/O requests are finished so that we
// can deliver the batch of data we let Rust do that for us.  When all I/O's are
// done then the MutableBatch will go out of scope and we know we have all the
// data.
impl<F: FnOnce(Response) + Send> Drop for MutableBatch<F> {
    fn drop(&mut self) {
        // If we have an error, return that.  Otherwise return the data
        let result = if self.err.is_some() {
            Err(Error::Wrapped {
                error: self.err.take().unwrap(),
                location: location!(),
            })
        } else {
            let mut data = Vec::new();
            std::mem::swap(&mut data, &mut self.data_buffers);
            Ok(data)
        };
        // We don't really care if no one is around to receive it, just let
        // the result go out of scope and get cleaned up
        let response = Response {
            data: result,
            num_bytes: self.num_bytes,
            priority: self.priority,
            num_reqs: self.num_reqs,
        };
        (self.when_done.take().unwrap())(response);
    }
}
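
// A sketch of the lifecycle: if a batch request is split into three IoTasks, each
// task's `when_done` closure holds a clone of the same Arc<Mutex<MutableBatch>>.  The
// Drop impl above (and therefore the batch's own `when_done`) only runs after the
// third closure has finished and the last Arc clone is released.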

struct DataChunk {
    task_idx: usize,
    num_bytes: u64,
    data: Result<Bytes>,
}

trait DataSink: Send {
    fn deliver_data(&mut self, data: DataChunk);
}

impl<F: FnOnce(Response) + Send> DataSink for MutableBatch<F> {
    // Called by worker tasks to add data to the MutableBatch
    fn deliver_data(&mut self, data: DataChunk) {
        self.num_bytes += data.num_bytes;
        match data.data {
            Ok(data_bytes) => {
                self.data_buffers[data.task_idx] = data_bytes;
            }
            Err(err) => {
                // This keeps the original error, if present
                self.err.get_or_insert(Box::new(err));
            }
        }
    }
}

struct IoTask {
    reader: Arc<dyn Reader>,
    to_read: Range<u64>,
    when_done: Box<dyn FnOnce(Result<Bytes>) + Send>,
    priority: u128,
}

impl Eq for IoTask {}

impl PartialEq for IoTask {
    fn eq(&self, other: &Self) -> bool {
        self.priority == other.priority
    }
}

impl PartialOrd for IoTask {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for IoTask {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        // This is intentionally inverted.  We want a min-heap
        other.priority.cmp(&self.priority)
    }
}

impl IoTask {
    fn num_bytes(&self) -> u64 {
        self.to_read.end - self.to_read.start
    }
    fn cancel(self) {
        (self.when_done)(Err(Error::Internal {
            message: "Scheduler closed before I/O was completed".to_string(),
            location: location!(),
        }));
    }

    async fn run(self) {
        let file_path = self.reader.path().as_ref();
        let num_bytes = self.num_bytes();
        let bytes = if self.to_read.start == self.to_read.end {
            Ok(Bytes::new())
        } else {
            let bytes_fut = self
                .reader
                .get_range(self.to_read.start as usize..self.to_read.end as usize);
            IOPS_COUNTER.fetch_add(1, Ordering::Release);
            let num_bytes = self.num_bytes();
            bytes_fut
                .inspect(move |_| {
                    BYTES_READ_COUNTER.fetch_add(num_bytes, Ordering::Release);
                })
                .await
                .map_err(Error::from)
        };
        // Emit per-file I/O trace event only when tracing is enabled
        tracing::trace!(
            file = file_path,
            bytes_read = num_bytes,
            requests = 1,
            range_start = self.to_read.start,
            range_end = self.to_read.end,
            "File I/O completed"
        );
        IOPS_QUOTA.release();
        (self.when_done)(bytes);
    }
}

// Every time a scheduler starts up it launches a task to run the I/O loop.  This loop
// repeats endlessly until the scheduler is destroyed.
async fn run_io_loop(tasks: Arc<IoQueue>) {
    // Pop the first finished task off the queue and submit another until
    // we are done
    loop {
        let next_task = tasks.pop().await;
        match next_task {
            Some(task) => {
                tokio::spawn(task.run());
            }
            None => {
                // The sender has been dropped, we are done
                return;
            }
        }
    }
}

#[derive(Debug)]
struct StatsCollector {
    iops: AtomicU64,
    requests: AtomicU64,
    bytes_read: AtomicU64,
}

impl StatsCollector {
    fn new() -> Self {
        Self {
            iops: AtomicU64::new(0),
            requests: AtomicU64::new(0),
            bytes_read: AtomicU64::new(0),
        }
    }

    fn iops(&self) -> u64 {
        self.iops.load(Ordering::Relaxed)
    }

    fn bytes_read(&self) -> u64 {
        self.bytes_read.load(Ordering::Relaxed)
    }

    fn requests(&self) -> u64 {
        self.requests.load(Ordering::Relaxed)
    }

    fn record_request(&self, request: &[Range<u64>]) {
        self.requests.fetch_add(1, Ordering::Relaxed);
        self.iops.fetch_add(request.len() as u64, Ordering::Relaxed);
        self.bytes_read.fetch_add(
            request.iter().map(|r| r.end - r.start).sum::<u64>(),
            Ordering::Relaxed,
        );
    }
}

pub struct ScanStats {
    pub iops: u64,
    pub requests: u64,
    pub bytes_read: u64,
}

impl ScanStats {
    fn new(stats: &StatsCollector) -> Self {
        Self {
            iops: stats.iops(),
            requests: stats.requests(),
            bytes_read: stats.bytes_read(),
        }
    }
}

/// An I/O scheduler which wraps an ObjectStore and throttles the amount of
/// parallel I/O that can be run.
///
/// The ScanScheduler will cancel any outstanding I/O requests when it is dropped.
/// For this reason it should be kept alive until all I/O has finished.
///
/// Note: The 2.X file readers already do this so this is only a concern if you are
/// using the ScanScheduler directly.
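///
/// A minimal usage sketch (the path and sizes below are illustrative only):
///
/// ```ignore
/// let store = Arc::new(ObjectStore::local());
/// let config = SchedulerConfig::max_bandwidth(&store);
/// let scheduler = ScanScheduler::new(store, config);
/// let file = scheduler
///     .open_file(&Path::parse("some/file.lance")?, &CachedFileSize::unknown())
///     .await?;
/// let bytes = file.submit_single(0..1024, 0).await?;
/// ```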
pub struct ScanScheduler {
    object_store: Arc<ObjectStore>,
    io_queue: Arc<IoQueue>,
    stats: Arc<StatsCollector>,
}

impl Debug for ScanScheduler {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ScanScheduler")
            .field("object_store", &self.object_store)
            .finish()
    }
}

struct Response {
    data: Result<Vec<Bytes>>,
    priority: u128,
    num_reqs: usize,
    num_bytes: u64,
}

#[derive(Debug, Clone, Copy)]
pub struct SchedulerConfig {
    /// The number of bytes that can be read into memory but not yet consumed.
    /// This controls back pressure.  If data is not processed quickly enough then this
    /// buffer will fill up and the I/O loop will pause until the buffer is drained.
    pub io_buffer_size_bytes: u64,
}

impl SchedulerConfig {
    /// Big enough for unit testing
    pub fn default_for_testing() -> Self {
        Self {
            io_buffer_size_bytes: 256 * 1024 * 1024,
        }
    }

    /// Configuration that should generally maximize bandwidth (not trying to save RAM
    /// at all).  We assume a max page size of 32MiB and then allow 32MiB per I/O thread
    pub fn max_bandwidth(store: &ObjectStore) -> Self {
        Self {
            io_buffer_size_bytes: 32 * 1024 * 1024 * store.io_parallelism() as u64,
        }
    }
}
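
// For example (a sketch): with a store reporting io_parallelism() == 16, max_bandwidth
// yields a 512 MiB backpressure buffer (16 * 32 MiB), while default_for_testing always
// uses 256 MiB.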

impl ScanScheduler {
    /// Create a new scheduler with the given I/O capacity
    ///
    /// # Arguments
    ///
    /// * object_store - the store to wrap
    /// * config - configuration settings for the scheduler
    pub fn new(object_store: Arc<ObjectStore>, config: SchedulerConfig) -> Arc<Self> {
        let io_capacity = object_store.io_parallelism();
        let io_queue = Arc::new(IoQueue::new(
            io_capacity as u32,
            config.io_buffer_size_bytes,
        ));
        let slf = Arc::new(Self {
            object_store,
            io_queue: io_queue.clone(),
            stats: Arc::new(StatsCollector::new()),
        });
        // Best we can do here is fire and forget.  If the I/O loop is still running when the scheduler is
        // dropped we can't wait for it to finish or we'd block a tokio thread.  We could spawn a blocking task
        // to wait for it to finish but that doesn't seem helpful.
        tokio::task::spawn(async move { run_io_loop(io_queue).await });
        slf
    }

    /// Open a file for reading
    ///
    /// # Arguments
    ///
    /// * path - the path to the file to open
    /// * base_priority - the base priority for I/O requests submitted to this file scheduler;
    ///   this will determine the upper 64 bits of the priority (the lower 64 bits
    ///   come from `submit_request` and `submit_single`)
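    ///
    /// For example, a request submitted with priority `p` through a file scheduler
    /// opened with `base_priority = b` is ordered by the combined 128-bit key
    /// `((b as u128) << 64) + p as u128`, so the base priority dominates and the
    /// per-request priority breaks ties within a file.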
    pub async fn open_file_with_priority(
        self: &Arc<Self>,
        path: &Path,
        base_priority: u64,
        file_size_bytes: &CachedFileSize,
    ) -> Result<FileScheduler> {
        let file_size_bytes = if let Some(size) = file_size_bytes.get() {
            u64::from(size)
        } else {
            let size = self.object_store.size(path).await?;
            if let Some(size) = NonZero::new(size) {
                file_size_bytes.set(size);
            }
            size
        };
        let reader = self
            .object_store
            .open_with_size(path, file_size_bytes as usize)
            .await?;
        let block_size = self.object_store.block_size() as u64;
        let max_iop_size = self.object_store.max_iop_size();
        Ok(FileScheduler {
            reader: reader.into(),
            block_size,
            root: self.clone(),
            base_priority,
            max_iop_size,
        })
    }

    /// Open a file with a default priority of 0
    ///
    /// See [`Self::open_file_with_priority`] for more information on the priority
    pub async fn open_file(
        self: &Arc<Self>,
        path: &Path,
        file_size_bytes: &CachedFileSize,
    ) -> Result<FileScheduler> {
        self.open_file_with_priority(path, 0, file_size_bytes).await
    }

    fn do_submit_request(
        &self,
        reader: Arc<dyn Reader>,
        request: Vec<Range<u64>>,
        tx: oneshot::Sender<Response>,
        priority: u128,
    ) {
        let num_iops = request.len() as u32;

        let when_all_io_done = move |bytes_and_permits| {
            // We don't care if the receiver has given up so discard the result
            let _ = tx.send(bytes_and_permits);
        };

        let dest = Arc::new(Mutex::new(Box::new(MutableBatch::new(
            when_all_io_done,
            num_iops,
            priority,
            request.len(),
        ))));

        for (task_idx, iop) in request.into_iter().enumerate() {
            let dest = dest.clone();
            let io_queue = self.io_queue.clone();
            let num_bytes = iop.end - iop.start;
            let task = IoTask {
                reader: reader.clone(),
                to_read: iop,
                priority,
                when_done: Box::new(move |data| {
                    io_queue.on_iop_complete();
                    let mut dest = dest.lock().unwrap();
                    let chunk = DataChunk {
                        data,
                        task_idx,
                        num_bytes,
                    };
                    dest.deliver_data(chunk);
                }),
            };
            self.io_queue.push(task);
        }
    }

    fn submit_request(
        &self,
        reader: Arc<dyn Reader>,
        request: Vec<Range<u64>>,
        priority: u128,
    ) -> impl Future<Output = Result<Vec<Bytes>>> + Send {
        let (tx, rx) = oneshot::channel::<Response>();

        self.do_submit_request(reader, request, tx, priority);

        let io_queue = self.io_queue.clone();

        rx.map(move |wrapped_rsp| {
            // Right now, it isn't possible for I/O to be cancelled so a cancel error should
            // not occur
            let rsp = wrapped_rsp.unwrap();
            io_queue.on_bytes_consumed(rsp.num_bytes, rsp.priority, rsp.num_reqs);
            rsp.data
        })
    }

    pub fn stats(&self) -> ScanStats {
        ScanStats::new(self.stats.as_ref())
    }
}

impl Drop for ScanScheduler {
    fn drop(&mut self) {
        // If the user is dropping the ScanScheduler then they _should_ be done with I/O.  This can happen
        // even when I/O is in progress if, for example, the user is dropping a scan mid-read because they found
        // the data they wanted (limit after filter or some other example).
        //
        // Closing the I/O queue will cancel any requests that have not yet been sent to the I/O loop.  However,
        // it will not terminate the I/O loop itself.  This is to help prevent deadlock and ensure that all I/O
        // requests that are submitted will terminate.
        //
        // In theory, this isn't strictly necessary, as callers should drop any task expecting I/O before they
        // drop the scheduler.  In practice, this can be difficult to do, and it is better to spend a little bit
        // of time letting the I/O loop drain so that we can avoid any potential deadlocks.
        self.io_queue.close();
    }
}

/// A throttled file reader
#[derive(Clone, Debug)]
pub struct FileScheduler {
    reader: Arc<dyn Reader>,
    root: Arc<ScanScheduler>,
    block_size: u64,
    base_priority: u64,
    max_iop_size: u64,
}

fn is_close_together(range1: &Range<u64>, range2: &Range<u64>, block_size: u64) -> bool {
    // Note that range1.end <= range2.start is possible (e.g. when decoding string arrays)
    range2.start <= (range1.end + block_size)
}

fn is_overlapping(range1: &Range<u64>, range2: &Range<u64>) -> bool {
    range1.start < range2.end && range2.start < range1.end
}
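
// For example (a sketch): with a 4 KiB block size, the ranges 0..1000 and 3000..4000
// are considered close together (the gap of 2000 bytes is within the block size) and
// will be merged, while 0..1000 and 10_000..11_000 will stay separate requests.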

impl FileScheduler {
    /// Submit a batch of I/O requests to the reader
    ///
    /// The requests will be queued in a FIFO manner and, when all requests
    /// have been fulfilled, the returned future will be completed.
    ///
    /// Each request has a given priority.  If the I/O loop is full then requests
    /// will be buffered and requests with the *lowest* priority will be released
    /// from the buffer first.
    ///
    /// Requests are also subject to the scheduler's backpressure throttle.  All requests
    /// submitted through the same scheduler share a single buffer of unconsumed bytes
    /// and are throttled together when it fills up.
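    ///
    /// A minimal sketch (ranges are illustrative); the returned buffers line up
    /// one-to-one with the submitted ranges, even if the scheduler coalesces or
    /// splits the reads internally:
    ///
    /// ```ignore
    /// let ranges = vec![0..1024, 4096..8192];
    /// let buffers = file_scheduler.submit_request(ranges, 0).await?;
    /// assert_eq!(buffers.len(), 2);
    /// assert_eq!(buffers[0].len(), 1024);
    /// ```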
    pub fn submit_request(
        &self,
        request: Vec<Range<u64>>,
        priority: u64,
    ) -> impl Future<Output = Result<Vec<Bytes>>> + Send {
        // The final priority is a combination of the row offset and the file number
        let priority = ((self.base_priority as u128) << 64) + priority as u128;

        let mut merged_requests = Vec::with_capacity(request.len());

        if !request.is_empty() {
            let mut curr_interval = request[0].clone();

            for req in request.iter().skip(1) {
                if is_close_together(&curr_interval, req, self.block_size) {
                    curr_interval.end = curr_interval.end.max(req.end);
                } else {
                    merged_requests.push(curr_interval);
                    curr_interval = req.clone();
                }
            }

            merged_requests.push(curr_interval);
        }

        let mut updated_requests = Vec::with_capacity(merged_requests.len());
        for req in merged_requests {
            if req.is_empty() {
                updated_requests.push(req);
            } else {
                let num_requests = (req.end - req.start).div_ceil(self.max_iop_size);
                let bytes_per_request = (req.end - req.start) / num_requests;
                for i in 0..num_requests {
                    let start = req.start + i * bytes_per_request;
                    let end = if i == num_requests - 1 {
                        // Last request is a bit bigger due to rounding
                        req.end
                    } else {
                        start + bytes_per_request
                    };
                    updated_requests.push(start..end);
                }
            }
        }

        self.root.stats.record_request(&updated_requests);

        let bytes_vec_fut =
            self.root
                .submit_request(self.reader.clone(), updated_requests.clone(), priority);

        let mut updated_index = 0;
        let mut final_bytes = Vec::with_capacity(request.len());

        async move {
            let bytes_vec = bytes_vec_fut.await?;

            let mut orig_index = 0;
            while (updated_index < updated_requests.len()) && (orig_index < request.len()) {
                let updated_range = &updated_requests[updated_index];
                let orig_range = &request[orig_index];
                let byte_offset = updated_range.start as usize;

                if is_overlapping(updated_range, orig_range) {
                    // We need to undo the coalescing and splitting done earlier
                    let start = orig_range.start as usize - byte_offset;
                    if orig_range.end <= updated_range.end {
                        // The original range is fully contained in the updated range, can do
                        // zero-copy slice
                        let end = orig_range.end as usize - byte_offset;
                        final_bytes.push(bytes_vec[updated_index].slice(start..end));
                    } else {
                        // The original read was split into multiple requests, need to copy
                        // back into a single buffer
                        let orig_size = orig_range.end - orig_range.start;
                        let mut merged_bytes = Vec::with_capacity(orig_size as usize);
                        merged_bytes.extend_from_slice(&bytes_vec[updated_index].slice(start..));
                        let mut copy_offset = merged_bytes.len() as u64;
                        while copy_offset < orig_size {
                            updated_index += 1;
                            let next_range = &updated_requests[updated_index];
                            let bytes_to_take =
                                (orig_size - copy_offset).min(next_range.end - next_range.start);
                            merged_bytes.extend_from_slice(
                                &bytes_vec[updated_index].slice(0..bytes_to_take as usize),
                            );
                            copy_offset += bytes_to_take;
                        }
                        final_bytes.push(Bytes::from(merged_bytes));
                    }
                    orig_index += 1;
                } else {
                    updated_index += 1;
                }
            }

            Ok(final_bytes)
        }
    }

    pub fn with_priority(&self, priority: u64) -> Self {
        Self {
            reader: self.reader.clone(),
            root: self.root.clone(),
            block_size: self.block_size,
            max_iop_size: self.max_iop_size,
            base_priority: priority,
        }
    }

    /// Submit a single IOP to the reader
    ///
    /// If you have multiple IOPS to perform then [`Self::submit_request`] is going
    /// to be more efficient.
    ///
    /// See [`Self::submit_request`] for more information on the priority and backpressure.
    pub fn submit_single(
        &self,
        range: Range<u64>,
        priority: u64,
    ) -> impl Future<Output = Result<Bytes>> + Send {
        self.submit_request(vec![range], priority)
            .map_ok(|vec_bytes| vec_bytes.into_iter().next().unwrap())
    }

    /// Provides access to the underlying reader
    ///
    /// Do not use this for reading data as it will bypass any I/O scheduling!
    /// This is mainly exposed to allow metadata operations (e.g. size, block_size)
    /// which either aren't IOPS or which we don't throttle
    pub fn reader(&self) -> &Arc<dyn Reader> {
        &self.reader
    }
}

#[cfg(test)]
mod tests {
    use std::{collections::VecDeque, time::Duration};

    use futures::poll;
    use lance_core::utils::tempfile::TempObjFile;
    use rand::RngCore;

    use object_store::{memory::InMemory, GetRange, ObjectStore as OSObjectStore};
    use tokio::{runtime::Handle, time::timeout};
    use url::Url;

    use crate::{
        object_store::{DEFAULT_DOWNLOAD_RETRY_COUNT, DEFAULT_MAX_IOP_SIZE},
        testing::MockObjectStore,
    };

    use super::*;

    #[tokio::test]
    async fn test_full_seq_read() {
        let tmp_file = TempObjFile::default();

        let obj_store = Arc::new(ObjectStore::local());

        // Write 1MiB of data
        const DATA_SIZE: u64 = 1024 * 1024;
        let mut some_data = vec![0; DATA_SIZE as usize];
        rand::rng().fill_bytes(&mut some_data);
        obj_store.put(&tmp_file, &some_data).await.unwrap();

        let config = SchedulerConfig::default_for_testing();

        let scheduler = ScanScheduler::new(obj_store, config);

        let file_scheduler = scheduler
            .open_file(&tmp_file, &CachedFileSize::unknown())
            .await
            .unwrap();

        // Read it back 4KiB at a time
        const READ_SIZE: u64 = 4 * 1024;
        let mut reqs = VecDeque::new();
        let mut offset = 0;
        while offset < DATA_SIZE {
            reqs.push_back(
                #[allow(clippy::single_range_in_vec_init)]
                file_scheduler
                    .submit_request(vec![offset..offset + READ_SIZE], 0)
                    .await
                    .unwrap(),
            );
            offset += READ_SIZE;
        }

        offset = 0;
        // Note: we should get parallel I/O even though we are consuming serially
        while offset < DATA_SIZE {
            let data = reqs.pop_front().unwrap();
            let actual = &data[0];
            let expected = &some_data[offset as usize..(offset + READ_SIZE) as usize];
            assert_eq!(expected, actual);
            offset += READ_SIZE;
        }
    }

    #[tokio::test]
    async fn test_split_coalesce() {
        let tmp_file = TempObjFile::default();

        let obj_store = Arc::new(ObjectStore::local());

        // Write 75MiB of data
        const DATA_SIZE: u64 = 75 * 1024 * 1024;
        let mut some_data = vec![0; DATA_SIZE as usize];
        rand::rng().fill_bytes(&mut some_data);
        obj_store.put(&tmp_file, &some_data).await.unwrap();

        let config = SchedulerConfig::default_for_testing();

        let scheduler = ScanScheduler::new(obj_store, config);

        let file_scheduler = scheduler
            .open_file(&tmp_file, &CachedFileSize::unknown())
            .await
            .unwrap();

        // These 3 requests should be coalesced into a single I/O because they are within 4KiB
        // of each other
        let req =
            file_scheduler.submit_request(vec![50_000..51_000, 52_000..53_000, 54_000..55_000], 0);

        let bytes = req.await.unwrap();

        assert_eq!(bytes[0], &some_data[50_000..51_000]);
        assert_eq!(bytes[1], &some_data[52_000..53_000]);
        assert_eq!(bytes[2], &some_data[54_000..55_000]);

        assert_eq!(1, scheduler.stats().iops);

        // This should be split into 5 requests because it is so large
        let req = file_scheduler.submit_request(vec![0..DATA_SIZE], 0);
        let bytes = req.await.unwrap();
        assert!(bytes[0] == some_data, "data is not the same");

        assert_eq!(6, scheduler.stats().iops);

        // None of these requests are bigger than the max IOP size but they will be coalesced into
        // one IOP that is bigger and then split back into 2 requests that don't quite align with the original
        // ranges.
        let chunk_size = *DEFAULT_MAX_IOP_SIZE;
        let req = file_scheduler.submit_request(
            vec![
                10..chunk_size,
                chunk_size + 10..(chunk_size * 2) - 20,
                chunk_size * 2..(chunk_size * 2) + 10,
            ],
            0,
        );

        let bytes = req.await.unwrap();
        let chunk_size = chunk_size as usize;
        assert!(
            bytes[0] == some_data[10..chunk_size],
            "data is not the same"
        );
        assert!(
            bytes[1] == some_data[chunk_size + 10..(chunk_size * 2) - 20],
            "data is not the same"
        );
        assert!(
            bytes[2] == some_data[chunk_size * 2..(chunk_size * 2) + 10],
            "data is not the same"
        );
        assert_eq!(8, scheduler.stats().iops);

        let reads = (0..44)
            .map(|i| i * 1_000_000..(i + 1) * 1_000_000)
            .collect::<Vec<_>>();
        let req = file_scheduler.submit_request(reads, 0);
        let bytes = req.await.unwrap();
        for (i, bytes) in bytes.iter().enumerate() {
            assert!(
                bytes == &some_data[i * 1_000_000..(i + 1) * 1_000_000],
                "data is not the same"
            );
        }
        assert_eq!(11, scheduler.stats().iops);
    }

    #[tokio::test]
    async fn test_priority() {
        let some_path = Path::parse("foo").unwrap();
        let base_store = Arc::new(InMemory::new());
        base_store
            .put(&some_path, vec![0; 1000].into())
            .await
            .unwrap();

        let semaphore = Arc::new(tokio::sync::Semaphore::new(0));
        let mut obj_store = MockObjectStore::default();
        let semaphore_copy = semaphore.clone();
        obj_store
            .expect_get_opts()
            .returning(move |location, options| {
                let semaphore = semaphore.clone();
                let base_store = base_store.clone();
                let location = location.clone();
                async move {
                    semaphore.acquire().await.unwrap().forget();
                    base_store.get_opts(&location, options).await
                }
                .boxed()
            });
        let obj_store = Arc::new(ObjectStore::new(
            Arc::new(obj_store),
            Url::parse("mem://").unwrap(),
            Some(500),
            None,
            false,
            false,
            1,
            DEFAULT_DOWNLOAD_RETRY_COUNT,
            None,
        ));

        let config = SchedulerConfig {
            io_buffer_size_bytes: 1024 * 1024,
        };

        let scan_scheduler = ScanScheduler::new(obj_store, config);

        let file_scheduler = scan_scheduler
            .open_file(&Path::parse("foo").unwrap(), &CachedFileSize::new(1000))
            .await
            .unwrap();

        // Issue a request, priority doesn't matter, it will be submitted
        // immediately (it will go pending)
        // Note: the timeout is to prevent a deadlock if the test fails.
        let first_fut = timeout(
            Duration::from_secs(10),
            file_scheduler.submit_single(0..10, 0),
        )
        .boxed();

        // Issue another low priority request (it will go in queue)
        let mut second_fut = timeout(
            Duration::from_secs(10),
            file_scheduler.submit_single(0..20, 100),
        )
        .boxed();

        // Issue a high priority request (it will go in queue and should bump
        // the other queued request down)
        let mut third_fut = timeout(
            Duration::from_secs(10),
            file_scheduler.submit_single(0..30, 0),
        )
        .boxed();

        // Finish one file, should be the in-flight first request
        semaphore_copy.add_permits(1);
        assert!(first_fut.await.unwrap().unwrap().len() == 10);
        // Other requests should not be finished
        assert!(poll!(&mut second_fut).is_pending());
        assert!(poll!(&mut third_fut).is_pending());

        // Next should be high priority request
        semaphore_copy.add_permits(1);
        assert!(third_fut.await.unwrap().unwrap().len() == 30);
        assert!(poll!(&mut second_fut).is_pending());

        // Finally, the low priority request
        semaphore_copy.add_permits(1);
        assert!(second_fut.await.unwrap().unwrap().len() == 20);
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_backpressure() {
        let some_path = Path::parse("foo").unwrap();
        let base_store = Arc::new(InMemory::new());
        base_store
            .put(&some_path, vec![0; 100000].into())
            .await
            .unwrap();

        let bytes_read = Arc::new(AtomicU64::from(0));
        let mut obj_store = MockObjectStore::default();
        let bytes_read_copy = bytes_read.clone();
        // Wraps the obj_store to keep track of how many bytes have been read
        obj_store
            .expect_get_opts()
            .returning(move |location, options| {
                let range = options.range.as_ref().unwrap();
                let num_bytes = match range {
                    GetRange::Bounded(bounded) => bounded.end - bounded.start,
                    _ => panic!(),
                };
                bytes_read_copy.fetch_add(num_bytes, Ordering::Release);
                let location = location.clone();
                let base_store = base_store.clone();
                async move { base_store.get_opts(&location, options).await }.boxed()
            });
        let obj_store = Arc::new(ObjectStore::new(
            Arc::new(obj_store),
            Url::parse("mem://").unwrap(),
            Some(500),
            None,
            false,
            false,
            1,
            DEFAULT_DOWNLOAD_RETRY_COUNT,
            None,
        ));

        let config = SchedulerConfig {
            io_buffer_size_bytes: 10,
        };

        let scan_scheduler = ScanScheduler::new(obj_store.clone(), config);

        let file_scheduler = scan_scheduler
            .open_file(&Path::parse("foo").unwrap(), &CachedFileSize::new(100000))
            .await
            .unwrap();

        let wait_for_idle = || async move {
            let handle = Handle::current();
            while handle.metrics().num_alive_tasks() != 1 {
                tokio::time::sleep(Duration::from_millis(10)).await;
            }
        };
        let wait_for_bytes_read_and_idle = |target_bytes: u64| {
            // We need to move `target_bytes` but don't want to move `bytes_read`
            let bytes_read = &bytes_read;
            async move {
                let bytes_read_copy = bytes_read.clone();
                while bytes_read_copy.load(Ordering::Acquire) < target_bytes {
                    tokio::time::sleep(Duration::from_millis(10)).await;
                }
                wait_for_idle().await;
            }
        };

        // This read will begin immediately
        let first_fut = file_scheduler.submit_single(0..5, 0);
        // This read should also begin immediately
        let second_fut = file_scheduler.submit_single(0..5, 0);
        // This read will be throttled
        let third_fut = file_scheduler.submit_single(0..3, 0);
        // Two tasks (third_fut and unit test)
        wait_for_bytes_read_and_idle(10).await;

        assert_eq!(first_fut.await.unwrap().len(), 5);
        // One task (unit test)
        wait_for_bytes_read_and_idle(13).await;

        // 2 bytes are ready but 5 bytes requested, read will be blocked
        let fourth_fut = file_scheduler.submit_single(0..5, 0);
        wait_for_bytes_read_and_idle(13).await;

        // Out of order completion is ok, will unblock backpressure
        assert_eq!(third_fut.await.unwrap().len(), 3);
        wait_for_bytes_read_and_idle(18).await;

        assert_eq!(second_fut.await.unwrap().len(), 5);
        // At this point there are 5 bytes available in backpressure queue
        // Now we issue multi-read that can be partially fulfilled, it will read some bytes but
        // not all of them. (using large range gap to ensure request not coalesced)
        //
        // I'm actually not sure this behavior is great.  It's possible that we should just
        // block until we can fulfill the entire request.
        let fifth_fut = file_scheduler.submit_request(vec![0..3, 90000..90007], 0);
        wait_for_bytes_read_and_idle(21).await;

        // Fifth future should eventually finish due to deadlock prevention
        let fifth_bytes = tokio::time::timeout(Duration::from_secs(10), fifth_fut)
            .await
            .unwrap();
        assert_eq!(
            fifth_bytes.unwrap().iter().map(|b| b.len()).sum::<usize>(),
            10
        );

        // And now let's just make sure that we can read the rest of the data
        assert_eq!(fourth_fut.await.unwrap().len(), 5);
        wait_for_bytes_read_and_idle(28).await;

        // Ensure deadlock prevention timeout can be disabled
        let config = SchedulerConfig {
            io_buffer_size_bytes: 10,
        };

        let scan_scheduler = ScanScheduler::new(obj_store, config);
        let file_scheduler = scan_scheduler
            .open_file(&Path::parse("foo").unwrap(), &CachedFileSize::new(100000))
            .await
            .unwrap();

        let first_fut = file_scheduler.submit_single(0..10, 0);
        let second_fut = file_scheduler.submit_single(0..10, 0);

        std::thread::sleep(Duration::from_millis(100));
        assert_eq!(first_fut.await.unwrap().len(), 10);
        assert_eq!(second_fut.await.unwrap().len(), 10);
    }

    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn stress_backpressure() {
        // This test ensures that the backpressure mechanism works correctly with
        // regards to priority.  In other words, as long as all requests are consumed
        // in priority order then the backpressure mechanism should not deadlock
        let some_path = Path::parse("foo").unwrap();
        let obj_store = Arc::new(ObjectStore::memory());
        obj_store
            .put(&some_path, vec![0; 100000].as_slice())
            .await
            .unwrap();

        // Only one request will be allowed in
        let config = SchedulerConfig {
            io_buffer_size_bytes: 1,
        };
        let scan_scheduler = ScanScheduler::new(obj_store.clone(), config);
        let file_scheduler = scan_scheduler
            .open_file(&some_path, &CachedFileSize::unknown())
            .await
            .unwrap();

        let mut futs = Vec::with_capacity(10000);
        for idx in 0..10000 {
            futs.push(file_scheduler.submit_single(idx..idx + 1, idx));
        }

        for fut in futs {
            fut.await.unwrap();
        }
    }
}