use bytes::Bytes;
use futures::channel::oneshot;
use futures::{FutureExt, TryFutureExt};
use object_store::path::Path;
use snafu::location;
use std::collections::BinaryHeap;
use std::fmt::Debug;
use std::future::Future;
use std::ops::Range;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Instant;
use tokio::sync::{Notify, Semaphore, SemaphorePermit};

use lance_core::{Error, Result};

use crate::object_store::ObjectStore;
use crate::traits::Reader;

const BACKPRESSURE_MIN: u64 = 5;
const BACKPRESSURE_DEBOUNCE: u64 = 60;

static IOPS_COUNTER: AtomicU64 = AtomicU64::new(0);
static BYTES_READ_COUNTER: AtomicU64 = AtomicU64::new(0);

pub fn iops_counter() -> u64 {
    IOPS_COUNTER.load(Ordering::Acquire)
}

pub fn bytes_read_counter() -> u64 {
    BYTES_READ_COUNTER.load(Ordering::Acquire)
}

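/// An optional, process-wide cap on the number of concurrent I/O operations.
///
/// The cap is read from the `LANCE_PROCESS_IO_THREADS_LIMIT` environment
/// variable; when the variable is unset there is no limit.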
struct IopsQuota {
    iops_avail: Option<Semaphore>,
}

struct IopsReservation<'a> {
    value: Option<SemaphorePermit<'a>>,
}

impl IopsReservation<'_> {
    fn forget(&mut self) {
        if let Some(value) = self.value.take() {
            value.forget();
        }
    }
}

impl IopsQuota {
    fn new() -> Self {
        let initial_capacity = std::env::var("LANCE_PROCESS_IO_THREADS_LIMIT")
            .map(|s| {
                let limit = s
                    .parse::<i32>()
                    .expect("LANCE_PROCESS_IO_THREADS_LIMIT must be a positive integer");
                if limit <= 0 {
                    panic!("LANCE_PROCESS_IO_THREADS_LIMIT must be a positive integer. To disable the limit, unset the environment variable");
                }
                limit
            })
            .unwrap_or(-1);
        let iops_avail = if initial_capacity < 0 {
            None
        } else {
            Some(Semaphore::new(initial_capacity as usize))
        };
        Self { iops_avail }
    }

    fn release(&self) {
        if let Some(iops_avail) = self.iops_avail.as_ref() {
            iops_avail.add_permits(1);
        }
    }

    async fn acquire(&self) -> IopsReservation {
        if let Some(iops_avail) = self.iops_avail.as_ref() {
            IopsReservation {
                value: Some(iops_avail.acquire().await.unwrap()),
            }
        } else {
            IopsReservation { value: None }
        }
    }
}

lazy_static::lazy_static! {
    static ref IOPS_QUOTA: IopsQuota = IopsQuota::new();
}

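/// A sorted list of the priorities of requests that are currently in flight.
///
/// The smallest in-flight priority is used by the queue to decide whether a
/// pending request may bypass the backpressure byte limit.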
struct PrioritiesInFlight {
    in_flight: Vec<u128>,
}

impl PrioritiesInFlight {
    fn new(capacity: u32) -> Self {
        Self {
            in_flight: Vec::with_capacity(capacity as usize * 2),
        }
    }

    fn min_in_flight(&self) -> u128 {
        self.in_flight.first().copied().unwrap_or(u128::MAX)
    }

    fn push(&mut self, prio: u128) {
        let pos = match self.in_flight.binary_search(&prio) {
            Ok(pos) => pos,
            Err(pos) => pos,
        };
        self.in_flight.insert(pos, prio);
    }

    fn remove(&mut self, prio: u128) {
        if let Ok(pos) = self.in_flight.binary_search(&prio) {
            self.in_flight.remove(pos);
        } else {
            unreachable!();
        }
    }
}

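/// Shared state for the I/O queue: remaining IOPS slots, remaining buffer
/// bytes (the backpressure budget), and the heap of pending requests.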
struct IoQueueState {
    iops_avail: u32,
    bytes_avail: i64,
    pending_requests: BinaryHeap<IoTask>,
    priorities_in_flight: PrioritiesInFlight,
    done_scheduling: bool,
    start: Instant,
    last_warn: AtomicU64,
}

impl IoQueueState {
    fn new(io_capacity: u32, io_buffer_size: u64) -> Self {
        Self {
            iops_avail: io_capacity,
            bytes_avail: io_buffer_size as i64,
            pending_requests: BinaryHeap::new(),
            priorities_in_flight: PrioritiesInFlight::new(io_capacity),
            done_scheduling: false,
            start: Instant::now(),
            last_warn: AtomicU64::from(0),
        }
    }

    fn finished(&self) -> bool {
        self.done_scheduling && self.pending_requests.is_empty()
    }

    fn warn_if_needed(&self) {
        let seconds_elapsed = self.start.elapsed().as_secs();
        let last_warn = self.last_warn.load(Ordering::Acquire);
        let since_last_warn = seconds_elapsed - last_warn;
        if (last_warn == 0
            && seconds_elapsed > BACKPRESSURE_MIN
            && seconds_elapsed < BACKPRESSURE_DEBOUNCE)
            || since_last_warn > BACKPRESSURE_DEBOUNCE
        {
            tracing::event!(tracing::Level::DEBUG, "Backpressure throttle exceeded");
            log::debug!("Backpressure throttle is full, I/O will pause until buffer is drained. Max I/O bandwidth will not be achieved because CPU is falling behind");
            self.last_warn
                .store(seconds_elapsed.max(1), Ordering::Release);
        }
    }

    fn can_deliver(&self, task: &IoTask) -> bool {
        if self.iops_avail == 0 {
            false
        } else if task.priority <= self.priorities_in_flight.min_in_flight() {
            true
        } else if task.num_bytes() as i64 > self.bytes_avail {
            self.warn_if_needed();
            false
        } else {
            true
        }
    }

    fn next_task(&mut self) -> Option<IoTask> {
        let task = self.pending_requests.peek()?;
        if self.can_deliver(task) {
            self.priorities_in_flight.push(task.priority);
            self.iops_avail -= 1;
            self.bytes_avail -= task.num_bytes() as i64;
            if self.bytes_avail < 0 {
                log::debug!(
                    "Backpressure throttle temporarily exceeded by {} bytes due to priority I/O",
                    -self.bytes_avail
                );
            }
            Some(self.pending_requests.pop().unwrap())
        } else {
            None
        }
    }
}

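/// The prioritized I/O queue.  Producers push tasks and the I/O loop pops
/// them; a `Notify` wakes the consumer whenever new work arrives or capacity
/// (an IOPS slot or buffer bytes) is returned.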
struct IoQueue {
    state: Mutex<IoQueueState>,
    notify: Notify,
}

impl IoQueue {
    fn new(io_capacity: u32, io_buffer_size: u64) -> Self {
        Self {
            state: Mutex::new(IoQueueState::new(io_capacity, io_buffer_size)),
            notify: Notify::new(),
        }
    }

    fn push(&self, task: IoTask) {
        log::trace!(
            "Inserting I/O request for {} bytes with priority ({},{}) into I/O queue",
            task.num_bytes(),
            task.priority >> 64,
            task.priority & 0xFFFFFFFFFFFFFFFF
        );
        let mut state = self.state.lock().unwrap();
        state.pending_requests.push(task);
        drop(state);

        self.notify.notify_one();
    }

    async fn pop(&self) -> Option<IoTask> {
        loop {
            {
                let mut iop_res = IOPS_QUOTA.acquire().await;
                let mut state = self.state.lock().unwrap();
                if let Some(task) = state.next_task() {
                    iop_res.forget();
                    return Some(task);
                }

                if state.finished() {
                    return None;
                }
            }

            self.notify.notified().await;
        }
    }

    fn on_iop_complete(&self) {
        let mut state = self.state.lock().unwrap();
        state.iops_avail += 1;
        drop(state);

        self.notify.notify_one();
    }

    fn on_bytes_consumed(&self, bytes: u64, priority: u128, num_reqs: usize) {
        let mut state = self.state.lock().unwrap();
        state.bytes_avail += bytes as i64;
        for _ in 0..num_reqs {
            state.priorities_in_flight.remove(priority);
        }
        drop(state);

        self.notify.notify_one();
    }

    fn close(&self) {
        let mut state = self.state.lock().unwrap();
        state.done_scheduling = true;
        drop(state);

        self.notify.notify_one();
    }
}

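/// Accumulates the results of one batch of reads.  Each finished task delivers
/// its bytes (or error) here; when the last reference is dropped the collected
/// buffers, or the first error, are handed to the `when_done` callback.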
struct MutableBatch<F: FnOnce(Response) + Send> {
    when_done: Option<F>,
    data_buffers: Vec<Bytes>,
    num_bytes: u64,
    priority: u128,
    num_reqs: usize,
    err: Option<Box<dyn std::error::Error + Send + Sync + 'static>>,
}

impl<F: FnOnce(Response) + Send> MutableBatch<F> {
    fn new(when_done: F, num_data_buffers: u32, priority: u128, num_reqs: usize) -> Self {
        Self {
            when_done: Some(when_done),
            data_buffers: vec![Bytes::default(); num_data_buffers as usize],
            num_bytes: 0,
            priority,
            num_reqs,
            err: None,
        }
    }
}

impl<F: FnOnce(Response) + Send> Drop for MutableBatch<F> {
    fn drop(&mut self) {
        let result = if self.err.is_some() {
            Err(Error::Wrapped {
                error: self.err.take().unwrap(),
                location: location!(),
            })
        } else {
            let mut data = Vec::new();
            std::mem::swap(&mut data, &mut self.data_buffers);
            Ok(data)
        };
        let response = Response {
            data: result,
            num_bytes: self.num_bytes,
            priority: self.priority,
            num_reqs: self.num_reqs,
        };
        (self.when_done.take().unwrap())(response);
    }
}

struct DataChunk {
    task_idx: usize,
    num_bytes: u64,
    data: Result<Bytes>,
}

trait DataSink: Send {
    fn deliver_data(&mut self, data: DataChunk);
}

impl<F: FnOnce(Response) + Send> DataSink for MutableBatch<F> {
    fn deliver_data(&mut self, data: DataChunk) {
        self.num_bytes += data.num_bytes;
        match data.data {
            Ok(data_bytes) => {
                self.data_buffers[data.task_idx] = data_bytes;
            }
            Err(err) => {
                self.err.get_or_insert(Box::new(err));
            }
        }
    }
}

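/// A single range read against one reader.  The ordering is reversed on
/// priority so that `BinaryHeap` (a max-heap) pops the numerically smallest
/// priority, i.e. the most urgent request, first.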
struct IoTask {
    reader: Arc<dyn Reader>,
    to_read: Range<u64>,
    when_done: Box<dyn FnOnce(Result<Bytes>) + Send>,
    priority: u128,
}

impl Eq for IoTask {}

impl PartialEq for IoTask {
    fn eq(&self, other: &Self) -> bool {
        self.priority == other.priority
    }
}

impl PartialOrd for IoTask {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for IoTask {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        other.priority.cmp(&self.priority)
    }
}

impl IoTask {
    fn num_bytes(&self) -> u64 {
        self.to_read.end - self.to_read.start
    }

    async fn run(self) {
        let bytes = if self.to_read.start == self.to_read.end {
            Ok(Bytes::new())
        } else {
            let bytes_fut = self
                .reader
                .get_range(self.to_read.start as usize..self.to_read.end as usize);
            IOPS_COUNTER.fetch_add(1, Ordering::Release);
            BYTES_READ_COUNTER.fetch_add(self.num_bytes(), Ordering::Release);
            bytes_fut.await.map_err(Error::from)
        };
        IOPS_QUOTA.release();
        (self.when_done)(bytes);
    }
}

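/// Background loop that drains the queue, spawning each scheduled task onto
/// the tokio runtime, until the queue is closed and empty.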
async fn run_io_loop(tasks: Arc<IoQueue>) {
    loop {
        let next_task = tasks.pop().await;
        match next_task {
            Some(task) => {
                tokio::spawn(task.run());
            }
            None => {
                return;
            }
        }
    }
}

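/// Per-scheduler counters, reported through [`ScanScheduler::stats`].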
#[derive(Debug)]
struct StatsCollector {
    iops: AtomicU64,
    requests: AtomicU64,
    bytes_read: AtomicU64,
}

impl StatsCollector {
    fn new() -> Self {
        Self {
            iops: AtomicU64::new(0),
            requests: AtomicU64::new(0),
            bytes_read: AtomicU64::new(0),
        }
    }

    fn iops(&self) -> u64 {
        self.iops.load(Ordering::Relaxed)
    }

    fn bytes_read(&self) -> u64 {
        self.bytes_read.load(Ordering::Relaxed)
    }

    fn requests(&self) -> u64 {
        self.requests.load(Ordering::Relaxed)
    }

    fn record_request(&self, request: &[Range<u64>]) {
        self.requests.fetch_add(1, Ordering::Relaxed);
        self.iops.fetch_add(request.len() as u64, Ordering::Relaxed);
        self.bytes_read.fetch_add(
            request.iter().map(|r| r.end - r.start).sum::<u64>(),
            Ordering::Relaxed,
        );
    }
}

pub struct ScanStats {
    pub iops: u64,
    pub requests: u64,
    pub bytes_read: u64,
}

impl ScanStats {
    fn new(stats: &StatsCollector) -> Self {
        Self {
            iops: stats.iops(),
            requests: stats.requests(),
            bytes_read: stats.bytes_read(),
        }
    }
}

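/// Top-level scheduler that funnels reads from many files through one
/// prioritized I/O queue with IOPS and backpressure limits.
///
/// Illustrative usage sketch (path and sizes are arbitrary):
///
/// ```ignore
/// let scheduler = ScanScheduler::new(object_store, SchedulerConfig::default_for_testing());
/// let file = scheduler.open_file(&Path::parse("some/file.lance")?).await?;
/// let bytes = file.submit_single(0..4096, 0).await?;
/// ```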
pub struct ScanScheduler {
    object_store: Arc<ObjectStore>,
    io_queue: Arc<IoQueue>,
    stats: Arc<StatsCollector>,
}

impl Debug for ScanScheduler {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ScanScheduler")
            .field("object_store", &self.object_store)
            .finish()
    }
}

struct Response {
    data: Result<Vec<Bytes>>,
    priority: u128,
    num_reqs: usize,
    num_bytes: u64,
}

#[derive(Debug, Clone, Copy)]
pub struct SchedulerConfig {
    pub io_buffer_size_bytes: u64,
}

impl SchedulerConfig {
    pub fn default_for_testing() -> Self {
        Self {
            io_buffer_size_bytes: 256 * 1024 * 1024,
        }
    }

    pub fn max_bandwidth(store: &ObjectStore) -> Self {
        Self {
            io_buffer_size_bytes: 32 * 1024 * 1024 * store.io_parallelism() as u64,
        }
    }
}

impl ScanScheduler {
    pub fn new(object_store: Arc<ObjectStore>, config: SchedulerConfig) -> Arc<Self> {
        let io_capacity = object_store.io_parallelism();
        let io_queue = Arc::new(IoQueue::new(
            io_capacity as u32,
            config.io_buffer_size_bytes,
        ));
        let scheduler = Self {
            object_store,
            io_queue: io_queue.clone(),
            stats: Arc::new(StatsCollector::new()),
        };
        tokio::task::spawn(async move { run_io_loop(io_queue).await });
        Arc::new(scheduler)
    }

    pub async fn open_file_with_priority(
        self: &Arc<Self>,
        path: &Path,
        base_priority: u64,
    ) -> Result<FileScheduler> {
        let reader = self.object_store.open(path).await?;
        let block_size = self.object_store.block_size() as u64;
        Ok(FileScheduler {
            reader: reader.into(),
            block_size,
            root: self.clone(),
            base_priority,
        })
    }

    pub async fn open_file(self: &Arc<Self>, path: &Path) -> Result<FileScheduler> {
        self.open_file_with_priority(path, 0).await
    }

    fn do_submit_request(
        &self,
        reader: Arc<dyn Reader>,
        request: Vec<Range<u64>>,
        tx: oneshot::Sender<Response>,
        priority: u128,
    ) {
        let num_iops = request.len() as u32;

        let when_all_io_done = move |bytes_and_permits| {
            let _ = tx.send(bytes_and_permits);
        };

        let dest = Arc::new(Mutex::new(Box::new(MutableBatch::new(
            when_all_io_done,
            num_iops,
            priority,
            request.len(),
        ))));

        for (task_idx, iop) in request.into_iter().enumerate() {
            let dest = dest.clone();
            let io_queue = self.io_queue.clone();
            let num_bytes = iop.end - iop.start;
            let task = IoTask {
                reader: reader.clone(),
                to_read: iop,
                priority,
                when_done: Box::new(move |data| {
                    io_queue.on_iop_complete();
                    let mut dest = dest.lock().unwrap();
                    let chunk = DataChunk {
                        data,
                        task_idx,
                        num_bytes,
                    };
                    dest.deliver_data(chunk);
                }),
            };
            self.io_queue.push(task);
        }
    }

    fn submit_request(
        &self,
        reader: Arc<dyn Reader>,
        request: Vec<Range<u64>>,
        priority: u128,
    ) -> impl Future<Output = Result<Vec<Bytes>>> + Send {
        let (tx, rx) = oneshot::channel::<Response>();

        self.do_submit_request(reader, request, tx, priority);

        let io_queue = self.io_queue.clone();

        rx.map(move |wrapped_rsp| {
            let rsp = wrapped_rsp.unwrap();
            io_queue.on_bytes_consumed(rsp.num_bytes, rsp.priority, rsp.num_reqs);
            rsp.data
        })
    }

    pub fn stats(&self) -> ScanStats {
        ScanStats::new(self.stats.as_ref())
    }
}

impl Drop for ScanScheduler {
    fn drop(&mut self) {
        self.io_queue.close();
    }
}

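/// A handle for scheduling reads against a single opened file.  Nearby ranges
/// in one request are coalesced before being sent to the shared queue, and the
/// results are sliced back to the originally requested ranges.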
#[derive(Clone, Debug)]
pub struct FileScheduler {
    reader: Arc<dyn Reader>,
    root: Arc<ScanScheduler>,
    block_size: u64,
    base_priority: u64,
}

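/// Returns true when the gap between `range1` and `range2` is at most
/// `block_size` bytes, in which case the two reads are coalesced into one.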
fn is_close_together(range1: &Range<u64>, range2: &Range<u64>, block_size: u64) -> bool {
    range2.start <= (range1.end + block_size)
}

fn is_overlapping(range1: &Range<u64>, range2: &Range<u64>) -> bool {
    range1.start < range2.end && range2.start < range1.end
}

impl FileScheduler {
    pub fn submit_request(
        &self,
        request: Vec<Range<u64>>,
        priority: u64,
    ) -> impl Future<Output = Result<Vec<Bytes>>> + Send {
        self.root.stats.record_request(&request);

        let priority = ((self.base_priority as u128) << 64) + priority as u128;

        let mut updated_requests = Vec::with_capacity(request.len());

        if !request.is_empty() {
            let mut curr_interval = request[0].clone();

            for req in request.iter().skip(1) {
                if is_close_together(&curr_interval, req, self.block_size) {
                    curr_interval.end = curr_interval.end.max(req.end);
                } else {
                    updated_requests.push(curr_interval);
                    curr_interval = req.clone();
                }
            }

            updated_requests.push(curr_interval);
        }

        let bytes_vec_fut =
            self.root
                .submit_request(self.reader.clone(), updated_requests.clone(), priority);

        let mut updated_index = 0;
        let mut final_bytes = Vec::with_capacity(request.len());

        async move {
            let bytes_vec = bytes_vec_fut.await?;

            let mut orig_index = 0;
            while (updated_index < updated_requests.len()) && (orig_index < request.len()) {
                let updated_range = &updated_requests[updated_index];
                let orig_range = &request[orig_index];
                let byte_offset = updated_range.start as usize;

                if is_overlapping(updated_range, orig_range) {
                    let start = orig_range.start as usize - byte_offset;
                    let end = orig_range.end as usize - byte_offset;

                    let sliced_range = bytes_vec[updated_index].slice(start..end);
                    final_bytes.push(sliced_range);
                    orig_index += 1;
                } else {
                    updated_index += 1;
                }
            }

            Ok(final_bytes)
        }
    }

    pub fn with_priority(&self, priority: u64) -> Self {
        Self {
            reader: self.reader.clone(),
            root: self.root.clone(),
            block_size: self.block_size,
            base_priority: priority,
        }
    }

    pub fn submit_single(
        &self,
        range: Range<u64>,
        priority: u64,
    ) -> impl Future<Output = Result<Bytes>> + Send {
        self.submit_request(vec![range], priority)
            .map_ok(|vec_bytes| vec_bytes.into_iter().next().unwrap())
    }

    pub fn reader(&self) -> &Arc<dyn Reader> {
        &self.reader
    }
}

#[cfg(test)]
mod tests {
    use std::{collections::VecDeque, time::Duration};

    use futures::poll;
    use rand::RngCore;
    use tempfile::tempdir;

    use object_store::{memory::InMemory, GetRange, ObjectStore as OSObjectStore};
    use tokio::{runtime::Handle, time::timeout};
    use url::Url;

    use crate::{object_store::DEFAULT_DOWNLOAD_RETRY_COUNT, testing::MockObjectStore};

    use super::*;

    #[tokio::test]
    async fn test_full_seq_read() {
        let tmpdir = tempdir().unwrap();
        let tmp_path = tmpdir.path().to_str().unwrap();
        let tmp_path = Path::parse(tmp_path).unwrap();
        let tmp_file = tmp_path.child("foo.file");

        let obj_store = Arc::new(ObjectStore::local());

        const DATA_SIZE: u64 = 1024 * 1024;
        let mut some_data = vec![0; DATA_SIZE as usize];
        rand::thread_rng().fill_bytes(&mut some_data);
        obj_store.put(&tmp_file, &some_data).await.unwrap();

        let config = SchedulerConfig::default_for_testing();

        let scheduler = ScanScheduler::new(obj_store, config);

        let file_scheduler = scheduler.open_file(&tmp_file).await.unwrap();

        const READ_SIZE: u64 = 4 * 1024;
        let mut reqs = VecDeque::new();
        let mut offset = 0;
        while offset < DATA_SIZE {
            reqs.push_back(
                #[allow(clippy::single_range_in_vec_init)]
                file_scheduler
                    .submit_request(vec![offset..offset + READ_SIZE], 0)
                    .await
                    .unwrap(),
            );
            offset += READ_SIZE;
        }

        offset = 0;
        while offset < DATA_SIZE {
            let data = reqs.pop_front().unwrap();
            let actual = &data[0];
            let expected = &some_data[offset as usize..(offset + READ_SIZE) as usize];
            assert_eq!(expected, actual);
            offset += READ_SIZE;
        }
    }

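    // Gate each GET behind a zero-permit semaphore so reads complete one at a
    // time, then verify the priority-100 request finishes after the
    // later-submitted priority-0 request.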
    #[tokio::test]
    async fn test_priority() {
        let some_path = Path::parse("foo").unwrap();
        let base_store = Arc::new(InMemory::new());
        base_store
            .put(&some_path, vec![0; 1000].into())
            .await
            .unwrap();

        let semaphore = Arc::new(tokio::sync::Semaphore::new(0));
        let mut obj_store = MockObjectStore::default();
        let semaphore_copy = semaphore.clone();
        obj_store
            .expect_get_opts()
            .returning(move |location, options| {
                let semaphore = semaphore.clone();
                let base_store = base_store.clone();
                let location = location.clone();
                async move {
                    semaphore.acquire().await.unwrap().forget();
                    base_store.get_opts(&location, options).await
                }
                .boxed()
            });
        let obj_store = Arc::new(ObjectStore::new(
            Arc::new(obj_store),
            Url::parse("mem://").unwrap(),
            None,
            None,
            false,
            false,
            1,
            DEFAULT_DOWNLOAD_RETRY_COUNT,
        ));

        let config = SchedulerConfig {
            io_buffer_size_bytes: 1024 * 1024,
        };

        let scan_scheduler = ScanScheduler::new(obj_store, config);

        let file_scheduler = scan_scheduler
            .open_file(&Path::parse("foo").unwrap())
            .await
            .unwrap();

        let first_fut = timeout(
            Duration::from_secs(10),
            file_scheduler.submit_single(0..10, 0),
        )
        .boxed();

        let mut second_fut = timeout(
            Duration::from_secs(10),
            file_scheduler.submit_single(0..20, 100),
        )
        .boxed();

        let mut third_fut = timeout(
            Duration::from_secs(10),
            file_scheduler.submit_single(0..30, 0),
        )
        .boxed();

        semaphore_copy.add_permits(1);
        assert!(first_fut.await.unwrap().unwrap().len() == 10);
        assert!(poll!(&mut second_fut).is_pending());
        assert!(poll!(&mut third_fut).is_pending());

        semaphore_copy.add_permits(1);
        assert!(third_fut.await.unwrap().unwrap().len() == 30);
        assert!(poll!(&mut second_fut).is_pending());

        semaphore_copy.add_permits(1);
        assert!(second_fut.await.unwrap().unwrap().len() == 20);
    }

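    // With a 10-byte I/O buffer only about 10 bytes may be in flight at once;
    // further reads should stall until earlier results are awaited, which
    // returns their bytes to the buffer.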
    #[tokio::test(flavor = "multi_thread")]
    async fn test_backpressure() {
        let some_path = Path::parse("foo").unwrap();
        let base_store = Arc::new(InMemory::new());
        base_store
            .put(&some_path, vec![0; 100000].into())
            .await
            .unwrap();

        let bytes_read = Arc::new(AtomicU64::from(0));
        let mut obj_store = MockObjectStore::default();
        let bytes_read_copy = bytes_read.clone();
        obj_store
            .expect_get_opts()
            .returning(move |location, options| {
                let range = options.range.as_ref().unwrap();
                let num_bytes = match range {
                    GetRange::Bounded(bounded) => bounded.end - bounded.start,
                    _ => panic!(),
                };
                bytes_read_copy.fetch_add(num_bytes as u64, Ordering::Release);
                let location = location.clone();
                let base_store = base_store.clone();
                async move { base_store.get_opts(&location, options).await }.boxed()
            });
        let obj_store = Arc::new(ObjectStore::new(
            Arc::new(obj_store),
            Url::parse("mem://").unwrap(),
            None,
            None,
            false,
            false,
            1,
            DEFAULT_DOWNLOAD_RETRY_COUNT,
        ));

        let config = SchedulerConfig {
            io_buffer_size_bytes: 10,
        };

        let scan_scheduler = ScanScheduler::new(obj_store.clone(), config);

        let file_scheduler = scan_scheduler
            .open_file(&Path::parse("foo").unwrap())
            .await
            .unwrap();

        let wait_for_idle = || async move {
            let handle = Handle::current();
            while handle.metrics().num_alive_tasks() != 1 {
                tokio::time::sleep(Duration::from_millis(10)).await;
            }
        };
        let wait_for_bytes_read_and_idle = |target_bytes: u64| {
            let bytes_read = &bytes_read;
            async move {
                let bytes_read_copy = bytes_read.clone();
                while bytes_read_copy.load(Ordering::Acquire) < target_bytes {
                    tokio::time::sleep(Duration::from_millis(10)).await;
                }
                wait_for_idle().await;
            }
        };

        let first_fut = file_scheduler.submit_single(0..5, 0);
        let second_fut = file_scheduler.submit_single(0..5, 0);
        let third_fut = file_scheduler.submit_single(0..3, 0);
        wait_for_bytes_read_and_idle(10).await;

        assert_eq!(first_fut.await.unwrap().len(), 5);
        wait_for_bytes_read_and_idle(13).await;

        let fourth_fut = file_scheduler.submit_single(0..5, 0);
        wait_for_bytes_read_and_idle(13).await;

        assert_eq!(third_fut.await.unwrap().len(), 3);
        wait_for_bytes_read_and_idle(18).await;

        assert_eq!(second_fut.await.unwrap().len(), 5);
        let fifth_fut = file_scheduler.submit_request(vec![0..3, 90000..90007], 0);
        wait_for_bytes_read_and_idle(21).await;

        let fifth_bytes = tokio::time::timeout(Duration::from_secs(10), fifth_fut)
            .await
            .unwrap();
        assert_eq!(
            fifth_bytes.unwrap().iter().map(|b| b.len()).sum::<usize>(),
            10
        );

        assert_eq!(fourth_fut.await.unwrap().len(), 5);
        wait_for_bytes_read_and_idle(28).await;

        let config = SchedulerConfig {
            io_buffer_size_bytes: 10,
        };

        let scan_scheduler = ScanScheduler::new(obj_store, config);
        let file_scheduler = scan_scheduler
            .open_file(&Path::parse("foo").unwrap())
            .await
            .unwrap();

        let first_fut = file_scheduler.submit_single(0..10, 0);
        let second_fut = file_scheduler.submit_single(0..10, 0);

        std::thread::sleep(Duration::from_millis(100));
        assert_eq!(first_fut.await.unwrap().len(), 10);
        assert_eq!(second_fut.await.unwrap().len(), 10);
    }

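    // Submit 10,000 single-byte reads with strictly increasing priority against
    // a 1-byte buffer and await them in order; the test simply verifies that
    // every read completes.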
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn stress_backpressure() {
        let some_path = Path::parse("foo").unwrap();
        let obj_store = Arc::new(ObjectStore::memory());
        obj_store
            .put(&some_path, vec![0; 100000].as_slice())
            .await
            .unwrap();

        let config = SchedulerConfig {
            io_buffer_size_bytes: 1,
        };
        let scan_scheduler = ScanScheduler::new(obj_store.clone(), config);
        let file_scheduler = scan_scheduler.open_file(&some_path).await.unwrap();

        let mut futs = Vec::with_capacity(10000);
        for idx in 0..10000 {
            futs.push(file_scheduler.submit_single(idx..idx + 1, idx));
        }

        for fut in futs {
            fut.await.unwrap();
        }
    }
}