use bytes::Bytes;
use futures::channel::oneshot;
use futures::{FutureExt, TryFutureExt};
use object_store::path::Path;
use snafu::location;
use std::collections::BinaryHeap;
use std::fmt::Debug;
use std::future::Future;
use std::num::NonZero;
use std::ops::Range;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Instant;
use tokio::sync::{Notify, Semaphore, SemaphorePermit};

use lance_core::{Error, Result};

use crate::object_store::ObjectStore;
use crate::traits::Reader;
use crate::utils::CachedFileSize;

// Backpressure warnings (in seconds) are suppressed for the first few seconds of a scan
// and then debounced so they are emitted at most once per debounce interval.
const BACKPRESSURE_MIN: u64 = 5;
const BACKPRESSURE_DEBOUNCE: u64 = 60;

// Global counters exposed through `iops_counter` and `bytes_read_counter`.
static IOPS_COUNTER: AtomicU64 = AtomicU64::new(0);
static BYTES_READ_COUNTER: AtomicU64 = AtomicU64::new(0);
// Default process-wide limit on concurrent IOPS (see `IopsQuota`).
const DEFAULT_PROCESS_IOPS_LIMIT: i32 = 128;

pub fn iops_counter() -> u64 {
    IOPS_COUNTER.load(Ordering::Acquire)
}

pub fn bytes_read_counter() -> u64 {
    BYTES_READ_COUNTER.load(Ordering::Acquire)
}

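/// A process-wide quota on the number of concurrent I/O operations.
///
/// Defaults to `DEFAULT_PROCESS_IOPS_LIMIT`; it can be overridden (or disabled with a
/// value <= 0) via the `LANCE_PROCESS_IO_THREADS_LIMIT` environment variable.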
struct IopsQuota {
    iops_avail: Option<Semaphore>,
}

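/// A reservation against the process-wide IOPS quota, wrapping a semaphore permit.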
struct IopsReservation<'a> {
    value: Option<SemaphorePermit<'a>>,
}

impl IopsReservation<'_> {
    // Forget the reservation so the permit is not returned when this value is dropped
    fn forget(&mut self) {
        if let Some(value) = self.value.take() {
            value.forget();
        }
    }
}

impl IopsQuota {
    // Defaults to `DEFAULT_PROCESS_IOPS_LIMIT` concurrent IOPS per process.  Setting
    // LANCE_PROCESS_IO_THREADS_LIMIT to zero or a negative value disables the quota.
    fn new() -> Self {
        let initial_capacity = std::env::var("LANCE_PROCESS_IO_THREADS_LIMIT")
            .map(|s| {
                s.parse::<i32>().unwrap_or_else(|_| {
                    log::warn!("Ignoring invalid LANCE_PROCESS_IO_THREADS_LIMIT: {}", s);
                    DEFAULT_PROCESS_IOPS_LIMIT
                })
            })
            .unwrap_or(DEFAULT_PROCESS_IOPS_LIMIT);
        let iops_avail = if initial_capacity <= 0 {
            None
        } else {
            Some(Semaphore::new(initial_capacity as usize))
        };
        Self { iops_avail }
    }

    // Return a reservation to the quota
    fn release(&self) {
        if let Some(iops_avail) = self.iops_avail.as_ref() {
            iops_avail.add_permits(1);
        }
    }

    // Acquire a reservation from the quota
    async fn acquire(&self) -> IopsReservation<'_> {
        if let Some(iops_avail) = self.iops_avail.as_ref() {
            IopsReservation {
                value: Some(iops_avail.acquire().await.unwrap()),
            }
        } else {
            IopsReservation { value: None }
        }
    }
}

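// The lazily-initialized, process-wide IOPS quota.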
static IOPS_QUOTA: std::sync::LazyLock<IopsQuota> = std::sync::LazyLock::new(IopsQuota::new);

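/// A sorted list of the priorities of all I/O requests currently in flight, used to
/// let the highest-priority work bypass the backpressure byte limit.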
struct PrioritiesInFlight {
    in_flight: Vec<u128>,
}

impl PrioritiesInFlight {
    fn new(capacity: u32) -> Self {
        Self {
            in_flight: Vec::with_capacity(capacity as usize * 2),
        }
    }

    fn min_in_flight(&self) -> u128 {
        self.in_flight.first().copied().unwrap_or(u128::MAX)
    }

    fn push(&mut self, prio: u128) {
        let pos = match self.in_flight.binary_search(&prio) {
            Ok(pos) => pos,
            Err(pos) => pos,
        };
        self.in_flight.insert(pos, prio);
    }

    fn remove(&mut self, prio: u128) {
        if let Ok(pos) = self.in_flight.binary_search(&prio) {
            self.in_flight.remove(pos);
        } else {
            unreachable!();
        }
    }
}

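/// Shared state for the I/O queue: the available IOP slots, the backpressure byte
/// budget, and the heap of pending requests.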
struct IoQueueState {
    // Number of IOPs we can issue concurrently before pausing I/O
    iops_avail: u32,
    // Number of bytes we are allowed to buffer before pausing I/O (may go negative
    // when a priority request is allowed to exceed the limit)
    bytes_avail: i64,
    // Pending I/O requests, ordered so the lowest priority value pops first
    pending_requests: BinaryHeap<IoTask>,
    // Priorities of requests that have been issued but not yet consumed
    priorities_in_flight: PrioritiesInFlight,
    // Set once the scheduler is dropped and no more requests will arrive
    done_scheduling: bool,
    // Time the scheduler was created, used to debounce backpressure warnings
    start: Instant,
    // Seconds (since `start`) at which the last backpressure warning was emitted
    last_warn: AtomicU64,
}

impl IoQueueState {
    fn new(io_capacity: u32, io_buffer_size: u64) -> Self {
        Self {
            iops_avail: io_capacity,
            bytes_avail: io_buffer_size as i64,
            pending_requests: BinaryHeap::new(),
            priorities_in_flight: PrioritiesInFlight::new(io_capacity),
            done_scheduling: false,
            start: Instant::now(),
            last_warn: AtomicU64::from(0),
        }
    }

    fn finished(&self) -> bool {
        self.done_scheduling && self.pending_requests.is_empty()
    }

    fn warn_if_needed(&self) {
        let seconds_elapsed = self.start.elapsed().as_secs();
        let last_warn = self.last_warn.load(Ordering::Acquire);
        let since_last_warn = seconds_elapsed - last_warn;
        if (last_warn == 0
            && seconds_elapsed > BACKPRESSURE_MIN
            && seconds_elapsed < BACKPRESSURE_DEBOUNCE)
            || since_last_warn > BACKPRESSURE_DEBOUNCE
        {
            tracing::event!(tracing::Level::DEBUG, "Backpressure throttle exceeded");
            log::debug!("Backpressure throttle is full, I/O will pause until buffer is drained. Max I/O bandwidth will not be achieved because CPU is falling behind");
            self.last_warn
                .store(seconds_elapsed.max(1), Ordering::Release);
        }
    }

    fn can_deliver(&self, task: &IoTask) -> bool {
        if self.iops_avail == 0 {
            false
        } else if task.priority <= self.priorities_in_flight.min_in_flight() {
            true
        } else if task.num_bytes() as i64 > self.bytes_avail {
            self.warn_if_needed();
            false
        } else {
            true
        }
    }

    fn next_task(&mut self) -> Option<IoTask> {
        let task = self.pending_requests.peek()?;
        if self.can_deliver(task) {
            self.priorities_in_flight.push(task.priority);
            self.iops_avail -= 1;
            self.bytes_avail -= task.num_bytes() as i64;
            if self.bytes_avail < 0 {
                log::debug!(
                    "Backpressure throttle temporarily exceeded by {} bytes due to priority I/O",
                    -self.bytes_avail
                );
            }
            Some(self.pending_requests.pop().unwrap())
        } else {
            None
        }
    }
}

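/// A priority queue of pending I/O requests, guarded by the IOPS and byte budgets.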
struct IoQueue {
    // Queue state, protected by a mutex
    state: Mutex<IoQueueState>,
    // Wakes pending `pop` calls whenever the state changes
    notify: Notify,
}

impl IoQueue {
    fn new(io_capacity: u32, io_buffer_size: u64) -> Self {
        Self {
            state: Mutex::new(IoQueueState::new(io_capacity, io_buffer_size)),
            notify: Notify::new(),
        }
    }

    fn push(&self, task: IoTask) {
        log::trace!(
            "Inserting I/O request for {} bytes with priority ({},{}) into I/O queue",
            task.num_bytes(),
            task.priority >> 64,
            task.priority & 0xFFFFFFFFFFFFFFFF
        );
        let mut state = self.state.lock().unwrap();
        state.pending_requests.push(task);
        drop(state);

        self.notify.notify_one();
    }

    async fn pop(&self) -> Option<IoTask> {
        loop {
            {
                // Grab a slot in the process-wide IOPS quota before taking a task
                let mut iop_res = IOPS_QUOTA.acquire().await;
                let mut state = self.state.lock().unwrap();
                if let Some(task) = state.next_task() {
                    // Keep the permit; `IoTask::run` returns it via IOPS_QUOTA.release()
                    iop_res.forget();
                    return Some(task);
                }

                if state.finished() {
                    return None;
                }
            }

            self.notify.notified().await;
        }
    }

    fn on_iop_complete(&self) {
        let mut state = self.state.lock().unwrap();
        state.iops_avail += 1;
        drop(state);

        self.notify.notify_one();
    }

    fn on_bytes_consumed(&self, bytes: u64, priority: u128, num_reqs: usize) {
        let mut state = self.state.lock().unwrap();
        state.bytes_avail += bytes as i64;
        for _ in 0..num_reqs {
            state.priorities_in_flight.remove(priority);
        }
        drop(state);

        self.notify.notify_one();
    }

    fn close(&self) {
        let mut state = self.state.lock().unwrap();
        state.done_scheduling = true;
        drop(state);

        self.notify.notify_one();
    }
}

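/// Accumulates the results of a batch of I/O tasks and invokes `when_done` once every
/// chunk has been delivered (or an error has been recorded).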
struct MutableBatch<F: FnOnce(Response) + Send> {
    when_done: Option<F>,
    data_buffers: Vec<Bytes>,
    num_bytes: u64,
    priority: u128,
    num_reqs: usize,
    err: Option<Box<dyn std::error::Error + Send + Sync + 'static>>,
}

impl<F: FnOnce(Response) + Send> MutableBatch<F> {
    fn new(when_done: F, num_data_buffers: u32, priority: u128, num_reqs: usize) -> Self {
        Self {
            when_done: Some(when_done),
            data_buffers: vec![Bytes::default(); num_data_buffers as usize],
            num_bytes: 0,
            priority,
            num_reqs,
            err: None,
        }
    }
}

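// The batch is shared (behind an `Arc`) by every task in the request.  It is complete
// when the last task delivers its chunk and drops its reference, so the response is
// sent from `Drop` rather than from every delivery path.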
impl<F: FnOnce(Response) + Send> Drop for MutableBatch<F> {
    fn drop(&mut self) {
        // If we hit an error, return that.  Otherwise return the data buffers.
        let result = if self.err.is_some() {
            Err(Error::Wrapped {
                error: self.err.take().unwrap(),
                location: location!(),
            })
        } else {
            let mut data = Vec::new();
            std::mem::swap(&mut data, &mut self.data_buffers);
            Ok(data)
        };
        let response = Response {
            data: result,
            num_bytes: self.num_bytes,
            priority: self.priority,
            num_reqs: self.num_reqs,
        };
        (self.when_done.take().unwrap())(response);
    }
}

struct DataChunk {
    task_idx: usize,
    num_bytes: u64,
    data: Result<Bytes>,
}

trait DataSink: Send {
    fn deliver_data(&mut self, data: DataChunk);
}

impl<F: FnOnce(Response) + Send> DataSink for MutableBatch<F> {
    fn deliver_data(&mut self, data: DataChunk) {
        self.num_bytes += data.num_bytes;
        match data.data {
            Ok(data_bytes) => {
                self.data_buffers[data.task_idx] = data_bytes;
            }
            Err(err) => {
                // Only the first error encountered is kept; later errors are dropped
                self.err.get_or_insert(Box::new(err));
            }
        }
    }
}

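/// A single I/O request (one range read against one reader) with its scheduling priority.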
struct IoTask {
    reader: Arc<dyn Reader>,
    to_read: Range<u64>,
    when_done: Box<dyn FnOnce(Result<Bytes>) + Send>,
    priority: u128,
}

impl Eq for IoTask {}

impl PartialEq for IoTask {
    fn eq(&self, other: &Self) -> bool {
        self.priority == other.priority
    }
}

impl PartialOrd for IoTask {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}

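// `BinaryHeap` is a max-heap, so the comparison is reversed to pop the lowest priority
// value (the most urgent request) first.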
impl Ord for IoTask {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        other.priority.cmp(&self.priority)
    }
}

impl IoTask {
    fn num_bytes(&self) -> u64 {
        self.to_read.end - self.to_read.start
    }

    async fn run(self) {
        let bytes = if self.to_read.start == self.to_read.end {
            Ok(Bytes::new())
        } else {
            let bytes_fut = self
                .reader
                .get_range(self.to_read.start as usize..self.to_read.end as usize);
            IOPS_COUNTER.fetch_add(1, Ordering::Release);
            let num_bytes = self.num_bytes();
            bytes_fut
                .inspect(move |_| {
                    BYTES_READ_COUNTER.fetch_add(num_bytes, Ordering::Release);
                })
                .await
                .map_err(Error::from)
        };
        IOPS_QUOTA.release();
        (self.when_done)(bytes);
    }
}

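/// Pops tasks off the queue and spawns them until the queue is closed and drained.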
async fn run_io_loop(tasks: Arc<IoQueue>) {
    loop {
        let next_task = tasks.pop().await;
        match next_task {
            Some(task) => {
                tokio::spawn(task.run());
            }
            None => {
                // The queue has been closed and all pending tasks delivered
                return;
            }
        }
    }
}

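/// Per-scheduler counters for issued requests, IOPs, and bytes read.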
#[derive(Debug)]
struct StatsCollector {
    iops: AtomicU64,
    requests: AtomicU64,
    bytes_read: AtomicU64,
}

impl StatsCollector {
    fn new() -> Self {
        Self {
            iops: AtomicU64::new(0),
            requests: AtomicU64::new(0),
            bytes_read: AtomicU64::new(0),
        }
    }

    fn iops(&self) -> u64 {
        self.iops.load(Ordering::Relaxed)
    }

    fn bytes_read(&self) -> u64 {
        self.bytes_read.load(Ordering::Relaxed)
    }

    fn requests(&self) -> u64 {
        self.requests.load(Ordering::Relaxed)
    }

    fn record_request(&self, request: &[Range<u64>]) {
        self.requests.fetch_add(1, Ordering::Relaxed);
        self.iops.fetch_add(request.len() as u64, Ordering::Relaxed);
        self.bytes_read.fetch_add(
            request.iter().map(|r| r.end - r.start).sum::<u64>(),
            Ordering::Relaxed,
        );
    }
}

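/// A snapshot of the scheduler's I/O statistics.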
pub struct ScanStats {
    pub iops: u64,
    pub requests: u64,
    pub bytes_read: u64,
}

impl ScanStats {
    fn new(stats: &StatsCollector) -> Self {
        Self {
            iops: stats.iops(),
            requests: stats.requests(),
            bytes_read: stats.bytes_read(),
        }
    }
}

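/// An I/O scheduler that coalesces, splits, prioritizes, and throttles reads against an
/// [`ObjectStore`].
///
/// A rough usage sketch (illustrative only; it assumes a local store and an existing
/// file at `path`):
///
/// ```ignore
/// let store = Arc::new(ObjectStore::local());
/// let scheduler = ScanScheduler::new(store, SchedulerConfig::default_for_testing());
/// let file = scheduler.open_file(&path, &CachedFileSize::unknown()).await?;
/// // Read bytes 0..100 with priority 0
/// let bytes = file.submit_single(0..100, 0).await?;
/// ```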
pub struct ScanScheduler {
    object_store: Arc<ObjectStore>,
    io_queue: Arc<IoQueue>,
    stats: Arc<StatsCollector>,
}

impl Debug for ScanScheduler {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ScanScheduler")
            .field("object_store", &self.object_store)
            .finish()
    }
}

struct Response {
    data: Result<Vec<Bytes>>,
    priority: u128,
    num_reqs: usize,
    num_bytes: u64,
}

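/// Configuration for a [`ScanScheduler`].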
#[derive(Debug, Clone, Copy)]
pub struct SchedulerConfig {
    /// Maximum number of bytes that may be buffered before backpressure pauses I/O
    pub io_buffer_size_bytes: u64,
}

impl SchedulerConfig {
    // A generous 256MiB buffer for tests where backpressure is not the concern
    pub fn default_for_testing() -> Self {
        Self {
            io_buffer_size_bytes: 256 * 1024 * 1024,
        }
    }

    // Size the buffer so each parallel IOP can have a 32MiB read in flight
    pub fn max_bandwidth(store: &ObjectStore) -> Self {
        Self {
            io_buffer_size_bytes: 32 * 1024 * 1024 * store.io_parallelism() as u64,
        }
    }
}

impl ScanScheduler {
    pub fn new(object_store: Arc<ObjectStore>, config: SchedulerConfig) -> Arc<Self> {
        let io_capacity = object_store.io_parallelism();
        let io_queue = Arc::new(IoQueue::new(
            io_capacity as u32,
            config.io_buffer_size_bytes,
        ));
        let scheduler = Self {
            object_store,
            io_queue: io_queue.clone(),
            stats: Arc::new(StatsCollector::new()),
        };
        tokio::task::spawn(async move { run_io_loop(io_queue).await });
        Arc::new(scheduler)
    }

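    /// Open a file for reading, assigning `base_priority` to all requests made through
    /// the returned [`FileScheduler`].  The cached file size is used when available;
    /// otherwise the size is fetched from the object store and cached.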
    pub async fn open_file_with_priority(
        self: &Arc<Self>,
        path: &Path,
        base_priority: u64,
        file_size_bytes: &CachedFileSize,
    ) -> Result<FileScheduler> {
        let file_size_bytes = if let Some(size) = file_size_bytes.get() {
            u64::from(size)
        } else {
            let size = self.object_store.size(path).await?;
            if let Some(size) = NonZero::new(size) {
                file_size_bytes.set(size);
            }
            size
        };
        let reader = self
            .object_store
            .open_with_size(path, file_size_bytes as usize)
            .await?;
        let block_size = self.object_store.block_size() as u64;
        let max_iop_size = self.object_store.max_iop_size();
        Ok(FileScheduler {
            reader: reader.into(),
            block_size,
            root: self.clone(),
            base_priority,
            max_iop_size,
        })
    }

    // Same as `open_file_with_priority` with a base priority of 0
    pub async fn open_file(
        self: &Arc<Self>,
        path: &Path,
        file_size_bytes: &CachedFileSize,
    ) -> Result<FileScheduler> {
        self.open_file_with_priority(path, 0, file_size_bytes).await
    }

    fn do_submit_request(
        &self,
        reader: Arc<dyn Reader>,
        request: Vec<Range<u64>>,
        tx: oneshot::Sender<Response>,
        priority: u128,
    ) {
        let num_iops = request.len() as u32;

        let when_all_io_done = move |bytes_and_permits| {
            // We don't care if the receiver has given up, so discard the result
            let _ = tx.send(bytes_and_permits);
        };

        let dest = Arc::new(Mutex::new(Box::new(MutableBatch::new(
            when_all_io_done,
            num_iops,
            priority,
            request.len(),
        ))));

        for (task_idx, iop) in request.into_iter().enumerate() {
            let dest = dest.clone();
            let io_queue = self.io_queue.clone();
            let num_bytes = iop.end - iop.start;
            let task = IoTask {
                reader: reader.clone(),
                to_read: iop,
                priority,
                when_done: Box::new(move |data| {
                    io_queue.on_iop_complete();
                    let mut dest = dest.lock().unwrap();
                    let chunk = DataChunk {
                        data,
                        task_idx,
                        num_bytes,
                    };
                    dest.deliver_data(chunk);
                }),
            };
            self.io_queue.push(task);
        }
    }

    fn submit_request(
        &self,
        reader: Arc<dyn Reader>,
        request: Vec<Range<u64>>,
        priority: u128,
    ) -> impl Future<Output = Result<Vec<Bytes>>> + Send {
        let (tx, rx) = oneshot::channel::<Response>();

        self.do_submit_request(reader, request, tx, priority);

        let io_queue = self.io_queue.clone();

        rx.map(move |wrapped_rsp| {
            // The sender is owned by the MutableBatch, which always sends a response
            // when it is dropped, so this unwrap should not fail
            let rsp = wrapped_rsp.unwrap();
            io_queue.on_bytes_consumed(rsp.num_bytes, rsp.priority, rsp.num_reqs);
            rsp.data
        })
    }

    pub fn stats(&self) -> ScanStats {
        ScanStats::new(self.stats.as_ref())
    }
}

impl Drop for ScanScheduler {
    fn drop(&mut self) {
        self.io_queue.close();
    }
}

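/// A handle for scheduling reads against a single opened file.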
#[derive(Clone, Debug)]
pub struct FileScheduler {
    reader: Arc<dyn Reader>,
    root: Arc<ScanScheduler>,
    block_size: u64,
    base_priority: u64,
    max_iop_size: u64,
}

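// Two ranges are "close together" when the gap between them is at most one block;
// such ranges are coalesced into a single read.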
fn is_close_together(range1: &Range<u64>, range2: &Range<u64>, block_size: u64) -> bool {
    range2.start <= (range1.end + block_size)
}

fn is_overlapping(range1: &Range<u64>, range2: &Range<u64>) -> bool {
    range1.start < range2.end && range2.start < range1.end
}

impl FileScheduler {
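    /// Submit a batch of read requests to the scheduler.
    ///
    /// Nearby ranges are coalesced and oversized ranges are split by the maximum IOP
    /// size; the returned future resolves to one `Bytes` per original range, in the
    /// same order (the ranges are expected to be sorted by start offset).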
    pub fn submit_request(
        &self,
        request: Vec<Range<u64>>,
        priority: u64,
    ) -> impl Future<Output = Result<Vec<Bytes>>> + Send {
        // Combine the per-file base priority (upper 64 bits) with the request priority
        // (lower 64 bits)
        let priority = ((self.base_priority as u128) << 64) + priority as u128;

        let mut merged_requests = Vec::with_capacity(request.len());

        if !request.is_empty() {
            let mut curr_interval = request[0].clone();

            for req in request.iter().skip(1) {
                if is_close_together(&curr_interval, req, self.block_size) {
                    curr_interval.end = curr_interval.end.max(req.end);
                } else {
                    merged_requests.push(curr_interval);
                    curr_interval = req.clone();
                }
            }

            merged_requests.push(curr_interval);
        }

        // Split any merged request larger than max_iop_size into roughly equal parts
        let mut updated_requests = Vec::with_capacity(merged_requests.len());
        for req in merged_requests {
            if req.is_empty() {
                updated_requests.push(req);
            } else {
                let num_requests = (req.end - req.start).div_ceil(self.max_iop_size);
                let bytes_per_request = (req.end - req.start) / num_requests;
                for i in 0..num_requests {
                    let start = req.start + i * bytes_per_request;
                    let end = if i == num_requests - 1 {
                        req.end
                    } else {
                        start + bytes_per_request
                    };
                    updated_requests.push(start..end);
                }
            }
        }

        self.root.stats.record_request(&updated_requests);

        let bytes_vec_fut =
            self.root
                .submit_request(self.reader.clone(), updated_requests.clone(), priority);

        let mut updated_index = 0;
        let mut final_bytes = Vec::with_capacity(request.len());

        async move {
            let bytes_vec = bytes_vec_fut.await?;

            // Map the bytes returned for the coalesced/split requests back onto the
            // original ranges
            let mut orig_index = 0;
            while (updated_index < updated_requests.len()) && (orig_index < request.len()) {
                let updated_range = &updated_requests[updated_index];
                let orig_range = &request[orig_index];
                let byte_offset = updated_range.start as usize;

                if is_overlapping(updated_range, orig_range) {
                    let start = orig_range.start as usize - byte_offset;
                    if orig_range.end <= updated_range.end {
                        // The original range is fully contained within this request
                        let end = orig_range.end as usize - byte_offset;
                        final_bytes.push(bytes_vec[updated_index].slice(start..end));
                    } else {
                        // The original range spans multiple requests and must be
                        // stitched back together
                        let orig_size = orig_range.end - orig_range.start;
                        let mut merged_bytes = Vec::with_capacity(orig_size as usize);
                        merged_bytes.extend_from_slice(&bytes_vec[updated_index].slice(start..));
                        let mut copy_offset = merged_bytes.len() as u64;
                        while copy_offset < orig_size {
                            updated_index += 1;
                            let next_range = &updated_requests[updated_index];
                            let bytes_to_take =
                                (orig_size - copy_offset).min(next_range.end - next_range.start);
                            merged_bytes.extend_from_slice(
                                &bytes_vec[updated_index].slice(0..bytes_to_take as usize),
                            );
                            copy_offset += bytes_to_take;
                        }
                        final_bytes.push(Bytes::from(merged_bytes));
                    }
                    orig_index += 1;
                } else {
                    updated_index += 1;
                }
            }

            Ok(final_bytes)
        }
    }

    pub fn with_priority(&self, priority: u64) -> Self {
        Self {
            reader: self.reader.clone(),
            root: self.root.clone(),
            block_size: self.block_size,
            max_iop_size: self.max_iop_size,
            base_priority: priority,
        }
    }

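    /// Submit a single read request, resolving to the bytes for that range.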
    pub fn submit_single(
        &self,
        range: Range<u64>,
        priority: u64,
    ) -> impl Future<Output = Result<Bytes>> + Send {
        self.submit_request(vec![range], priority)
            .map_ok(|vec_bytes| vec_bytes.into_iter().next().unwrap())
    }

    pub fn reader(&self) -> &Arc<dyn Reader> {
        &self.reader
    }
}

#[cfg(test)]
mod tests {
    use std::{collections::VecDeque, time::Duration};

    use futures::poll;
    use rand::RngCore;
    use tempfile::tempdir;

    use object_store::{memory::InMemory, GetRange, ObjectStore as OSObjectStore};
    use tokio::{runtime::Handle, time::timeout};
    use url::Url;

    use crate::{
        object_store::{DEFAULT_DOWNLOAD_RETRY_COUNT, DEFAULT_MAX_IOP_SIZE},
        testing::MockObjectStore,
    };

    use super::*;

    #[tokio::test]
    async fn test_full_seq_read() {
        let tmpdir = tempdir().unwrap();
        let tmp_path = tmpdir.path().to_str().unwrap();
        let tmp_path = Path::parse(tmp_path).unwrap();
        let tmp_file = tmp_path.child("foo.file");

        let obj_store = Arc::new(ObjectStore::local());

        // Write 1MiB of data
        const DATA_SIZE: u64 = 1024 * 1024;
        let mut some_data = vec![0; DATA_SIZE as usize];
        rand::rng().fill_bytes(&mut some_data);
        obj_store.put(&tmp_file, &some_data).await.unwrap();

        let config = SchedulerConfig::default_for_testing();

        let scheduler = ScanScheduler::new(obj_store, config);

        let file_scheduler = scheduler
            .open_file(&tmp_file, &CachedFileSize::unknown())
            .await
            .unwrap();

        // Read the file back 4KiB at a time
        const READ_SIZE: u64 = 4 * 1024;
        let mut reqs = VecDeque::new();
        let mut offset = 0;
        while offset < DATA_SIZE {
            reqs.push_back(
                #[allow(clippy::single_range_in_vec_init)]
                file_scheduler
                    .submit_request(vec![offset..offset + READ_SIZE], 0)
                    .await
                    .unwrap(),
            );
            offset += READ_SIZE;
        }

        offset = 0;
        while offset < DATA_SIZE {
            let data = reqs.pop_front().unwrap();
            let actual = &data[0];
            let expected = &some_data[offset as usize..(offset + READ_SIZE) as usize];
            assert_eq!(expected, actual);
            offset += READ_SIZE;
        }
    }

    #[tokio::test]
    async fn test_split_coalesce() {
        let tmpdir = tempdir().unwrap();
        let tmp_path = tmpdir.path().to_str().unwrap();
        let tmp_path = Path::parse(tmp_path).unwrap();
        let tmp_file = tmp_path.child("foo.file");

        let obj_store = Arc::new(ObjectStore::local());

        // Write 75MiB of data
        const DATA_SIZE: u64 = 75 * 1024 * 1024;
        let mut some_data = vec![0; DATA_SIZE as usize];
        rand::rng().fill_bytes(&mut some_data);
        obj_store.put(&tmp_file, &some_data).await.unwrap();

        let config = SchedulerConfig::default_for_testing();

        let scheduler = ScanScheduler::new(obj_store, config);

        let file_scheduler = scheduler
            .open_file(&tmp_file, &CachedFileSize::unknown())
            .await
            .unwrap();

        // These three ranges are close together and should coalesce into a single IOP
        let req =
            file_scheduler.submit_request(vec![50_000..51_000, 52_000..53_000, 54_000..55_000], 0);

        let bytes = req.await.unwrap();

        assert_eq!(bytes[0], &some_data[50_000..51_000]);
        assert_eq!(bytes[1], &some_data[52_000..53_000]);
        assert_eq!(bytes[2], &some_data[54_000..55_000]);

        assert_eq!(1, scheduler.stats().iops);

        // A full-file read exceeds the max IOP size and should be split
        let req = file_scheduler.submit_request(vec![0..DATA_SIZE], 0);
        let bytes = req.await.unwrap();
        assert!(bytes[0] == some_data, "data is not the same");

        assert_eq!(6, scheduler.stats().iops);

        // Ranges near the max-IOP-size boundaries are coalesced and re-split on
        // chunk boundaries
        let chunk_size = *DEFAULT_MAX_IOP_SIZE;
        let req = file_scheduler.submit_request(
            vec![
                10..chunk_size,
                chunk_size + 10..(chunk_size * 2) - 20,
                chunk_size * 2..(chunk_size * 2) + 10,
            ],
            0,
        );

        let bytes = req.await.unwrap();
        let chunk_size = chunk_size as usize;
        assert!(
            bytes[0] == some_data[10..chunk_size],
            "data is not the same"
        );
        assert!(
            bytes[1] == some_data[chunk_size + 10..(chunk_size * 2) - 20],
            "data is not the same"
        );
        assert!(
            bytes[2] == some_data[chunk_size * 2..(chunk_size * 2) + 10],
            "data is not the same"
        );
        assert_eq!(8, scheduler.stats().iops);

        // Many small adjacent reads coalesce into one large read and then split by
        // max IOP size
        let reads = (0..44)
            .map(|i| (i * 1_000_000..(i + 1) * 1_000_000))
            .collect::<Vec<_>>();
        let req = file_scheduler.submit_request(reads, 0);
        let bytes = req.await.unwrap();
        for (i, bytes) in bytes.iter().enumerate() {
            assert!(
                bytes == &some_data[i * 1_000_000..(i + 1) * 1_000_000],
                "data is not the same"
            );
        }
        assert_eq!(11, scheduler.stats().iops);
    }

    #[tokio::test]
    async fn test_priority() {
        let some_path = Path::parse("foo").unwrap();
        let base_store = Arc::new(InMemory::new());
        base_store
            .put(&some_path, vec![0; 1000].into())
            .await
            .unwrap();

        // Every get against the mock store blocks until a permit is added
        let semaphore = Arc::new(tokio::sync::Semaphore::new(0));
        let mut obj_store = MockObjectStore::default();
        let semaphore_copy = semaphore.clone();
        obj_store
            .expect_get_opts()
            .returning(move |location, options| {
                let semaphore = semaphore.clone();
                let base_store = base_store.clone();
                let location = location.clone();
                async move {
                    semaphore.acquire().await.unwrap().forget();
                    base_store.get_opts(&location, options).await
                }
                .boxed()
            });
        let obj_store = Arc::new(ObjectStore::new(
            Arc::new(obj_store),
            Url::parse("mem://").unwrap(),
            Some(500),
            None,
            false,
            false,
            1,
            DEFAULT_DOWNLOAD_RETRY_COUNT,
        ));

        let config = SchedulerConfig {
            io_buffer_size_bytes: 1024 * 1024,
        };

        let scan_scheduler = ScanScheduler::new(obj_store, config);

        let file_scheduler = scan_scheduler
            .open_file(&Path::parse("foo").unwrap(), &CachedFileSize::new(1000))
            .await
            .unwrap();

        // The first request is dispatched immediately and then blocks in the mock store
        let first_fut = timeout(
            Duration::from_secs(10),
            file_scheduler.submit_single(0..10, 0),
        )
        .boxed();

        // A less urgent request (larger priority value) goes into the queue
        let mut second_fut = timeout(
            Duration::from_secs(10),
            file_scheduler.submit_single(0..20, 100),
        )
        .boxed();

        // A more urgent request (priority 0) is queued as well
        let mut third_fut = timeout(
            Duration::from_secs(10),
            file_scheduler.submit_single(0..30, 0),
        )
        .boxed();

        // Unblock one get; the in-flight first request completes
        semaphore_copy.add_permits(1);
        assert!(first_fut.await.unwrap().unwrap().len() == 10);
        // The remaining requests are still blocked
        assert!(poll!(&mut second_fut).is_pending());
        assert!(poll!(&mut third_fut).is_pending());

        // The more urgent (third) request should be issued next
        semaphore_copy.add_permits(1);
        assert!(third_fut.await.unwrap().unwrap().len() == 30);
        assert!(poll!(&mut second_fut).is_pending());

        // Finally, the less urgent request completes
        semaphore_copy.add_permits(1);
        assert!(second_fut.await.unwrap().unwrap().len() == 20);
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_backpressure() {
        let some_path = Path::parse("foo").unwrap();
        let base_store = Arc::new(InMemory::new());
        base_store
            .put(&some_path, vec![0; 100000].into())
            .await
            .unwrap();

        // Wrap the store so we can track how many bytes have been read
        let bytes_read = Arc::new(AtomicU64::from(0));
        let mut obj_store = MockObjectStore::default();
        let bytes_read_copy = bytes_read.clone();
        obj_store
            .expect_get_opts()
            .returning(move |location, options| {
                let range = options.range.as_ref().unwrap();
                let num_bytes = match range {
                    GetRange::Bounded(bounded) => bounded.end - bounded.start,
                    _ => panic!(),
                };
                bytes_read_copy.fetch_add(num_bytes, Ordering::Release);
                let location = location.clone();
                let base_store = base_store.clone();
                async move { base_store.get_opts(&location, options).await }.boxed()
            });
        let obj_store = Arc::new(ObjectStore::new(
            Arc::new(obj_store),
            Url::parse("mem://").unwrap(),
            Some(500),
            None,
            false,
            false,
            1,
            DEFAULT_DOWNLOAD_RETRY_COUNT,
        ));

        let config = SchedulerConfig {
            io_buffer_size_bytes: 10,
        };

        let scan_scheduler = ScanScheduler::new(obj_store.clone(), config);

        let file_scheduler = scan_scheduler
            .open_file(&Path::parse("foo").unwrap(), &CachedFileSize::new(100000))
            .await
            .unwrap();

        let wait_for_idle = || async move {
            let handle = Handle::current();
            while handle.metrics().num_alive_tasks() != 1 {
                tokio::time::sleep(Duration::from_millis(10)).await;
            }
        };
        // Wait until at least `target_bytes` have been read and the runtime is idle again
        let wait_for_bytes_read_and_idle = |target_bytes: u64| {
            let bytes_read = &bytes_read;
            async move {
                let bytes_read_copy = bytes_read.clone();
                while bytes_read_copy.load(Ordering::Acquire) < target_bytes {
                    tokio::time::sleep(Duration::from_millis(10)).await;
                }
                wait_for_idle().await;
            }
        };

        // The first two reads fill the 10-byte buffer; the third is held back by the
        // backpressure throttle
        let first_fut = file_scheduler.submit_single(0..5, 0);
        let second_fut = file_scheduler.submit_single(0..5, 0);
        let third_fut = file_scheduler.submit_single(0..3, 0);
        wait_for_bytes_read_and_idle(10).await;

        // Consuming the first read frees enough space for the third read to proceed
        assert_eq!(first_fut.await.unwrap().len(), 5);
        wait_for_bytes_read_and_idle(13).await;

        // This one is stuck behind the throttle again
        let fourth_fut = file_scheduler.submit_single(0..5, 0);
        wait_for_bytes_read_and_idle(13).await;

        assert_eq!(third_fut.await.unwrap().len(), 3);
        wait_for_bytes_read_and_idle(18).await;

        assert_eq!(second_fut.await.unwrap().len(), 5);
        // A request at the head priority is allowed to temporarily exceed the throttle,
        // otherwise requests larger than the remaining buffer could deadlock
        let fifth_fut = file_scheduler.submit_request(vec![0..3, 90000..90007], 0);
        wait_for_bytes_read_and_idle(21).await;

        let fifth_bytes = tokio::time::timeout(Duration::from_secs(10), fifth_fut)
            .await
            .unwrap();
        assert_eq!(
            fifth_bytes.unwrap().iter().map(|b| b.len()).sum::<usize>(),
            10
        );

        // Consume the remaining outstanding read
        assert_eq!(fourth_fut.await.unwrap().len(), 5);
        wait_for_bytes_read_and_idle(28).await;

        // Requests as large as the entire buffer must still make progress
        let config = SchedulerConfig {
            io_buffer_size_bytes: 10,
        };

        let scan_scheduler = ScanScheduler::new(obj_store, config);
        let file_scheduler = scan_scheduler
            .open_file(&Path::parse("foo").unwrap(), &CachedFileSize::new(100000))
            .await
            .unwrap();

        let first_fut = file_scheduler.submit_single(0..10, 0);
        let second_fut = file_scheduler.submit_single(0..10, 0);

        std::thread::sleep(Duration::from_millis(100));
        assert_eq!(first_fut.await.unwrap().len(), 10);
        assert_eq!(second_fut.await.unwrap().len(), 10);
    }

    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn stress_backpressure() {
        let some_path = Path::parse("foo").unwrap();
        let obj_store = Arc::new(ObjectStore::memory());
        obj_store
            .put(&some_path, vec![0; 100000].as_slice())
            .await
            .unwrap();

        // Use a tiny I/O buffer so the backpressure throttle is constantly engaged
        let config = SchedulerConfig {
            io_buffer_size_bytes: 1,
        };
        let scan_scheduler = ScanScheduler::new(obj_store.clone(), config);
        let file_scheduler = scan_scheduler
            .open_file(&some_path, &CachedFileSize::unknown())
            .await
            .unwrap();

        // Submit far more requests than the buffer allows and make sure that consuming
        // them in priority order never deadlocks
        let mut futs = Vec::with_capacity(10000);
        for idx in 0..10000 {
            futs.push(file_scheduler.submit_single(idx..idx + 1, idx));
        }

        for fut in futs {
            fut.await.unwrap();
        }
    }
}