use bytes::Bytes;
use futures::channel::oneshot;
use futures::{FutureExt, TryFutureExt};
use object_store::path::Path;
use snafu::location;
use std::collections::BinaryHeap;
use std::fmt::Debug;
use std::future::Future;
use std::num::NonZero;
use std::ops::Range;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Instant;
use tokio::sync::{Notify, Semaphore, SemaphorePermit};

use lance_core::{Error, Result};

use crate::object_store::ObjectStore;
use crate::traits::Reader;
use crate::utils::CachedFileSize;

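// Timing thresholds (in seconds) for backpressure logging: no warning is emitted
// before BACKPRESSURE_MIN seconds have elapsed, and repeat warnings are debounced
// to at most one every BACKPRESSURE_DEBOUNCE seconds.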
const BACKPRESSURE_MIN: u64 = 5;
const BACKPRESSURE_DEBOUNCE: u64 = 60;

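// Process-wide counters of how many I/O operations have been issued and how many
// bytes have been requested through the scheduler.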
static IOPS_COUNTER: AtomicU64 = AtomicU64::new(0);
static BYTES_READ_COUNTER: AtomicU64 = AtomicU64::new(0);

pub fn iops_counter() -> u64 {
    IOPS_COUNTER.load(Ordering::Acquire)
}

pub fn bytes_read_counter() -> u64 {
    BYTES_READ_COUNTER.load(Ordering::Acquire)
}

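/// An optional process-wide limit on the number of I/O operations that may be in
/// flight at once, configured with the `LANCE_PROCESS_IO_THREADS_LIMIT` environment
/// variable. When the variable is unset there is no limit.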
struct IopsQuota {
    iops_avail: Option<Semaphore>,
}

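/// A single reservation against [`IopsQuota`]. Dropping it returns the permit to
/// the semaphore; calling `forget` keeps the permit claimed so it can be released
/// explicitly once the I/O actually completes.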
struct IopsReservation<'a> {
    value: Option<SemaphorePermit<'a>>,
}

impl IopsReservation<'_> {
    fn forget(&mut self) {
        if let Some(value) = self.value.take() {
            value.forget();
        }
    }
}

impl IopsQuota {
    fn new() -> Self {
        let initial_capacity = std::env::var("LANCE_PROCESS_IO_THREADS_LIMIT")
            .map(|s| {
                let limit = s
                    .parse::<i32>()
                    .expect("LANCE_PROCESS_IO_THREADS_LIMIT must be a positive integer");
                if limit <= 0 {
                    panic!("LANCE_PROCESS_IO_THREADS_LIMIT must be a positive integer. To disable the limit, unset the environment variable");
                }
                limit
            })
            .unwrap_or(-1);
        let iops_avail = if initial_capacity < 0 {
            None
        } else {
            Some(Semaphore::new(initial_capacity as usize))
        };
        Self { iops_avail }
    }

    fn release(&self) {
        if let Some(iops_avail) = self.iops_avail.as_ref() {
            iops_avail.add_permits(1);
        }
    }

    async fn acquire(&self) -> IopsReservation {
        if let Some(iops_avail) = self.iops_avail.as_ref() {
            IopsReservation {
                value: Some(iops_avail.acquire().await.unwrap()),
            }
        } else {
            IopsReservation { value: None }
        }
    }
}

lazy_static::lazy_static! {
    static ref IOPS_QUOTA: IopsQuota = IopsQuota::new();
}

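/// The priorities of the I/O requests currently in flight, kept sorted so the
/// minimum priority can be looked up cheaply.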
struct PrioritiesInFlight {
    in_flight: Vec<u128>,
}

impl PrioritiesInFlight {
    fn new(capacity: u32) -> Self {
        Self {
            in_flight: Vec::with_capacity(capacity as usize * 2),
        }
    }

    fn min_in_flight(&self) -> u128 {
        self.in_flight.first().copied().unwrap_or(u128::MAX)
    }

    fn push(&mut self, prio: u128) {
        let pos = match self.in_flight.binary_search(&prio) {
            Ok(pos) => pos,
            Err(pos) => pos,
        };
        self.in_flight.insert(pos, prio);
    }

    fn remove(&mut self, prio: u128) {
        if let Ok(pos) = self.in_flight.binary_search(&prio) {
            self.in_flight.remove(pos);
        } else {
            unreachable!();
        }
    }
}

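/// Mutable state behind the I/O queue: how many IOPS and buffer bytes are still
/// available, the heap of pending requests, and the bookkeeping used to emit
/// backpressure warnings.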
struct IoQueueState {
    iops_avail: u32,
    bytes_avail: i64,
    pending_requests: BinaryHeap<IoTask>,
    priorities_in_flight: PrioritiesInFlight,
    done_scheduling: bool,
    start: Instant,
    last_warn: AtomicU64,
}

impl IoQueueState {
    fn new(io_capacity: u32, io_buffer_size: u64) -> Self {
        Self {
            iops_avail: io_capacity,
            bytes_avail: io_buffer_size as i64,
            pending_requests: BinaryHeap::new(),
            priorities_in_flight: PrioritiesInFlight::new(io_capacity),
            done_scheduling: false,
            start: Instant::now(),
            last_warn: AtomicU64::from(0),
        }
    }

    fn finished(&self) -> bool {
        self.done_scheduling && self.pending_requests.is_empty()
    }

    fn warn_if_needed(&self) {
        let seconds_elapsed = self.start.elapsed().as_secs();
        let last_warn = self.last_warn.load(Ordering::Acquire);
        let since_last_warn = seconds_elapsed - last_warn;
        if (last_warn == 0
            && seconds_elapsed > BACKPRESSURE_MIN
            && seconds_elapsed < BACKPRESSURE_DEBOUNCE)
            || since_last_warn > BACKPRESSURE_DEBOUNCE
        {
            tracing::event!(tracing::Level::DEBUG, "Backpressure throttle exceeded");
            log::debug!("Backpressure throttle is full, I/O will pause until buffer is drained. Max I/O bandwidth will not be achieved because CPU is falling behind");
            self.last_warn
                .store(seconds_elapsed.max(1), Ordering::Release);
        }
    }

    fn can_deliver(&self, task: &IoTask) -> bool {
        if self.iops_avail == 0 {
            false
        } else if task.priority <= self.priorities_in_flight.min_in_flight() {
            true
        } else if task.num_bytes() as i64 > self.bytes_avail {
            self.warn_if_needed();
            false
        } else {
            true
        }
    }

    fn next_task(&mut self) -> Option<IoTask> {
        let task = self.pending_requests.peek()?;
        if self.can_deliver(task) {
            self.priorities_in_flight.push(task.priority);
            self.iops_avail -= 1;
            self.bytes_avail -= task.num_bytes() as i64;
            if self.bytes_avail < 0 {
                log::debug!(
                    "Backpressure throttle temporarily exceeded by {} bytes due to priority I/O",
                    -self.bytes_avail
                );
            }
            Some(self.pending_requests.pop().unwrap())
        } else {
            None
        }
    }
}

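/// A priority queue of pending I/O requests, shared between the tasks that submit
/// requests and the background loop that dispatches them.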
struct IoQueue {
    state: Mutex<IoQueueState>,
    notify: Notify,
}

impl IoQueue {
    fn new(io_capacity: u32, io_buffer_size: u64) -> Self {
        Self {
            state: Mutex::new(IoQueueState::new(io_capacity, io_buffer_size)),
            notify: Notify::new(),
        }
    }

    fn push(&self, task: IoTask) {
        log::trace!(
            "Inserting I/O request for {} bytes with priority ({},{}) into I/O queue",
            task.num_bytes(),
            task.priority >> 64,
            task.priority & 0xFFFFFFFFFFFFFFFF
        );
        let mut state = self.state.lock().unwrap();
        state.pending_requests.push(task);
        drop(state);

        self.notify.notify_one();
    }

    async fn pop(&self) -> Option<IoTask> {
        loop {
            {
                let mut iop_res = IOPS_QUOTA.acquire().await;
                let mut state = self.state.lock().unwrap();
                if let Some(task) = state.next_task() {
                    iop_res.forget();
                    return Some(task);
                }

                if state.finished() {
                    return None;
                }
            }

            self.notify.notified().await;
        }
    }

    fn on_iop_complete(&self) {
        let mut state = self.state.lock().unwrap();
        state.iops_avail += 1;
        drop(state);

        self.notify.notify_one();
    }

    fn on_bytes_consumed(&self, bytes: u64, priority: u128, num_reqs: usize) {
        let mut state = self.state.lock().unwrap();
        state.bytes_avail += bytes as i64;
        for _ in 0..num_reqs {
            state.priorities_in_flight.remove(priority);
        }
        drop(state);

        self.notify.notify_one();
    }

    fn close(&self) {
        let mut state = self.state.lock().unwrap();
        state.done_scheduling = true;
        drop(state);

        self.notify.notify_one();
    }
}

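/// Accumulates the results of the individual reads that make up a single request.
/// Once every read has been delivered (or failed) the batch is dropped and the
/// `when_done` callback is invoked with the combined [`Response`].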
struct MutableBatch<F: FnOnce(Response) + Send> {
    when_done: Option<F>,
    data_buffers: Vec<Bytes>,
    num_bytes: u64,
    priority: u128,
    num_reqs: usize,
    err: Option<Box<dyn std::error::Error + Send + Sync + 'static>>,
}

impl<F: FnOnce(Response) + Send> MutableBatch<F> {
    fn new(when_done: F, num_data_buffers: u32, priority: u128, num_reqs: usize) -> Self {
        Self {
            when_done: Some(when_done),
            data_buffers: vec![Bytes::default(); num_data_buffers as usize],
            num_bytes: 0,
            priority,
            num_reqs,
            err: None,
        }
    }
}

impl<F: FnOnce(Response) + Send> Drop for MutableBatch<F> {
    fn drop(&mut self) {
        let result = if self.err.is_some() {
            Err(Error::Wrapped {
                error: self.err.take().unwrap(),
                location: location!(),
            })
        } else {
            let mut data = Vec::new();
            std::mem::swap(&mut data, &mut self.data_buffers);
            Ok(data)
        };
        let response = Response {
            data: result,
            num_bytes: self.num_bytes,
            priority: self.priority,
            num_reqs: self.num_reqs,
        };
        (self.when_done.take().unwrap())(response);
    }
}

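/// A single completed read, tagged with its position within the originating request.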
struct DataChunk {
    task_idx: usize,
    num_bytes: u64,
    data: Result<Bytes>,
}

trait DataSink: Send {
    fn deliver_data(&mut self, data: DataChunk);
}

impl<F: FnOnce(Response) + Send> DataSink for MutableBatch<F> {
    fn deliver_data(&mut self, data: DataChunk) {
        self.num_bytes += data.num_bytes;
        match data.data {
            Ok(data_bytes) => {
                self.data_buffers[data.task_idx] = data_bytes;
            }
            Err(err) => {
                self.err.get_or_insert(Box::new(err));
            }
        }
    }
}

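/// A single read against a single reader. The `Ord` implementation is reversed so
/// that the max-heap in [`IoQueueState`] pops the task with the lowest priority
/// value (the most urgent request) first.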
struct IoTask {
    reader: Arc<dyn Reader>,
    to_read: Range<u64>,
    when_done: Box<dyn FnOnce(Result<Bytes>) + Send>,
    priority: u128,
}

impl Eq for IoTask {}

impl PartialEq for IoTask {
    fn eq(&self, other: &Self) -> bool {
        self.priority == other.priority
    }
}

impl PartialOrd for IoTask {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for IoTask {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        other.priority.cmp(&self.priority)
    }
}

impl IoTask {
    fn num_bytes(&self) -> u64 {
        self.to_read.end - self.to_read.start
    }

    async fn run(self) {
        let bytes = if self.to_read.start == self.to_read.end {
            Ok(Bytes::new())
        } else {
            let bytes_fut = self
                .reader
                .get_range(self.to_read.start as usize..self.to_read.end as usize);
            IOPS_COUNTER.fetch_add(1, Ordering::Release);
            BYTES_READ_COUNTER.fetch_add(self.num_bytes(), Ordering::Release);
            bytes_fut.await.map_err(Error::from)
        };
        IOPS_QUOTA.release();
        (self.when_done)(bytes);
    }
}

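/// The background loop that pops tasks off the queue (respecting the IOPS and
/// backpressure limits enforced by [`IoQueue::pop`]) and spawns each one onto the
/// tokio runtime. Exits once the queue has been closed and drained.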
async fn run_io_loop(tasks: Arc<IoQueue>) {
    loop {
        let next_task = tasks.pop().await;
        match next_task {
            Some(task) => {
                tokio::spawn(task.run());
            }
            None => {
                return;
            }
        }
    }
}

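/// Per-scheduler counters (requests submitted, IOPS issued, and bytes requested),
/// surfaced through [`ScanScheduler::stats`].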
#[derive(Debug)]
struct StatsCollector {
    iops: AtomicU64,
    requests: AtomicU64,
    bytes_read: AtomicU64,
}

impl StatsCollector {
    fn new() -> Self {
        Self {
            iops: AtomicU64::new(0),
            requests: AtomicU64::new(0),
            bytes_read: AtomicU64::new(0),
        }
    }

    fn iops(&self) -> u64 {
        self.iops.load(Ordering::Relaxed)
    }

    fn bytes_read(&self) -> u64 {
        self.bytes_read.load(Ordering::Relaxed)
    }

    fn requests(&self) -> u64 {
        self.requests.load(Ordering::Relaxed)
    }

    fn record_request(&self, request: &[Range<u64>]) {
        self.requests.fetch_add(1, Ordering::Relaxed);
        self.iops.fetch_add(request.len() as u64, Ordering::Relaxed);
        self.bytes_read.fetch_add(
            request.iter().map(|r| r.end - r.start).sum::<u64>(),
            Ordering::Relaxed,
        );
    }
}

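/// A point-in-time snapshot of the scheduler's I/O statistics.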
pub struct ScanStats {
    pub iops: u64,
    pub requests: u64,
    pub bytes_read: u64,
}

impl ScanStats {
    fn new(stats: &StatsCollector) -> Self {
        Self {
            iops: stats.iops(),
            requests: stats.requests(),
            bytes_read: stats.bytes_read(),
        }
    }
}

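/// An I/O scheduler shared by all of the files in a scan. It dispatches reads in
/// priority order while enforcing the IOPS limit and the I/O buffer (backpressure)
/// limit.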
pub struct ScanScheduler {
    object_store: Arc<ObjectStore>,
    io_queue: Arc<IoQueue>,
    stats: Arc<StatsCollector>,
}

impl Debug for ScanScheduler {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ScanScheduler")
            .field("object_store", &self.object_store)
            .finish()
    }
}

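/// The data (or error) for one completed request, along with the bookkeeping needed
/// to return its bytes to the backpressure budget.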
struct Response {
    data: Result<Vec<Bytes>>,
    priority: u128,
    num_reqs: usize,
    num_bytes: u64,
}

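/// Configuration for a [`ScanScheduler`]. `io_buffer_size_bytes` is a soft cap on
/// how much fetched-but-not-yet-consumed data may be buffered before backpressure
/// pauses delivery of new reads.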
#[derive(Debug, Clone, Copy)]
pub struct SchedulerConfig {
    pub io_buffer_size_bytes: u64,
}

impl SchedulerConfig {
    pub fn default_for_testing() -> Self {
        Self {
            io_buffer_size_bytes: 256 * 1024 * 1024,
        }
    }

    pub fn max_bandwidth(store: &ObjectStore) -> Self {
        Self {
            io_buffer_size_bytes: 32 * 1024 * 1024 * store.io_parallelism() as u64,
        }
    }
}

impl ScanScheduler {
    pub fn new(object_store: Arc<ObjectStore>, config: SchedulerConfig) -> Arc<Self> {
        let io_capacity = object_store.io_parallelism();
        let io_queue = Arc::new(IoQueue::new(
            io_capacity as u32,
            config.io_buffer_size_bytes,
        ));
        let scheduler = Self {
            object_store,
            io_queue: io_queue.clone(),
            stats: Arc::new(StatsCollector::new()),
        };
        tokio::task::spawn(async move { run_io_loop(io_queue).await });
        Arc::new(scheduler)
    }

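    /// Open the file at `path` for reading, tagging every request made through the
    /// returned [`FileScheduler`] with `base_priority`. The cached file size is
    /// used when available so the store does not need to be asked for it again.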
    pub async fn open_file_with_priority(
        self: &Arc<Self>,
        path: &Path,
        base_priority: u64,
        file_size_bytes: &CachedFileSize,
    ) -> Result<FileScheduler> {
        let file_size_bytes = if let Some(size) = file_size_bytes.get() {
            u64::from(size) as usize
        } else {
            let size = self.object_store.size(path).await?;
            if let Some(size) = NonZero::new(size as u64) {
                file_size_bytes.set(size);
            }
            size
        };
        let reader = self
            .object_store
            .open_with_size(path, file_size_bytes)
            .await?;
        let block_size = self.object_store.block_size() as u64;
        let max_iop_size = self.object_store.max_iop_size();
        Ok(FileScheduler {
            reader: reader.into(),
            block_size,
            root: self.clone(),
            base_priority,
            max_iop_size,
        })
    }

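    /// Open the file at `path` with a base priority of 0.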
    pub async fn open_file(
        self: &Arc<Self>,
        path: &Path,
        file_size_bytes: &CachedFileSize,
    ) -> Result<FileScheduler> {
        self.open_file_with_priority(path, 0, file_size_bytes).await
    }

    fn do_submit_request(
        &self,
        reader: Arc<dyn Reader>,
        request: Vec<Range<u64>>,
        tx: oneshot::Sender<Response>,
        priority: u128,
    ) {
        let num_iops = request.len() as u32;

        let when_all_io_done = move |bytes_and_permits| {
            let _ = tx.send(bytes_and_permits);
        };

        let dest = Arc::new(Mutex::new(Box::new(MutableBatch::new(
            when_all_io_done,
            num_iops,
            priority,
            request.len(),
        ))));

        for (task_idx, iop) in request.into_iter().enumerate() {
            let dest = dest.clone();
            let io_queue = self.io_queue.clone();
            let num_bytes = iop.end - iop.start;
            let task = IoTask {
                reader: reader.clone(),
                to_read: iop,
                priority,
                when_done: Box::new(move |data| {
                    io_queue.on_iop_complete();
                    let mut dest = dest.lock().unwrap();
                    let chunk = DataChunk {
                        data,
                        task_idx,
                        num_bytes,
                    };
                    dest.deliver_data(chunk);
                }),
            };
            self.io_queue.push(task);
        }
    }

    fn submit_request(
        &self,
        reader: Arc<dyn Reader>,
        request: Vec<Range<u64>>,
        priority: u128,
    ) -> impl Future<Output = Result<Vec<Bytes>>> + Send {
        let (tx, rx) = oneshot::channel::<Response>();

        self.do_submit_request(reader, request, tx, priority);

        let io_queue = self.io_queue.clone();

        rx.map(move |wrapped_rsp| {
            let rsp = wrapped_rsp.unwrap();
            io_queue.on_bytes_consumed(rsp.num_bytes, rsp.priority, rsp.num_reqs);
            rsp.data
        })
    }

    pub fn stats(&self) -> ScanStats {
        ScanStats::new(self.stats.as_ref())
    }
}

impl Drop for ScanScheduler {
    fn drop(&mut self) {
        self.io_queue.close();
    }
}

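/// A handle for scheduling reads against a single file. Nearby ranges are coalesced,
/// ranges larger than the store's maximum IOP size are split, and every request is
/// tagged with the scheduler's base priority before being forwarded to the shared
/// [`ScanScheduler`].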
#[derive(Clone, Debug)]
pub struct FileScheduler {
    reader: Arc<dyn Reader>,
    root: Arc<ScanScheduler>,
    block_size: u64,
    base_priority: u64,
    max_iop_size: u64,
}

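// Two ranges may be coalesced into a single read if the gap between them is no
// larger than the block size.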
fn is_close_together(range1: &Range<u64>, range2: &Range<u64>, block_size: u64) -> bool {
    range2.start <= (range1.end + block_size)
}

fn is_overlapping(range1: &Range<u64>, range2: &Range<u64>) -> bool {
    range1.start < range2.end && range2.start < range1.end
}

impl FileScheduler {
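    /// Submit a batch of reads to the file. Ranges that are close together are
    /// merged, ranges larger than `max_iop_size` are split, and the returned future
    /// resolves to one `Bytes` buffer per *original* range once every underlying
    /// read has completed. Requests are ordered by `(base_priority, priority)`.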
    pub fn submit_request(
        &self,
        request: Vec<Range<u64>>,
        priority: u64,
    ) -> impl Future<Output = Result<Vec<Bytes>>> + Send {
        let priority = ((self.base_priority as u128) << 64) + priority as u128;

        let mut merged_requests = Vec::with_capacity(request.len());

        if !request.is_empty() {
            let mut curr_interval = request[0].clone();

            for req in request.iter().skip(1) {
                if is_close_together(&curr_interval, req, self.block_size) {
                    curr_interval.end = curr_interval.end.max(req.end);
                } else {
                    merged_requests.push(curr_interval);
                    curr_interval = req.clone();
                }
            }

            merged_requests.push(curr_interval);
        }

        let mut updated_requests = Vec::with_capacity(merged_requests.len());
        for req in merged_requests {
            if req.is_empty() {
                updated_requests.push(req);
            } else {
                let num_requests = (req.end - req.start).div_ceil(self.max_iop_size);
                let bytes_per_request = (req.end - req.start) / num_requests;
                for i in 0..num_requests {
                    let start = req.start + i * bytes_per_request;
                    let end = if i == num_requests - 1 {
                        req.end
                    } else {
                        start + bytes_per_request
                    };
                    updated_requests.push(start..end);
                }
            }
        }

        self.root.stats.record_request(&updated_requests);

        let bytes_vec_fut =
            self.root
                .submit_request(self.reader.clone(), updated_requests.clone(), priority);

        let mut updated_index = 0;
        let mut final_bytes = Vec::with_capacity(request.len());

        async move {
            let bytes_vec = bytes_vec_fut.await?;

            let mut orig_index = 0;
            while (updated_index < updated_requests.len()) && (orig_index < request.len()) {
                let updated_range = &updated_requests[updated_index];
                let orig_range = &request[orig_index];
                let byte_offset = updated_range.start as usize;

                if is_overlapping(updated_range, orig_range) {
                    let start = orig_range.start as usize - byte_offset;
                    if orig_range.end <= updated_range.end {
                        let end = orig_range.end as usize - byte_offset;
                        final_bytes.push(bytes_vec[updated_index].slice(start..end));
                    } else {
                        let orig_size = orig_range.end - orig_range.start;
                        let mut merged_bytes = Vec::with_capacity(orig_size as usize);
                        merged_bytes.extend_from_slice(&bytes_vec[updated_index].slice(start..));
                        let mut copy_offset = merged_bytes.len() as u64;
                        while copy_offset < orig_size {
                            updated_index += 1;
                            let next_range = &updated_requests[updated_index];
                            let bytes_to_take =
                                (orig_size - copy_offset).min(next_range.end - next_range.start);
                            merged_bytes.extend_from_slice(
                                &bytes_vec[updated_index].slice(0..bytes_to_take as usize),
                            );
                            copy_offset += bytes_to_take;
                        }
                        final_bytes.push(Bytes::from(merged_bytes));
                    }
                    orig_index += 1;
                } else {
                    updated_index += 1;
                }
            }

            Ok(final_bytes)
        }
    }

    pub fn with_priority(&self, priority: u64) -> Self {
        Self {
            reader: self.reader.clone(),
            root: self.root.clone(),
            block_size: self.block_size,
            max_iop_size: self.max_iop_size,
            base_priority: priority,
        }
    }

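    /// Submit a single read and resolve to the bytes for that one range.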
    pub fn submit_single(
        &self,
        range: Range<u64>,
        priority: u64,
    ) -> impl Future<Output = Result<Bytes>> + Send {
        self.submit_request(vec![range], priority)
            .map_ok(|vec_bytes| vec_bytes.into_iter().next().unwrap())
    }

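    /// Access the underlying [`Reader`] directly.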
    pub fn reader(&self) -> &Arc<dyn Reader> {
        &self.reader
    }
}

#[cfg(test)]
mod tests {
    use std::{collections::VecDeque, time::Duration};

    use futures::poll;
    use rand::RngCore;
    use tempfile::tempdir;

    use object_store::{memory::InMemory, GetRange, ObjectStore as OSObjectStore};
    use tokio::{runtime::Handle, time::timeout};
    use url::Url;

    use crate::{
        object_store::{DEFAULT_DOWNLOAD_RETRY_COUNT, DEFAULT_MAX_IOP_SIZE},
        testing::MockObjectStore,
    };

    use super::*;

    #[tokio::test]
    async fn test_full_seq_read() {
        let tmpdir = tempdir().unwrap();
        let tmp_path = tmpdir.path().to_str().unwrap();
        let tmp_path = Path::parse(tmp_path).unwrap();
        let tmp_file = tmp_path.child("foo.file");

        let obj_store = Arc::new(ObjectStore::local());

        const DATA_SIZE: u64 = 1024 * 1024;
        let mut some_data = vec![0; DATA_SIZE as usize];
        rand::thread_rng().fill_bytes(&mut some_data);
        obj_store.put(&tmp_file, &some_data).await.unwrap();

        let config = SchedulerConfig::default_for_testing();

        let scheduler = ScanScheduler::new(obj_store, config);

        let file_scheduler = scheduler
            .open_file(&tmp_file, &CachedFileSize::unknown())
            .await
            .unwrap();

        const READ_SIZE: u64 = 4 * 1024;
        let mut reqs = VecDeque::new();
        let mut offset = 0;
        while offset < DATA_SIZE {
            reqs.push_back(
                #[allow(clippy::single_range_in_vec_init)]
                file_scheduler
                    .submit_request(vec![offset..offset + READ_SIZE], 0)
                    .await
                    .unwrap(),
            );
            offset += READ_SIZE;
        }

        offset = 0;
        while offset < DATA_SIZE {
            let data = reqs.pop_front().unwrap();
            let actual = &data[0];
            let expected = &some_data[offset as usize..(offset + READ_SIZE) as usize];
            assert_eq!(expected, actual);
            offset += READ_SIZE;
        }
    }

    #[tokio::test]
    async fn test_split_coalesce() {
        let tmpdir = tempdir().unwrap();
        let tmp_path = tmpdir.path().to_str().unwrap();
        let tmp_path = Path::parse(tmp_path).unwrap();
        let tmp_file = tmp_path.child("foo.file");

        let obj_store = Arc::new(ObjectStore::local());

        const DATA_SIZE: u64 = 75 * 1024 * 1024;
        let mut some_data = vec![0; DATA_SIZE as usize];
        rand::thread_rng().fill_bytes(&mut some_data);
        obj_store.put(&tmp_file, &some_data).await.unwrap();

        let config = SchedulerConfig::default_for_testing();

        let scheduler = ScanScheduler::new(obj_store, config);

        let file_scheduler = scheduler
            .open_file(&tmp_file, &CachedFileSize::unknown())
            .await
            .unwrap();

        let req =
            file_scheduler.submit_request(vec![50_000..51_000, 52_000..53_000, 54_000..55_000], 0);

        let bytes = req.await.unwrap();

        assert_eq!(bytes[0], &some_data[50_000..51_000]);
        assert_eq!(bytes[1], &some_data[52_000..53_000]);
        assert_eq!(bytes[2], &some_data[54_000..55_000]);

        assert_eq!(1, scheduler.stats().iops);

        let req = file_scheduler.submit_request(vec![0..DATA_SIZE], 0);
        let bytes = req.await.unwrap();
        assert!(bytes[0] == some_data, "data is not the same");

        assert_eq!(6, scheduler.stats().iops);

        let chunk_size = *DEFAULT_MAX_IOP_SIZE;
        let req = file_scheduler.submit_request(
            vec![
                10..chunk_size,
                chunk_size + 10..(chunk_size * 2) - 20,
                chunk_size * 2..(chunk_size * 2) + 10,
            ],
            0,
        );

        let bytes = req.await.unwrap();
        let chunk_size = chunk_size as usize;
        assert!(
            bytes[0] == some_data[10..chunk_size],
            "data is not the same"
        );
        assert!(
            bytes[1] == some_data[chunk_size + 10..(chunk_size * 2) - 20],
            "data is not the same"
        );
        assert!(
            bytes[2] == some_data[chunk_size * 2..(chunk_size * 2) + 10],
            "data is not the same"
        );
        assert_eq!(8, scheduler.stats().iops);

        let reads = (0..44)
            .map(|i| (i * 1_000_000..(i + 1) * 1_000_000))
            .collect::<Vec<_>>();
        let req = file_scheduler.submit_request(reads, 0);
        let bytes = req.await.unwrap();
        for (i, bytes) in bytes.iter().enumerate() {
            assert!(
                bytes == &some_data[i * 1_000_000..(i + 1) * 1_000_000],
                "data is not the same"
            );
        }
        assert_eq!(11, scheduler.stats().iops);
    }

    #[tokio::test]
    async fn test_priority() {
        let some_path = Path::parse("foo").unwrap();
        let base_store = Arc::new(InMemory::new());
        base_store
            .put(&some_path, vec![0; 1000].into())
            .await
            .unwrap();

        let semaphore = Arc::new(tokio::sync::Semaphore::new(0));
        let mut obj_store = MockObjectStore::default();
        let semaphore_copy = semaphore.clone();
        obj_store
            .expect_get_opts()
            .returning(move |location, options| {
                let semaphore = semaphore.clone();
                let base_store = base_store.clone();
                let location = location.clone();
                async move {
                    semaphore.acquire().await.unwrap().forget();
                    base_store.get_opts(&location, options).await
                }
                .boxed()
            });
        let obj_store = Arc::new(ObjectStore::new(
            Arc::new(obj_store),
            Url::parse("mem://").unwrap(),
            Some(500),
            None,
            false,
            false,
            1,
            DEFAULT_DOWNLOAD_RETRY_COUNT,
        ));

        let config = SchedulerConfig {
            io_buffer_size_bytes: 1024 * 1024,
        };

        let scan_scheduler = ScanScheduler::new(obj_store, config);

        let file_scheduler = scan_scheduler
            .open_file(&Path::parse("foo").unwrap(), &CachedFileSize::new(1000))
            .await
            .unwrap();

        let first_fut = timeout(
            Duration::from_secs(10),
            file_scheduler.submit_single(0..10, 0),
        )
        .boxed();

        let mut second_fut = timeout(
            Duration::from_secs(10),
            file_scheduler.submit_single(0..20, 100),
        )
        .boxed();

        let mut third_fut = timeout(
            Duration::from_secs(10),
            file_scheduler.submit_single(0..30, 0),
        )
        .boxed();

        semaphore_copy.add_permits(1);
        assert!(first_fut.await.unwrap().unwrap().len() == 10);
        assert!(poll!(&mut second_fut).is_pending());
        assert!(poll!(&mut third_fut).is_pending());

        semaphore_copy.add_permits(1);
        assert!(third_fut.await.unwrap().unwrap().len() == 30);
        assert!(poll!(&mut second_fut).is_pending());

        semaphore_copy.add_permits(1);
        assert!(second_fut.await.unwrap().unwrap().len() == 20);
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_backpressure() {
        let some_path = Path::parse("foo").unwrap();
        let base_store = Arc::new(InMemory::new());
        base_store
            .put(&some_path, vec![0; 100000].into())
            .await
            .unwrap();

        let bytes_read = Arc::new(AtomicU64::from(0));
        let mut obj_store = MockObjectStore::default();
        let bytes_read_copy = bytes_read.clone();
        obj_store
            .expect_get_opts()
            .returning(move |location, options| {
                let range = options.range.as_ref().unwrap();
                let num_bytes = match range {
                    GetRange::Bounded(bounded) => bounded.end - bounded.start,
                    _ => panic!(),
                };
                bytes_read_copy.fetch_add(num_bytes as u64, Ordering::Release);
                let location = location.clone();
                let base_store = base_store.clone();
                async move { base_store.get_opts(&location, options).await }.boxed()
            });
        let obj_store = Arc::new(ObjectStore::new(
            Arc::new(obj_store),
            Url::parse("mem://").unwrap(),
            Some(500),
            None,
            false,
            false,
            1,
            DEFAULT_DOWNLOAD_RETRY_COUNT,
        ));

        let config = SchedulerConfig {
            io_buffer_size_bytes: 10,
        };

        let scan_scheduler = ScanScheduler::new(obj_store.clone(), config);

        let file_scheduler = scan_scheduler
            .open_file(&Path::parse("foo").unwrap(), &CachedFileSize::new(100000))
            .await
            .unwrap();

        let wait_for_idle = || async move {
            let handle = Handle::current();
            while handle.metrics().num_alive_tasks() != 1 {
                tokio::time::sleep(Duration::from_millis(10)).await;
            }
        };
        let wait_for_bytes_read_and_idle = |target_bytes: u64| {
            let bytes_read = &bytes_read;
            async move {
                let bytes_read_copy = bytes_read.clone();
                while bytes_read_copy.load(Ordering::Acquire) < target_bytes {
                    tokio::time::sleep(Duration::from_millis(10)).await;
                }
                wait_for_idle().await;
            }
        };

        let first_fut = file_scheduler.submit_single(0..5, 0);
        let second_fut = file_scheduler.submit_single(0..5, 0);
        let third_fut = file_scheduler.submit_single(0..3, 0);
        wait_for_bytes_read_and_idle(10).await;

        assert_eq!(first_fut.await.unwrap().len(), 5);
        wait_for_bytes_read_and_idle(13).await;

        let fourth_fut = file_scheduler.submit_single(0..5, 0);
        wait_for_bytes_read_and_idle(13).await;

        assert_eq!(third_fut.await.unwrap().len(), 3);
        wait_for_bytes_read_and_idle(18).await;

        assert_eq!(second_fut.await.unwrap().len(), 5);
        let fifth_fut = file_scheduler.submit_request(vec![0..3, 90000..90007], 0);
        wait_for_bytes_read_and_idle(21).await;

        let fifth_bytes = tokio::time::timeout(Duration::from_secs(10), fifth_fut)
            .await
            .unwrap();
        assert_eq!(
            fifth_bytes.unwrap().iter().map(|b| b.len()).sum::<usize>(),
            10
        );

        assert_eq!(fourth_fut.await.unwrap().len(), 5);
        wait_for_bytes_read_and_idle(28).await;

        let config = SchedulerConfig {
            io_buffer_size_bytes: 10,
        };

        let scan_scheduler = ScanScheduler::new(obj_store, config);
        let file_scheduler = scan_scheduler
            .open_file(&Path::parse("foo").unwrap(), &CachedFileSize::new(100000))
            .await
            .unwrap();

        let first_fut = file_scheduler.submit_single(0..10, 0);
        let second_fut = file_scheduler.submit_single(0..10, 0);

        std::thread::sleep(Duration::from_millis(100));
        assert_eq!(first_fut.await.unwrap().len(), 10);
        assert_eq!(second_fut.await.unwrap().len(), 10);
    }

    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn stress_backpressure() {
        let some_path = Path::parse("foo").unwrap();
        let obj_store = Arc::new(ObjectStore::memory());
        obj_store
            .put(&some_path, vec![0; 100000].as_slice())
            .await
            .unwrap();

        let config = SchedulerConfig {
            io_buffer_size_bytes: 1,
        };
        let scan_scheduler = ScanScheduler::new(obj_store.clone(), config);
        let file_scheduler = scan_scheduler
            .open_file(&some_path, &CachedFileSize::unknown())
            .await
            .unwrap();

        let mut futs = Vec::with_capacity(10000);
        for idx in 0..10000 {
            futs.push(file_scheduler.submit_single(idx..idx + 1, idx));
        }

        for fut in futs {
            fut.await.unwrap();
        }
    }
}