use bytes::Bytes;
use futures::channel::oneshot;
use futures::{FutureExt, TryFutureExt};
use object_store::path::Path;
use snafu::location;
use std::collections::BinaryHeap;
use std::fmt::Debug;
use std::future::Future;
use std::num::NonZero;
use std::ops::Range;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Instant;
use tokio::sync::{Notify, Semaphore, SemaphorePermit};

use lance_core::{Error, Result};

use crate::object_store::ObjectStore;
use crate::traits::Reader;
use crate::utils::CachedFileSize;

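// Timing (in seconds) for the backpressure warning: wait at least
// BACKPRESSURE_MIN seconds after startup before warning, and debounce
// repeated warnings by BACKPRESSURE_DEBOUNCE seconds.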
const BACKPRESSURE_MIN: u64 = 5;
const BACKPRESSURE_DEBOUNCE: u64 = 60;

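// Process-wide counters tracking I/O operations and bytes read, plus the
// default cap on the number of concurrent I/O operations for this process.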
static IOPS_COUNTER: AtomicU64 = AtomicU64::new(0);
static BYTES_READ_COUNTER: AtomicU64 = AtomicU64::new(0);
const DEFAULT_PROCESS_IOPS_LIMIT: i32 = 128;

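/// Returns the total number of I/O operations issued by this process.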
pub fn iops_counter() -> u64 {
    IOPS_COUNTER.load(Ordering::Acquire)
}

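/// Returns the total number of bytes read by this process.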
pub fn bytes_read_counter() -> u64 {
    BYTES_READ_COUNTER.load(Ordering::Acquire)
}

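/// Process-wide limit on the number of I/O operations in flight.
///
/// The capacity comes from `LANCE_PROCESS_IO_THREADS_LIMIT` (default
/// `DEFAULT_PROCESS_IOPS_LIMIT`).  A zero or negative value disables the quota.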
struct IopsQuota {
    iops_avail: Option<Semaphore>,
}

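/// A reservation against the process-wide IOPS quota.
///
/// Holds a semaphore permit when the quota is enabled, or nothing when it is not.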
struct IopsReservation<'a> {
    value: Option<SemaphorePermit<'a>>,
}

impl IopsReservation<'_> {
    fn forget(&mut self) {
        if let Some(value) = self.value.take() {
            value.forget();
        }
    }
}

impl IopsQuota {
    fn new() -> Self {
        let initial_capacity = std::env::var("LANCE_PROCESS_IO_THREADS_LIMIT")
            .map(|s| {
                s.parse::<i32>().unwrap_or_else(|_| {
                    log::warn!("Ignoring invalid LANCE_PROCESS_IO_THREADS_LIMIT: {}", s);
                    DEFAULT_PROCESS_IOPS_LIMIT
                })
            })
            .unwrap_or(DEFAULT_PROCESS_IOPS_LIMIT);
        let iops_avail = if initial_capacity <= 0 {
            None
        } else {
            Some(Semaphore::new(initial_capacity as usize))
        };
        Self { iops_avail }
    }

    fn release(&self) {
        if let Some(iops_avail) = self.iops_avail.as_ref() {
            iops_avail.add_permits(1);
        }
    }

    async fn acquire(&self) -> IopsReservation<'_> {
        if let Some(iops_avail) = self.iops_avail.as_ref() {
            IopsReservation {
                value: Some(iops_avail.acquire().await.unwrap()),
            }
        } else {
            IopsReservation { value: None }
        }
    }
}

static IOPS_QUOTA: std::sync::LazyLock<IopsQuota> = std::sync::LazyLock::new(IopsQuota::new);

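/// A sorted list of the priorities of all I/O requests currently in flight.
///
/// The minimum value is used to let the highest-priority pending request bypass
/// the backpressure byte limit, so the queue can always make progress.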
struct PrioritiesInFlight {
    in_flight: Vec<u128>,
}

impl PrioritiesInFlight {
    fn new(capacity: u32) -> Self {
        Self {
            in_flight: Vec::with_capacity(capacity as usize * 2),
        }
    }

    fn min_in_flight(&self) -> u128 {
        self.in_flight.first().copied().unwrap_or(u128::MAX)
    }

    fn push(&mut self, prio: u128) {
        let pos = match self.in_flight.binary_search(&prio) {
            Ok(pos) => pos,
            Err(pos) => pos,
        };
        self.in_flight.insert(pos, prio);
    }

    fn remove(&mut self, prio: u128) {
        if let Ok(pos) = self.in_flight.binary_search(&prio) {
            self.in_flight.remove(pos);
        } else {
            unreachable!();
        }
    }
}

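/// Shared state for the I/O queue: available IOPS and buffer bytes, the heap of
/// pending requests, the priorities currently in flight, and bookkeeping for the
/// backpressure warning.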
struct IoQueueState {
    iops_avail: u32,
    bytes_avail: i64,
    pending_requests: BinaryHeap<IoTask>,
    priorities_in_flight: PrioritiesInFlight,
    done_scheduling: bool,
    start: Instant,
    last_warn: AtomicU64,
}

impl IoQueueState {
    fn new(io_capacity: u32, io_buffer_size: u64) -> Self {
        Self {
            iops_avail: io_capacity,
            bytes_avail: io_buffer_size as i64,
            pending_requests: BinaryHeap::new(),
            priorities_in_flight: PrioritiesInFlight::new(io_capacity),
            done_scheduling: false,
            start: Instant::now(),
            last_warn: AtomicU64::from(0),
        }
    }

    fn finished(&self) -> bool {
        self.done_scheduling && self.pending_requests.is_empty()
    }

    fn warn_if_needed(&self) {
        let seconds_elapsed = self.start.elapsed().as_secs();
        let last_warn = self.last_warn.load(Ordering::Acquire);
        let since_last_warn = seconds_elapsed - last_warn;
        if (last_warn == 0
            && seconds_elapsed > BACKPRESSURE_MIN
            && seconds_elapsed < BACKPRESSURE_DEBOUNCE)
            || since_last_warn > BACKPRESSURE_DEBOUNCE
        {
            tracing::event!(tracing::Level::DEBUG, "Backpressure throttle exceeded");
            log::debug!("Backpressure throttle is full; I/O will pause until the buffer is drained. Maximum I/O bandwidth will not be achieved because the CPU is falling behind");
            self.last_warn
                .store(seconds_elapsed.max(1), Ordering::Release);
        }
    }

    fn can_deliver(&self, task: &IoTask) -> bool {
        if self.iops_avail == 0 {
            false
        } else if task.priority <= self.priorities_in_flight.min_in_flight() {
            true
        } else if task.num_bytes() as i64 > self.bytes_avail {
            self.warn_if_needed();
            false
        } else {
            true
        }
    }

    fn next_task(&mut self) -> Option<IoTask> {
        let task = self.pending_requests.peek()?;
        if self.can_deliver(task) {
            self.priorities_in_flight.push(task.priority);
            self.iops_avail -= 1;
            self.bytes_avail -= task.num_bytes() as i64;
            if self.bytes_avail < 0 {
                log::debug!(
                    "Backpressure throttle temporarily exceeded by {} bytes due to priority I/O",
                    -self.bytes_avail
                );
            }
            Some(self.pending_requests.pop().unwrap())
        } else {
            None
        }
    }
}

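/// A priority queue of pending I/O requests.
///
/// Tasks are popped in priority order, gated by the IOPS limit and the
/// backpressure byte budget tracked in [`IoQueueState`].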
struct IoQueue {
    state: Mutex<IoQueueState>,
    notify: Notify,
}

impl IoQueue {
    fn new(io_capacity: u32, io_buffer_size: u64) -> Self {
        Self {
            state: Mutex::new(IoQueueState::new(io_capacity, io_buffer_size)),
            notify: Notify::new(),
        }
    }

    fn push(&self, task: IoTask) {
        log::trace!(
            "Inserting I/O request for {} bytes with priority ({},{}) into I/O queue",
            task.num_bytes(),
            task.priority >> 64,
            task.priority & 0xFFFFFFFFFFFFFFFF
        );
        let mut state = self.state.lock().unwrap();
        state.pending_requests.push(task);
        drop(state);

        self.notify.notify_one();
    }

    async fn pop(&self) -> Option<IoTask> {
        loop {
            {
                let mut iop_res = IOPS_QUOTA.acquire().await;
                let mut state = self.state.lock().unwrap();
                if let Some(task) = state.next_task() {
                    iop_res.forget();
                    return Some(task);
                }

                if state.finished() {
                    return None;
                }
            }

            self.notify.notified().await;
        }
    }

    fn on_iop_complete(&self) {
        let mut state = self.state.lock().unwrap();
        state.iops_avail += 1;
        drop(state);

        self.notify.notify_one();
    }

    fn on_bytes_consumed(&self, bytes: u64, priority: u128, num_reqs: usize) {
        let mut state = self.state.lock().unwrap();
        state.bytes_avail += bytes as i64;
        for _ in 0..num_reqs {
            state.priorities_in_flight.remove(priority);
        }
        drop(state);

        self.notify.notify_one();
    }

    fn close(&self) {
        let mut state = self.state.lock().unwrap();
        state.done_scheduling = true;
        drop(state);

        self.notify.notify_one();
    }
}

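/// Accumulates the results of the individual reads that make up one request.
///
/// The `when_done` callback is invoked with the assembled [`Response`] when the
/// batch is dropped, i.e. once the last read has delivered its data (or an error).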
struct MutableBatch<F: FnOnce(Response) + Send> {
    when_done: Option<F>,
    data_buffers: Vec<Bytes>,
    num_bytes: u64,
    priority: u128,
    num_reqs: usize,
    err: Option<Box<dyn std::error::Error + Send + Sync + 'static>>,
}

impl<F: FnOnce(Response) + Send> MutableBatch<F> {
    fn new(when_done: F, num_data_buffers: u32, priority: u128, num_reqs: usize) -> Self {
        Self {
            when_done: Some(when_done),
            data_buffers: vec![Bytes::default(); num_data_buffers as usize],
            num_bytes: 0,
            priority,
            num_reqs,
            err: None,
        }
    }
}

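// The batch is dropped once every IoTask has released its clone of the Arc, at
// which point all buffers (or an error) have been delivered and the response can
// be sent to the caller.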
impl<F: FnOnce(Response) + Send> Drop for MutableBatch<F> {
    fn drop(&mut self) {
        let result = if self.err.is_some() {
            Err(Error::Wrapped {
                error: self.err.take().unwrap(),
                location: location!(),
            })
        } else {
            let mut data = Vec::new();
            std::mem::swap(&mut data, &mut self.data_buffers);
            Ok(data)
        };
        let response = Response {
            data: result,
            num_bytes: self.num_bytes,
            priority: self.priority,
            num_reqs: self.num_reqs,
        };
        (self.when_done.take().unwrap())(response);
    }
}

struct DataChunk {
    task_idx: usize,
    num_bytes: u64,
    data: Result<Bytes>,
}

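/// A destination for the bytes produced by a single read.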
trait DataSink: Send {
    fn deliver_data(&mut self, data: DataChunk);
}

impl<F: FnOnce(Response) + Send> DataSink for MutableBatch<F> {
    fn deliver_data(&mut self, data: DataChunk) {
        self.num_bytes += data.num_bytes;
        match data.data {
            Ok(data_bytes) => {
                self.data_buffers[data.task_idx] = data_bytes;
            }
            Err(err) => {
                self.err.get_or_insert(Box::new(err));
            }
        }
    }
}

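/// A single read against a file, scheduled at a given priority.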
struct IoTask {
    reader: Arc<dyn Reader>,
    to_read: Range<u64>,
    when_done: Box<dyn FnOnce(Result<Bytes>) + Send>,
    priority: u128,
}

impl Eq for IoTask {}

impl PartialEq for IoTask {
    fn eq(&self, other: &Self) -> bool {
        self.priority == other.priority
    }
}

impl PartialOrd for IoTask {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}

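// `BinaryHeap` is a max-heap, so the comparison is reversed: the task with the
// lowest priority value is considered greatest and is popped first.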
impl Ord for IoTask {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        other.priority.cmp(&self.priority)
    }
}

impl IoTask {
    fn num_bytes(&self) -> u64 {
        self.to_read.end - self.to_read.start
    }

    async fn run(self) {
        let file_path = self.reader.path().as_ref();
        let num_bytes = self.num_bytes();
        let bytes = if self.to_read.start == self.to_read.end {
            Ok(Bytes::new())
        } else {
            let bytes_fut = self
                .reader
                .get_range(self.to_read.start as usize..self.to_read.end as usize);
            IOPS_COUNTER.fetch_add(1, Ordering::Release);
            let num_bytes = self.num_bytes();
            bytes_fut
                .inspect(move |_| {
                    BYTES_READ_COUNTER.fetch_add(num_bytes, Ordering::Release);
                })
                .await
                .map_err(Error::from)
        };
        tracing::trace!(
            file = file_path,
            bytes_read = num_bytes,
            requests = 1,
            range_start = self.to_read.start,
            range_end = self.to_read.end,
            "File I/O completed"
        );
        IOPS_QUOTA.release();
        (self.when_done)(bytes);
    }
}

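/// Drives the I/O queue: repeatedly pops the next admissible task and spawns it
/// on the tokio runtime, exiting once the queue is closed and drained.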
async fn run_io_loop(tasks: Arc<IoQueue>) {
    loop {
        let next_task = tasks.pop().await;
        match next_task {
            Some(task) => {
                tokio::spawn(task.run());
            }
            None => {
                return;
            }
        }
    }
}

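/// Per-scheduler counters for the number of requests, I/O operations, and bytes read.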
#[derive(Debug)]
struct StatsCollector {
    iops: AtomicU64,
    requests: AtomicU64,
    bytes_read: AtomicU64,
}

impl StatsCollector {
    fn new() -> Self {
        Self {
            iops: AtomicU64::new(0),
            requests: AtomicU64::new(0),
            bytes_read: AtomicU64::new(0),
        }
    }

    fn iops(&self) -> u64 {
        self.iops.load(Ordering::Relaxed)
    }

    fn bytes_read(&self) -> u64 {
        self.bytes_read.load(Ordering::Relaxed)
    }

    fn requests(&self) -> u64 {
        self.requests.load(Ordering::Relaxed)
    }

    fn record_request(&self, request: &[Range<u64>]) {
        self.requests.fetch_add(1, Ordering::Relaxed);
        self.iops.fetch_add(request.len() as u64, Ordering::Relaxed);
        self.bytes_read.fetch_add(
            request.iter().map(|r| r.end - r.start).sum::<u64>(),
            Ordering::Relaxed,
        );
    }
}

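/// A snapshot of the statistics accumulated by a [`ScanScheduler`].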
pub struct ScanStats {
    pub iops: u64,
    pub requests: u64,
    pub bytes_read: u64,
}

impl ScanStats {
    fn new(stats: &StatsCollector) -> Self {
        Self {
            iops: stats.iops(),
            requests: stats.requests(),
            bytes_read: stats.bytes_read(),
        }
    }
}

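/// A throttled I/O scheduler that coordinates reads across the files of a scan.
///
/// Requests are merged, split, prioritized, and throttled by both an IOPS limit
/// (derived from the object store's I/O parallelism) and a backpressure byte
/// budget (`io_buffer_size_bytes`).
///
/// A minimal usage sketch (the file path and sizes here are illustrative only):
///
/// ```ignore
/// let scheduler = ScanScheduler::new(object_store, SchedulerConfig::default_for_testing());
/// let file = scheduler
///     .open_file(&Path::parse("some/file.lance")?, &CachedFileSize::unknown())
///     .await?;
/// // Read two ranges; lower priority values are served first.
/// let bytes = file.submit_request(vec![0..1024, 4096..8192], 0).await?;
/// ```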
pub struct ScanScheduler {
    object_store: Arc<ObjectStore>,
    io_queue: Arc<IoQueue>,
    stats: Arc<StatsCollector>,
}

impl Debug for ScanScheduler {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ScanScheduler")
            .field("object_store", &self.object_store)
            .finish()
    }
}

struct Response {
    data: Result<Vec<Bytes>>,
    priority: u128,
    num_reqs: usize,
    num_bytes: u64,
}

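/// Configuration for a [`ScanScheduler`].
///
/// `io_buffer_size_bytes` is the backpressure budget: the maximum number of bytes
/// that may be read but not yet consumed before further I/O is paused.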
#[derive(Debug, Clone, Copy)]
pub struct SchedulerConfig {
    pub io_buffer_size_bytes: u64,
}

impl SchedulerConfig {
    pub fn default_for_testing() -> Self {
        Self {
            io_buffer_size_bytes: 256 * 1024 * 1024,
        }
    }

    pub fn max_bandwidth(store: &ObjectStore) -> Self {
        Self {
            io_buffer_size_bytes: 32 * 1024 * 1024 * store.io_parallelism() as u64,
        }
    }
}

impl ScanScheduler {
    pub fn new(object_store: Arc<ObjectStore>, config: SchedulerConfig) -> Arc<Self> {
        let io_capacity = object_store.io_parallelism();
        let io_queue = Arc::new(IoQueue::new(
            io_capacity as u32,
            config.io_buffer_size_bytes,
        ));
        let scheduler = Self {
            object_store,
            io_queue: io_queue.clone(),
            stats: Arc::new(StatsCollector::new()),
        };
        tokio::task::spawn(async move { run_io_loop(io_queue).await });
        Arc::new(scheduler)
    }

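    /// Open a file for reading, using `base_priority` as the high 64 bits of every
    /// request priority issued through the returned [`FileScheduler`].
    ///
    /// The file size is taken from `file_size_bytes` when cached; otherwise it is
    /// fetched from the object store and cached for later calls.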
    pub async fn open_file_with_priority(
        self: &Arc<Self>,
        path: &Path,
        base_priority: u64,
        file_size_bytes: &CachedFileSize,
    ) -> Result<FileScheduler> {
        let file_size_bytes = if let Some(size) = file_size_bytes.get() {
            u64::from(size)
        } else {
            let size = self.object_store.size(path).await?;
            if let Some(size) = NonZero::new(size) {
                file_size_bytes.set(size);
            }
            size
        };
        let reader = self
            .object_store
            .open_with_size(path, file_size_bytes as usize)
            .await?;
        let block_size = self.object_store.block_size() as u64;
        let max_iop_size = self.object_store.max_iop_size();
        Ok(FileScheduler {
            reader: reader.into(),
            block_size,
            root: self.clone(),
            base_priority,
            max_iop_size,
        })
    }

    pub async fn open_file(
        self: &Arc<Self>,
        path: &Path,
        file_size_bytes: &CachedFileSize,
    ) -> Result<FileScheduler> {
        self.open_file_with_priority(path, 0, file_size_bytes).await
    }

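    // Fans a request out into one IoTask per range.  Each task reports back to a
    // shared MutableBatch; the batch sends the combined Response over `tx` once
    // the last task has delivered its bytes.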
    fn do_submit_request(
        &self,
        reader: Arc<dyn Reader>,
        request: Vec<Range<u64>>,
        tx: oneshot::Sender<Response>,
        priority: u128,
    ) {
        let num_iops = request.len() as u32;

        let when_all_io_done = move |response| {
            let _ = tx.send(response);
        };

        let dest = Arc::new(Mutex::new(Box::new(MutableBatch::new(
            when_all_io_done,
            num_iops,
            priority,
            request.len(),
        ))));

        for (task_idx, iop) in request.into_iter().enumerate() {
            let dest = dest.clone();
            let io_queue = self.io_queue.clone();
            let num_bytes = iop.end - iop.start;
            let task = IoTask {
                reader: reader.clone(),
                to_read: iop,
                priority,
                when_done: Box::new(move |data| {
                    io_queue.on_iop_complete();
                    let mut dest = dest.lock().unwrap();
                    let chunk = DataChunk {
                        data,
                        task_idx,
                        num_bytes,
                    };
                    dest.deliver_data(chunk);
                }),
            };
            self.io_queue.push(task);
        }
    }

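    // Submits a set of reads and returns a future that resolves to one buffer per
    // range once every read has completed.  The bytes are counted against the
    // backpressure budget until the response is received.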
    fn submit_request(
        &self,
        reader: Arc<dyn Reader>,
        request: Vec<Range<u64>>,
        priority: u128,
    ) -> impl Future<Output = Result<Vec<Bytes>>> + Send {
        let (tx, rx) = oneshot::channel::<Response>();

        self.do_submit_request(reader, request, tx, priority);

        let io_queue = self.io_queue.clone();

        rx.map(move |wrapped_rsp| {
            let rsp = wrapped_rsp.unwrap();
            io_queue.on_bytes_consumed(rsp.num_bytes, rsp.priority, rsp.num_reqs);
            rsp.data
        })
    }

    pub fn stats(&self) -> ScanStats {
        ScanStats::new(self.stats.as_ref())
    }
}

impl Drop for ScanScheduler {
    fn drop(&mut self) {
        self.io_queue.close();
    }
}

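/// A scheduler for reads against a single file.
///
/// Cheap to clone; all clones share the same underlying [`ScanScheduler`].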
#[derive(Clone, Debug)]
pub struct FileScheduler {
    reader: Arc<dyn Reader>,
    root: Arc<ScanScheduler>,
    block_size: u64,
    base_priority: u64,
    max_iop_size: u64,
}

fn is_close_together(range1: &Range<u64>, range2: &Range<u64>, block_size: u64) -> bool {
    range2.start <= (range1.end + block_size)
}

fn is_overlapping(range1: &Range<u64>, range2: &Range<u64>) -> bool {
    range1.start < range2.end && range2.start < range1.end
}

impl FileScheduler {
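    /// Submit a batch of reads.
    ///
    /// Ranges within `block_size` of each other are coalesced into a single read,
    /// and reads larger than `max_iop_size` are split.  The returned future
    /// resolves to one `Bytes` buffer per requested range, in the original order.
    /// Lower `priority` values are scheduled ahead of higher ones, and the
    /// coalescing step assumes the ranges are given in ascending start order.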
    pub fn submit_request(
        &self,
        request: Vec<Range<u64>>,
        priority: u64,
    ) -> impl Future<Output = Result<Vec<Bytes>>> + Send {
        let priority = ((self.base_priority as u128) << 64) + priority as u128;

        let mut merged_requests = Vec::with_capacity(request.len());

        if !request.is_empty() {
            let mut curr_interval = request[0].clone();

            for req in request.iter().skip(1) {
                if is_close_together(&curr_interval, req, self.block_size) {
                    curr_interval.end = curr_interval.end.max(req.end);
                } else {
                    merged_requests.push(curr_interval);
                    curr_interval = req.clone();
                }
            }

            merged_requests.push(curr_interval);
        }

        let mut updated_requests = Vec::with_capacity(merged_requests.len());
        for req in merged_requests {
            if req.is_empty() {
                updated_requests.push(req);
            } else {
                let num_requests = (req.end - req.start).div_ceil(self.max_iop_size);
                let bytes_per_request = (req.end - req.start) / num_requests;
                for i in 0..num_requests {
                    let start = req.start + i * bytes_per_request;
                    let end = if i == num_requests - 1 {
                        req.end
                    } else {
                        start + bytes_per_request
                    };
                    updated_requests.push(start..end);
                }
            }
        }

        self.root.stats.record_request(&updated_requests);

        let bytes_vec_fut =
            self.root
                .submit_request(self.reader.clone(), updated_requests.clone(), priority);

        let mut updated_index = 0;
        let mut final_bytes = Vec::with_capacity(request.len());

        async move {
            let bytes_vec = bytes_vec_fut.await?;

            let mut orig_index = 0;
            while (updated_index < updated_requests.len()) && (orig_index < request.len()) {
                let updated_range = &updated_requests[updated_index];
                let orig_range = &request[orig_index];
                let byte_offset = updated_range.start as usize;

                if is_overlapping(updated_range, orig_range) {
                    let start = orig_range.start as usize - byte_offset;
                    if orig_range.end <= updated_range.end {
                        let end = orig_range.end as usize - byte_offset;
                        final_bytes.push(bytes_vec[updated_index].slice(start..end));
                    } else {
                        let orig_size = orig_range.end - orig_range.start;
                        let mut merged_bytes = Vec::with_capacity(orig_size as usize);
                        merged_bytes.extend_from_slice(&bytes_vec[updated_index].slice(start..));
                        let mut copy_offset = merged_bytes.len() as u64;
                        while copy_offset < orig_size {
                            updated_index += 1;
                            let next_range = &updated_requests[updated_index];
                            let bytes_to_take =
                                (orig_size - copy_offset).min(next_range.end - next_range.start);
                            merged_bytes.extend_from_slice(
                                &bytes_vec[updated_index].slice(0..bytes_to_take as usize),
                            );
                            copy_offset += bytes_to_take;
                        }
                        final_bytes.push(Bytes::from(merged_bytes));
                    }
                    orig_index += 1;
                } else {
                    updated_index += 1;
                }
            }

            Ok(final_bytes)
        }
    }

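    /// Returns a clone of this scheduler that issues requests with a different base priority.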
    pub fn with_priority(&self, priority: u64) -> Self {
        Self {
            reader: self.reader.clone(),
            root: self.root.clone(),
            block_size: self.block_size,
            max_iop_size: self.max_iop_size,
            base_priority: priority,
        }
    }

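    /// Submit a single read and return the bytes for that range.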
    pub fn submit_single(
        &self,
        range: Range<u64>,
        priority: u64,
    ) -> impl Future<Output = Result<Bytes>> + Send {
        self.submit_request(vec![range], priority)
            .map_ok(|vec_bytes| vec_bytes.into_iter().next().unwrap())
    }

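    /// Provides access to the underlying reader.
    ///
    /// Reads issued directly through this reader bypass the scheduler's queueing
    /// and backpressure accounting.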
    pub fn reader(&self) -> &Arc<dyn Reader> {
        &self.reader
    }
}

#[cfg(test)]
mod tests {
    use std::{collections::VecDeque, time::Duration};

    use futures::poll;
    use lance_core::utils::tempfile::TempObjFile;
    use rand::RngCore;

    use object_store::{memory::InMemory, GetRange, ObjectStore as OSObjectStore};
    use tokio::{runtime::Handle, time::timeout};
    use url::Url;

    use crate::{
        object_store::{DEFAULT_DOWNLOAD_RETRY_COUNT, DEFAULT_MAX_IOP_SIZE},
        testing::MockObjectStore,
    };

    use super::*;

    #[tokio::test]
    async fn test_full_seq_read() {
        let tmp_file = TempObjFile::default();

        let obj_store = Arc::new(ObjectStore::local());

        const DATA_SIZE: u64 = 1024 * 1024;
        let mut some_data = vec![0; DATA_SIZE as usize];
        rand::rng().fill_bytes(&mut some_data);
        obj_store.put(&tmp_file, &some_data).await.unwrap();

        let config = SchedulerConfig::default_for_testing();

        let scheduler = ScanScheduler::new(obj_store, config);

        let file_scheduler = scheduler
            .open_file(&tmp_file, &CachedFileSize::unknown())
            .await
            .unwrap();

        const READ_SIZE: u64 = 4 * 1024;
        let mut reqs = VecDeque::new();
        let mut offset = 0;
        while offset < DATA_SIZE {
            reqs.push_back(
                #[allow(clippy::single_range_in_vec_init)]
                file_scheduler
                    .submit_request(vec![offset..offset + READ_SIZE], 0)
                    .await
                    .unwrap(),
            );
            offset += READ_SIZE;
        }

        offset = 0;
        while offset < DATA_SIZE {
            let data = reqs.pop_front().unwrap();
            let actual = &data[0];
            let expected = &some_data[offset as usize..(offset + READ_SIZE) as usize];
            assert_eq!(expected, actual);
            offset += READ_SIZE;
        }
    }

    #[tokio::test]
    async fn test_split_coalesce() {
        let tmp_file = TempObjFile::default();

        let obj_store = Arc::new(ObjectStore::local());

        const DATA_SIZE: u64 = 75 * 1024 * 1024;
        let mut some_data = vec![0; DATA_SIZE as usize];
        rand::rng().fill_bytes(&mut some_data);
        obj_store.put(&tmp_file, &some_data).await.unwrap();

        let config = SchedulerConfig::default_for_testing();

        let scheduler = ScanScheduler::new(obj_store, config);

        let file_scheduler = scheduler
            .open_file(&tmp_file, &CachedFileSize::unknown())
            .await
            .unwrap();

        let req =
            file_scheduler.submit_request(vec![50_000..51_000, 52_000..53_000, 54_000..55_000], 0);

        let bytes = req.await.unwrap();

        assert_eq!(bytes[0], &some_data[50_000..51_000]);
        assert_eq!(bytes[1], &some_data[52_000..53_000]);
        assert_eq!(bytes[2], &some_data[54_000..55_000]);

        assert_eq!(1, scheduler.stats().iops);

        let req = file_scheduler.submit_request(vec![0..DATA_SIZE], 0);
        let bytes = req.await.unwrap();
        assert!(bytes[0] == some_data, "data is not the same");

        assert_eq!(6, scheduler.stats().iops);

        let chunk_size = *DEFAULT_MAX_IOP_SIZE;
        let req = file_scheduler.submit_request(
            vec![
                10..chunk_size,
                chunk_size + 10..(chunk_size * 2) - 20,
                chunk_size * 2..(chunk_size * 2) + 10,
            ],
            0,
        );

        let bytes = req.await.unwrap();
        let chunk_size = chunk_size as usize;
        assert!(
            bytes[0] == some_data[10..chunk_size],
            "data is not the same"
        );
        assert!(
            bytes[1] == some_data[chunk_size + 10..(chunk_size * 2) - 20],
            "data is not the same"
        );
        assert!(
            bytes[2] == some_data[chunk_size * 2..(chunk_size * 2) + 10],
            "data is not the same"
        );
        assert_eq!(8, scheduler.stats().iops);

        let reads = (0..44)
            .map(|i| i * 1_000_000..(i + 1) * 1_000_000)
            .collect::<Vec<_>>();
        let req = file_scheduler.submit_request(reads, 0);
        let bytes = req.await.unwrap();
        for (i, bytes) in bytes.iter().enumerate() {
            assert!(
                bytes == &some_data[i * 1_000_000..(i + 1) * 1_000_000],
                "data is not the same"
            );
        }
        assert_eq!(11, scheduler.stats().iops);
    }

    #[tokio::test]
    async fn test_priority() {
        let some_path = Path::parse("foo").unwrap();
        let base_store = Arc::new(InMemory::new());
        base_store
            .put(&some_path, vec![0; 1000].into())
            .await
            .unwrap();

        let semaphore = Arc::new(tokio::sync::Semaphore::new(0));
        let mut obj_store = MockObjectStore::default();
        let semaphore_copy = semaphore.clone();
        obj_store
            .expect_get_opts()
            .returning(move |location, options| {
                let semaphore = semaphore.clone();
                let base_store = base_store.clone();
                let location = location.clone();
                async move {
                    semaphore.acquire().await.unwrap().forget();
                    base_store.get_opts(&location, options).await
                }
                .boxed()
            });
        let obj_store = Arc::new(ObjectStore::new(
            Arc::new(obj_store),
            Url::parse("mem://").unwrap(),
            Some(500),
            None,
            false,
            false,
            1,
            DEFAULT_DOWNLOAD_RETRY_COUNT,
            None,
        ));

        let config = SchedulerConfig {
            io_buffer_size_bytes: 1024 * 1024,
        };

        let scan_scheduler = ScanScheduler::new(obj_store, config);

        let file_scheduler = scan_scheduler
            .open_file(&Path::parse("foo").unwrap(), &CachedFileSize::new(1000))
            .await
            .unwrap();

        let first_fut = timeout(
            Duration::from_secs(10),
            file_scheduler.submit_single(0..10, 0),
        )
        .boxed();

        let mut second_fut = timeout(
            Duration::from_secs(10),
            file_scheduler.submit_single(0..20, 100),
        )
        .boxed();

        let mut third_fut = timeout(
            Duration::from_secs(10),
            file_scheduler.submit_single(0..30, 0),
        )
        .boxed();

        semaphore_copy.add_permits(1);
        assert!(first_fut.await.unwrap().unwrap().len() == 10);
        assert!(poll!(&mut second_fut).is_pending());
        assert!(poll!(&mut third_fut).is_pending());

        semaphore_copy.add_permits(1);
        assert!(third_fut.await.unwrap().unwrap().len() == 30);
        assert!(poll!(&mut second_fut).is_pending());

        semaphore_copy.add_permits(1);
        assert!(second_fut.await.unwrap().unwrap().len() == 20);
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_backpressure() {
        let some_path = Path::parse("foo").unwrap();
        let base_store = Arc::new(InMemory::new());
        base_store
            .put(&some_path, vec![0; 100000].into())
            .await
            .unwrap();

        let bytes_read = Arc::new(AtomicU64::from(0));
        let mut obj_store = MockObjectStore::default();
        let bytes_read_copy = bytes_read.clone();
        obj_store
            .expect_get_opts()
            .returning(move |location, options| {
                let range = options.range.as_ref().unwrap();
                let num_bytes = match range {
                    GetRange::Bounded(bounded) => bounded.end - bounded.start,
                    _ => panic!(),
                };
                bytes_read_copy.fetch_add(num_bytes, Ordering::Release);
                let location = location.clone();
                let base_store = base_store.clone();
                async move { base_store.get_opts(&location, options).await }.boxed()
            });
        let obj_store = Arc::new(ObjectStore::new(
            Arc::new(obj_store),
            Url::parse("mem://").unwrap(),
            Some(500),
            None,
            false,
            false,
            1,
            DEFAULT_DOWNLOAD_RETRY_COUNT,
            None,
        ));

        let config = SchedulerConfig {
            io_buffer_size_bytes: 10,
        };

        let scan_scheduler = ScanScheduler::new(obj_store.clone(), config);

        let file_scheduler = scan_scheduler
            .open_file(&Path::parse("foo").unwrap(), &CachedFileSize::new(100000))
            .await
            .unwrap();

        let wait_for_idle = || async move {
            let handle = Handle::current();
            while handle.metrics().num_alive_tasks() != 1 {
                tokio::time::sleep(Duration::from_millis(10)).await;
            }
        };
        let wait_for_bytes_read_and_idle = |target_bytes: u64| {
            let bytes_read = &bytes_read;
            async move {
                let bytes_read_copy = bytes_read.clone();
                while bytes_read_copy.load(Ordering::Acquire) < target_bytes {
                    tokio::time::sleep(Duration::from_millis(10)).await;
                }
                wait_for_idle().await;
            }
        };

        let first_fut = file_scheduler.submit_single(0..5, 0);
        let second_fut = file_scheduler.submit_single(0..5, 0);
        let third_fut = file_scheduler.submit_single(0..3, 0);
        wait_for_bytes_read_and_idle(10).await;

        assert_eq!(first_fut.await.unwrap().len(), 5);
        wait_for_bytes_read_and_idle(13).await;

        let fourth_fut = file_scheduler.submit_single(0..5, 0);
        wait_for_bytes_read_and_idle(13).await;

        assert_eq!(third_fut.await.unwrap().len(), 3);
        wait_for_bytes_read_and_idle(18).await;

        assert_eq!(second_fut.await.unwrap().len(), 5);
        let fifth_fut = file_scheduler.submit_request(vec![0..3, 90000..90007], 0);
        wait_for_bytes_read_and_idle(21).await;

        let fifth_bytes = tokio::time::timeout(Duration::from_secs(10), fifth_fut)
            .await
            .unwrap();
        assert_eq!(
            fifth_bytes.unwrap().iter().map(|b| b.len()).sum::<usize>(),
            10
        );

        assert_eq!(fourth_fut.await.unwrap().len(), 5);
        wait_for_bytes_read_and_idle(28).await;

        let config = SchedulerConfig {
            io_buffer_size_bytes: 10,
        };

        let scan_scheduler = ScanScheduler::new(obj_store, config);
        let file_scheduler = scan_scheduler
            .open_file(&Path::parse("foo").unwrap(), &CachedFileSize::new(100000))
            .await
            .unwrap();

        let first_fut = file_scheduler.submit_single(0..10, 0);
        let second_fut = file_scheduler.submit_single(0..10, 0);

        std::thread::sleep(Duration::from_millis(100));
        assert_eq!(first_fut.await.unwrap().len(), 10);
        assert_eq!(second_fut.await.unwrap().len(), 10);
    }

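    // With a 1-byte buffer every request exceeds the backpressure budget, so this
    // test only completes because the highest-priority pending request is always
    // allowed through.  Requests are submitted and awaited in ascending priority
    // order to exercise that path.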
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn stress_backpressure() {
        let some_path = Path::parse("foo").unwrap();
        let obj_store = Arc::new(ObjectStore::memory());
        obj_store
            .put(&some_path, vec![0; 100000].as_slice())
            .await
            .unwrap();

        let config = SchedulerConfig {
            io_buffer_size_bytes: 1,
        };
        let scan_scheduler = ScanScheduler::new(obj_store.clone(), config);
        let file_scheduler = scan_scheduler
            .open_file(&some_path, &CachedFileSize::unknown())
            .await
            .unwrap();

        let mut futs = Vec::with_capacity(10000);
        for idx in 0..10000 {
            futs.push(file_scheduler.submit_single(idx..idx + 1, idx));
        }

        for fut in futs {
            fut.await.unwrap();
        }
    }
}