use bytes::Bytes;
use futures::channel::oneshot;
use futures::{FutureExt, TryFutureExt};
use object_store::path::Path;
use snafu::location;
use std::collections::BinaryHeap;
use std::fmt::Debug;
use std::future::Future;
use std::num::NonZero;
use std::ops::Range;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Instant;
use tokio::sync::{Notify, Semaphore, SemaphorePermit};

use lance_core::{Error, Result};

use crate::object_store::ObjectStore;
use crate::traits::Reader;
use crate::utils::CachedFileSize;

// Don't warn about backpressure until the scheduler has been running at least this long (seconds)
const BACKPRESSURE_MIN: u64 = 5;
// Once warned, don't warn about backpressure again for this long (seconds)
const BACKPRESSURE_DEBOUNCE: u64 = 60;

// Process-wide counters reported by iops_counter() and bytes_read_counter()
static IOPS_COUNTER: AtomicU64 = AtomicU64::new(0);
static BYTES_READ_COUNTER: AtomicU64 = AtomicU64::new(0);
// Default process-wide cap on concurrent I/O operations; can be overridden
// with the LANCE_PROCESS_IO_THREADS_LIMIT environment variable
static DEFAULT_PROCESS_IOPS_LIMIT: i32 = 128;

/// Returns the number of read I/O operations performed by this process
pub fn iops_counter() -> u64 {
    IOPS_COUNTER.load(Ordering::Acquire)
}

/// Returns the number of bytes read by this process
pub fn bytes_read_counter() -> u64 {
    BYTES_READ_COUNTER.load(Ordering::Acquire)
}

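// A process-wide quota on the number of concurrent I/O operations, configured by
// the LANCE_PROCESS_IO_THREADS_LIMIT environment variable (a value of zero or
// less disables the quota).  This is enforced on top of each scheduler's own
// I/O capacity.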
struct IopsQuota {
    // A semaphore limiting concurrent I/O operations; None if the quota is disabled
    iops_avail: Option<Semaphore>,
}

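// An acquired slot in the process-wide I/O quota.  Dropping the reservation
// returns the permit unless `forget` was called.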
struct IopsReservation<'a> {
    value: Option<SemaphorePermit<'a>>,
}

impl IopsReservation<'_> {
    // Forget the reservation, so the permit is not returned on drop
    fn forget(&mut self) {
        if let Some(value) = self.value.take() {
            value.forget();
        }
    }
}

impl IopsQuota {
    // By default, the quota allows DEFAULT_PROCESS_IOPS_LIMIT concurrent operations.
    // The user can override this with the LANCE_PROCESS_IO_THREADS_LIMIT environment
    // variable; a value of zero or less disables the quota entirely.
    fn new() -> Self {
        let initial_capacity = std::env::var("LANCE_PROCESS_IO_THREADS_LIMIT")
            .map(|s| {
                s.parse::<i32>().unwrap_or_else(|_| {
                    log::warn!("Ignoring invalid LANCE_PROCESS_IO_THREADS_LIMIT: {}", s);
                    DEFAULT_PROCESS_IOPS_LIMIT
                })
            })
            .unwrap_or(DEFAULT_PROCESS_IOPS_LIMIT);
        let iops_avail = if initial_capacity <= 0 {
            None
        } else {
            Some(Semaphore::new(initial_capacity as usize))
        };
        Self { iops_avail }
    }

    // Return a reservation to the quota
    fn release(&self) {
        if let Some(iops_avail) = self.iops_avail.as_ref() {
            iops_avail.add_permits(1);
        }
    }

    // Acquire a reservation from the quota (waits if the quota is exhausted)
    async fn acquire(&self) -> IopsReservation {
        if let Some(iops_avail) = self.iops_avail.as_ref() {
            IopsReservation {
                value: Some(iops_avail.acquire().await.unwrap()),
            }
        } else {
            IopsReservation { value: None }
        }
    }
}

lazy_static::lazy_static! {
    static ref IOPS_QUOTA: IopsQuota = IopsQuota::new();
}

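// A sorted list of the priorities of all I/O requests currently in flight.  The
// scheduler uses the minimum to let the most urgent request bypass backpressure.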
struct PrioritiesInFlight {
    in_flight: Vec<u128>,
}

impl PrioritiesInFlight {
    fn new(capacity: u32) -> Self {
        Self {
            in_flight: Vec::with_capacity(capacity as usize * 2),
        }
    }

    fn min_in_flight(&self) -> u128 {
        self.in_flight.first().copied().unwrap_or(u128::MAX)
    }

    fn push(&mut self, prio: u128) {
        let pos = match self.in_flight.binary_search(&prio) {
            Ok(pos) => pos,
            Err(pos) => pos,
        };
        self.in_flight.insert(pos, prio);
    }

    fn remove(&mut self, prio: u128) {
        if let Ok(pos) = self.in_flight.binary_search(&prio) {
            self.in_flight.remove(pos);
        } else {
            unreachable!();
        }
    }
}

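// Shared state of the I/O queue: the pending requests, the remaining IOPS and
// byte budgets, and bookkeeping for backpressure warnings.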
struct IoQueueState {
    // Number of I/O operations we can issue concurrently before pausing I/O
    iops_avail: u32,
    // Number of bytes we are allowed to buffer before pausing I/O (can go
    // negative when priority requests punch through the throttle)
    bytes_avail: i64,
    // Pending I/O requests, most urgent first
    pending_requests: BinaryHeap<IoTask>,
    // Priorities of the requests that are currently in flight
    priorities_in_flight: PrioritiesInFlight,
    // Set to true when the scheduler is dropped and no more requests are coming
    done_scheduling: bool,
    // Time the queue was created, used to debounce backpressure warnings
    start: Instant,
    // Seconds (since start) at which we last warned about backpressure
    last_warn: AtomicU64,
}

impl IoQueueState {
    fn new(io_capacity: u32, io_buffer_size: u64) -> Self {
        Self {
            iops_avail: io_capacity,
            bytes_avail: io_buffer_size as i64,
            pending_requests: BinaryHeap::new(),
            priorities_in_flight: PrioritiesInFlight::new(io_capacity),
            done_scheduling: false,
            start: Instant::now(),
            last_warn: AtomicU64::from(0),
        }
    }

    fn finished(&self) -> bool {
        self.done_scheduling && self.pending_requests.is_empty()
    }

    fn warn_if_needed(&self) {
        let seconds_elapsed = self.start.elapsed().as_secs();
        let last_warn = self.last_warn.load(Ordering::Acquire);
        let since_last_warn = seconds_elapsed - last_warn;
        if (last_warn == 0
            && seconds_elapsed > BACKPRESSURE_MIN
            && seconds_elapsed < BACKPRESSURE_DEBOUNCE)
            || since_last_warn > BACKPRESSURE_DEBOUNCE
        {
            tracing::event!(tracing::Level::DEBUG, "Backpressure throttle exceeded");
            log::debug!("Backpressure throttle is full, I/O will pause until buffer is drained. Max I/O bandwidth will not be achieved because CPU is falling behind");
            self.last_warn
                .store(seconds_elapsed.max(1), Ordering::Release);
        }
    }

    fn can_deliver(&self, task: &IoTask) -> bool {
        if self.iops_avail == 0 {
            false
        } else if task.priority <= self.priorities_in_flight.min_in_flight() {
            // Even if the backpressure throttle is full, the most urgent request is
            // always allowed through; otherwise the consumer could deadlock waiting
            // for data that will never be scheduled
            true
        } else if task.num_bytes() as i64 > self.bytes_avail {
            self.warn_if_needed();
            false
        } else {
            true
        }
    }

    fn next_task(&mut self) -> Option<IoTask> {
        let task = self.pending_requests.peek()?;
        if self.can_deliver(task) {
            self.priorities_in_flight.push(task.priority);
            self.iops_avail -= 1;
            self.bytes_avail -= task.num_bytes() as i64;
            if self.bytes_avail < 0 {
                log::debug!(
                    "Backpressure throttle temporarily exceeded by {} bytes due to priority I/O",
                    -self.bytes_avail
                );
            }
            Some(self.pending_requests.pop().unwrap())
        } else {
            None
        }
    }
}

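// A priority queue of pending I/O requests, plus a Notify used to wake the I/O
// loop when a request is added or capacity is returned.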
struct IoQueue {
    // The queue state (pending requests, priorities, backpressure budget)
    state: Mutex<IoQueueState>,
    // Signals that there is a new request or newly available capacity
    notify: Notify,
}

impl IoQueue {
    fn new(io_capacity: u32, io_buffer_size: u64) -> Self {
        Self {
            state: Mutex::new(IoQueueState::new(io_capacity, io_buffer_size)),
            notify: Notify::new(),
        }
    }

    fn push(&self, task: IoTask) {
        log::trace!(
            "Inserting I/O request for {} bytes with priority ({},{}) into I/O queue",
            task.num_bytes(),
            task.priority >> 64,
            task.priority & 0xFFFFFFFFFFFFFFFF
        );
        let mut state = self.state.lock().unwrap();
        state.pending_requests.push(task);
        drop(state);

        self.notify.notify_one();
    }

    async fn pop(&self) -> Option<IoTask> {
        loop {
            {
                // First, check we are allowed to start a new I/O operation by the
                // process-wide quota
                let mut iop_res = IOPS_QUOTA.acquire().await;
                let mut state = self.state.lock().unwrap();
                if let Some(task) = state.next_task() {
                    // Hold on to the quota permit until the I/O completes (it is
                    // returned by IOPS_QUOTA.release() in IoTask::run)
                    iop_res.forget();
                    return Some(task);
                }

                if state.finished() {
                    return None;
                }
            }

            self.notify.notified().await;
        }
    }

    fn on_iop_complete(&self) {
        let mut state = self.state.lock().unwrap();
        state.iops_avail += 1;
        drop(state);

        self.notify.notify_one();
    }

    fn on_bytes_consumed(&self, bytes: u64, priority: u128, num_reqs: usize) {
        let mut state = self.state.lock().unwrap();
        state.bytes_avail += bytes as i64;
        for _ in 0..num_reqs {
            state.priorities_in_flight.remove(priority);
        }
        drop(state);

        self.notify.notify_one();
    }

    fn close(&self) {
        let mut state = self.state.lock().unwrap();
        state.done_scheduling = true;
        drop(state);

        self.notify.notify_one();
    }
}

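// An in-progress multi-range read: collects the result of each I/O into the
// matching slot and fires `when_done` (via Drop) once every chunk has arrived.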
struct MutableBatch<F: FnOnce(Response) + Send> {
    when_done: Option<F>,
    data_buffers: Vec<Bytes>,
    num_bytes: u64,
    priority: u128,
    num_reqs: usize,
    err: Option<Box<dyn std::error::Error + Send + Sync + 'static>>,
}

impl<F: FnOnce(Response) + Send> MutableBatch<F> {
    fn new(when_done: F, num_data_buffers: u32, priority: u128, num_reqs: usize) -> Self {
        Self {
            when_done: Some(when_done),
            data_buffers: vec![Bytes::default(); num_data_buffers as usize],
            num_bytes: 0,
            priority,
            num_reqs,
            err: None,
        }
    }
}

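// Each I/O task holds a reference to the batch; when the last task delivers its
// chunk (or an error) the batch is dropped and the callback fires with the
// assembled response.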
impl<F: FnOnce(Response) + Send> Drop for MutableBatch<F> {
    fn drop(&mut self) {
        // If an error was recorded, return that; otherwise return the data
        let result = if self.err.is_some() {
            Err(Error::Wrapped {
                error: self.err.take().unwrap(),
                location: location!(),
            })
        } else {
            let mut data = Vec::new();
            std::mem::swap(&mut data, &mut self.data_buffers);
            Ok(data)
        };
        let response = Response {
            data: result,
            num_bytes: self.num_bytes,
            priority: self.priority,
            num_reqs: self.num_reqs,
        };
        (self.when_done.take().unwrap())(response);
    }
}

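// A single chunk of data delivered by one I/O task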
struct DataChunk {
    task_idx: usize,
    num_bytes: u64,
    data: Result<Bytes>,
}

trait DataSink: Send {
    fn deliver_data(&mut self, data: DataChunk);
}

impl<F: FnOnce(Response) + Send> DataSink for MutableBatch<F> {
    fn deliver_data(&mut self, data: DataChunk) {
        // Chunks are delivered into the slot matching their task index
        self.num_bytes += data.num_bytes;
        match data.data {
            Ok(data_bytes) => {
                self.data_buffers[data.task_idx] = data_bytes;
            }
            Err(err) => {
                // Keep only the first error we see
                self.err.get_or_insert(Box::new(err));
            }
        }
    }
}

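// A single read that has been submitted to the I/O queue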
struct IoTask {
    reader: Arc<dyn Reader>,
    to_read: Range<u64>,
    when_done: Box<dyn FnOnce(Result<Bytes>) + Send>,
    priority: u128,
}

impl Eq for IoTask {}

impl PartialEq for IoTask {
    fn eq(&self, other: &Self) -> bool {
        self.priority == other.priority
    }
}

impl PartialOrd for IoTask {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for IoTask {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        // BinaryHeap is a max-heap, so invert the comparison: the task with the
        // lowest priority value (i.e. the most urgent request) is popped first
        other.priority.cmp(&self.priority)
    }
}

impl IoTask {
    fn num_bytes(&self) -> u64 {
        self.to_read.end - self.to_read.start
    }

    async fn run(self) {
        let bytes = if self.to_read.start == self.to_read.end {
            Ok(Bytes::new())
        } else {
            let bytes_fut = self
                .reader
                .get_range(self.to_read.start as usize..self.to_read.end as usize);
            IOPS_COUNTER.fetch_add(1, Ordering::Release);
            BYTES_READ_COUNTER.fetch_add(self.num_bytes(), Ordering::Release);
            bytes_fut.await.map_err(Error::from)
        };
        // The I/O is done; return our slot in the process-wide quota
        IOPS_QUOTA.release();
        (self.when_done)(bytes);
    }
}

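// The I/O loop: repeatedly pops the most urgent task that fits within the IOPS
// and backpressure budgets and spawns it onto the runtime.  Runs until the queue
// is closed and drained.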
async fn run_io_loop(tasks: Arc<IoQueue>) {
    loop {
        let next_task = tasks.pop().await;
        match next_task {
            Some(task) => {
                tokio::spawn(task.run());
            }
            None => {
                // The queue has been closed and all requests are finished
                return;
            }
        }
    }
}

#[derive(Debug)]
struct StatsCollector {
    iops: AtomicU64,
    requests: AtomicU64,
    bytes_read: AtomicU64,
}

impl StatsCollector {
    fn new() -> Self {
        Self {
            iops: AtomicU64::new(0),
            requests: AtomicU64::new(0),
            bytes_read: AtomicU64::new(0),
        }
    }

    fn iops(&self) -> u64 {
        self.iops.load(Ordering::Relaxed)
    }

    fn bytes_read(&self) -> u64 {
        self.bytes_read.load(Ordering::Relaxed)
    }

    fn requests(&self) -> u64 {
        self.requests.load(Ordering::Relaxed)
    }

    fn record_request(&self, request: &[Range<u64>]) {
        self.requests.fetch_add(1, Ordering::Relaxed);
        self.iops.fetch_add(request.len() as u64, Ordering::Relaxed);
        self.bytes_read.fetch_add(
            request.iter().map(|r| r.end - r.start).sum::<u64>(),
            Ordering::Relaxed,
        );
    }
}

/// Statistics gathered by a [`ScanScheduler`]
pub struct ScanStats {
    /// Number of physical read operations issued
    pub iops: u64,
    /// Number of logical requests submitted
    pub requests: u64,
    /// Total number of bytes read
    pub bytes_read: u64,
}

impl ScanStats {
    fn new(stats: &StatsCollector) -> Self {
        Self {
            iops: stats.iops(),
            requests: stats.requests(),
            bytes_read: stats.bytes_read(),
        }
    }
}

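/// An I/O scheduler that wraps an [`ObjectStore`], runs requests in priority
/// order, and applies backpressure by limiting how many read bytes may be
/// buffered before they are consumed.
///
/// A minimal usage sketch (not compiled as a doctest; assumes `some.file`
/// already exists in the local store):
///
/// ```ignore
/// let store = Arc::new(ObjectStore::local());
/// let scheduler = ScanScheduler::new(store, SchedulerConfig::default_for_testing());
/// let file = scheduler
///     .open_file(&Path::parse("some.file")?, &CachedFileSize::unknown())
///     .await?;
/// // Lower priority values are more urgent; nearby ranges may be coalesced
/// let bytes = file.submit_single(0..1024, 0).await?;
/// ```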
pub struct ScanScheduler {
    object_store: Arc<ObjectStore>,
    io_queue: Arc<IoQueue>,
    stats: Arc<StatsCollector>,
}

impl Debug for ScanScheduler {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ScanScheduler")
            .field("object_store", &self.object_store)
            .finish()
    }
}

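// The result of a submitted request: the data (or the first error) plus enough
// bookkeeping to return the consumed budget to the queue.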
struct Response {
    data: Result<Vec<Bytes>>,
    priority: u128,
    num_reqs: usize,
    num_bytes: u64,
}

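/// Configuration for a [`ScanScheduler`]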
#[derive(Debug, Clone, Copy)]
pub struct SchedulerConfig {
    /// The maximum number of bytes the scheduler may buffer before applying
    /// backpressure
    pub io_buffer_size_bytes: u64,
}

impl SchedulerConfig {
    /// A large buffer, big enough for unit testing without triggering backpressure
    pub fn default_for_testing() -> Self {
        Self {
            io_buffer_size_bytes: 256 * 1024 * 1024,
        }
    }

    /// A buffer sized from the store's I/O parallelism, intended to saturate bandwidth
    pub fn max_bandwidth(store: &ObjectStore) -> Self {
        Self {
            io_buffer_size_bytes: 32 * 1024 * 1024 * store.io_parallelism() as u64,
        }
    }
}

impl ScanScheduler {
    /// Create a new scheduler; its I/O capacity comes from the store's parallelism
    /// and its buffer size from the config.  A background task runs the I/O loop.
    pub fn new(object_store: Arc<ObjectStore>, config: SchedulerConfig) -> Arc<Self> {
        let io_capacity = object_store.io_parallelism();
        let io_queue = Arc::new(IoQueue::new(
            io_capacity as u32,
            config.io_buffer_size_bytes,
        ));
        let scheduler = Self {
            object_store,
            io_queue: io_queue.clone(),
            stats: Arc::new(StatsCollector::new()),
        };
        tokio::task::spawn(async move { run_io_loop(io_queue).await });
        Arc::new(scheduler)
    }

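    /// Open a file for reading, assigning a base priority that is combined with
    /// each request's own priority (the base occupies the upper 64 bits, so all
    /// reads from a lower-priority file rank behind a higher-priority one).
    ///
    /// The file size is fetched from the store (and cached) if not already known.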
    pub async fn open_file_with_priority(
        self: &Arc<Self>,
        path: &Path,
        base_priority: u64,
        file_size_bytes: &CachedFileSize,
    ) -> Result<FileScheduler> {
        let file_size_bytes = if let Some(size) = file_size_bytes.get() {
            u64::from(size) as usize
        } else {
            let size = self.object_store.size(path).await?;
            if let Some(size) = NonZero::new(size as u64) {
                file_size_bytes.set(size);
            }
            size
        };
        let reader = self
            .object_store
            .open_with_size(path, file_size_bytes)
            .await?;
        let block_size = self.object_store.block_size() as u64;
        let max_iop_size = self.object_store.max_iop_size();
        Ok(FileScheduler {
            reader: reader.into(),
            block_size,
            root: self.clone(),
            base_priority,
            max_iop_size,
        })
    }

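    /// Open a file for reading with a base priority of 0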
    pub async fn open_file(
        self: &Arc<Self>,
        path: &Path,
        file_size_bytes: &CachedFileSize,
    ) -> Result<FileScheduler> {
        self.open_file_with_priority(path, 0, file_size_bytes).await
    }

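    // Submit one IoTask per range; the MutableBatch collects the chunks and
    // sends the assembled Response through `tx` when the last task finishes.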
    fn do_submit_request(
        &self,
        reader: Arc<dyn Reader>,
        request: Vec<Range<u64>>,
        tx: oneshot::Sender<Response>,
        priority: u128,
    ) {
        let num_iops = request.len() as u32;

        let when_all_io_done = move |bytes_and_permits| {
            // We don't care if the receiver has given up
            let _ = tx.send(bytes_and_permits);
        };

        let dest = Arc::new(Mutex::new(Box::new(MutableBatch::new(
            when_all_io_done,
            num_iops,
            priority,
            request.len(),
        ))));

        for (task_idx, iop) in request.into_iter().enumerate() {
            let dest = dest.clone();
            let io_queue = self.io_queue.clone();
            let num_bytes = iop.end - iop.start;
            let task = IoTask {
                reader: reader.clone(),
                to_read: iop,
                priority,
                when_done: Box::new(move |data| {
                    io_queue.on_iop_complete();
                    let mut dest = dest.lock().unwrap();
                    let chunk = DataChunk {
                        data,
                        task_idx,
                        num_bytes,
                    };
                    dest.deliver_data(chunk);
                }),
            };
            self.io_queue.push(task);
        }
    }

    fn submit_request(
        &self,
        reader: Arc<dyn Reader>,
        request: Vec<Range<u64>>,
        priority: u128,
    ) -> impl Future<Output = Result<Vec<Bytes>>> + Send {
        let (tx, rx) = oneshot::channel::<Response>();

        self.do_submit_request(reader, request, tx, priority);

        let io_queue = self.io_queue.clone();

        rx.map(move |wrapped_rsp| {
            // The sender is never dropped without sending, so this unwrap is safe
            let rsp = wrapped_rsp.unwrap();
            io_queue.on_bytes_consumed(rsp.num_bytes, rsp.priority, rsp.num_reqs);
            rsp.data
        })
    }

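    /// Returns the statistics gathered by this scheduler so far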
    pub fn stats(&self) -> ScanStats {
        ScanStats::new(self.stats.as_ref())
    }
}

impl Drop for ScanScheduler {
    fn drop(&mut self) {
        self.io_queue.close();
    }
}

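/// A throttled reader for a single file that submits its requests to a
/// [`ScanScheduler`]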
#[derive(Clone, Debug)]
pub struct FileScheduler {
    reader: Arc<dyn Reader>,
    root: Arc<ScanScheduler>,
    block_size: u64,
    base_priority: u64,
    max_iop_size: u64,
}

// Two ranges are "close together" if they are within one block of each other
fn is_close_together(range1: &Range<u64>, range2: &Range<u64>, block_size: u64) -> bool {
    // Note that range1.end is exclusive, so a gap of up to block_size bytes is allowed
    range2.start <= (range1.end + block_size)
}

fn is_overlapping(range1: &Range<u64>, range2: &Range<u64>) -> bool {
    range1.start < range2.end && range2.start < range1.end
}

impl FileScheduler {
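    /// Submit a batch of I/O requests to the file.
    ///
    /// Ranges within `block_size` of each other are coalesced into one read, and
    /// any read larger than the store's maximum I/O size is split.  The returned
    /// buffers always correspond to the ranges that were originally requested.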
    pub fn submit_request(
        &self,
        request: Vec<Range<u64>>,
        priority: u64,
    ) -> impl Future<Output = Result<Vec<Bytes>>> + Send {
        // The full priority combines the file's base priority (upper 64 bits)
        // with the per-request priority (lower 64 bits)
        let priority = ((self.base_priority as u128) << 64) + priority as u128;

        // Coalesce ranges that are close together into a single read
        let mut merged_requests = Vec::with_capacity(request.len());

        if !request.is_empty() {
            let mut curr_interval = request[0].clone();

            for req in request.iter().skip(1) {
                if is_close_together(&curr_interval, req, self.block_size) {
                    curr_interval.end = curr_interval.end.max(req.end);
                } else {
                    merged_requests.push(curr_interval);
                    curr_interval = req.clone();
                }
            }

            merged_requests.push(curr_interval);
        }

        // Split any merged read larger than the maximum I/O size into roughly
        // equal parts
        let mut updated_requests = Vec::with_capacity(merged_requests.len());
        for req in merged_requests {
            if req.is_empty() {
                updated_requests.push(req);
            } else {
                let num_requests = (req.end - req.start).div_ceil(self.max_iop_size);
                let bytes_per_request = (req.end - req.start) / num_requests;
                for i in 0..num_requests {
                    let start = req.start + i * bytes_per_request;
                    let end = if i == num_requests - 1 {
                        // The last part picks up any remainder
                        req.end
                    } else {
                        start + bytes_per_request
                    };
                    updated_requests.push(start..end);
                }
            }
        }

        self.root.stats.record_request(&updated_requests);

        let bytes_vec_fut =
            self.root
                .submit_request(self.reader.clone(), updated_requests.clone(), priority);

        let mut updated_index = 0;
        let mut final_bytes = Vec::with_capacity(request.len());

        async move {
            let bytes_vec = bytes_vec_fut.await?;

            // Carve the coalesced/split results back up into the caller's
            // original ranges
            let mut orig_index = 0;
            while (updated_index < updated_requests.len()) && (orig_index < request.len()) {
                let updated_range = &updated_requests[updated_index];
                let orig_range = &request[orig_index];
                let byte_offset = updated_range.start as usize;

                if is_overlapping(updated_range, orig_range) {
                    let start = orig_range.start as usize - byte_offset;
                    if orig_range.end <= updated_range.end {
                        // The original range is fully contained in this result
                        let end = orig_range.end as usize - byte_offset;
                        final_bytes.push(bytes_vec[updated_index].slice(start..end));
                    } else {
                        // The original range spans multiple results; stitch them together
                        let orig_size = orig_range.end - orig_range.start;
                        let mut merged_bytes = Vec::with_capacity(orig_size as usize);
                        merged_bytes.extend_from_slice(&bytes_vec[updated_index].slice(start..));
                        let mut copy_offset = merged_bytes.len() as u64;
                        while copy_offset < orig_size {
                            updated_index += 1;
                            let next_range = &updated_requests[updated_index];
                            let bytes_to_take =
                                (orig_size - copy_offset).min(next_range.end - next_range.start);
                            merged_bytes.extend_from_slice(
                                &bytes_vec[updated_index].slice(0..bytes_to_take as usize),
                            );
                            copy_offset += bytes_to_take;
                        }
                        final_bytes.push(Bytes::from(merged_bytes));
                    }
                    orig_index += 1;
                } else {
                    updated_index += 1;
                }
            }

            Ok(final_bytes)
        }
    }

    pub fn with_priority(&self, priority: u64) -> Self {
        Self {
            reader: self.reader.clone(),
            root: self.root.clone(),
            block_size: self.block_size,
            max_iop_size: self.max_iop_size,
            base_priority: priority,
        }
    }

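    /// Submit a single read and resolve to the returned bytes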
    pub fn submit_single(
        &self,
        range: Range<u64>,
        priority: u64,
    ) -> impl Future<Output = Result<Bytes>> + Send {
        self.submit_request(vec![range], priority)
            .map_ok(|vec_bytes| vec_bytes.into_iter().next().unwrap())
    }

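    /// Provides direct access to the underlying reader (bypassing the scheduler)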
    pub fn reader(&self) -> &Arc<dyn Reader> {
        &self.reader
    }
}

#[cfg(test)]
mod tests {
    use std::{collections::VecDeque, time::Duration};

    use futures::poll;
    use rand::RngCore;
    use tempfile::tempdir;

    use object_store::{memory::InMemory, GetRange, ObjectStore as OSObjectStore};
    use tokio::{runtime::Handle, time::timeout};
    use url::Url;

    use crate::{
        object_store::{DEFAULT_DOWNLOAD_RETRY_COUNT, DEFAULT_MAX_IOP_SIZE},
        testing::MockObjectStore,
    };

    use super::*;

    #[tokio::test]
    async fn test_full_seq_read() {
        let tmpdir = tempdir().unwrap();
        let tmp_path = tmpdir.path().to_str().unwrap();
        let tmp_path = Path::parse(tmp_path).unwrap();
        let tmp_file = tmp_path.child("foo.file");

        let obj_store = Arc::new(ObjectStore::local());

        // Write 1MiB of random data
        const DATA_SIZE: u64 = 1024 * 1024;
        let mut some_data = vec![0; DATA_SIZE as usize];
        rand::thread_rng().fill_bytes(&mut some_data);
        obj_store.put(&tmp_file, &some_data).await.unwrap();

        let config = SchedulerConfig::default_for_testing();

        let scheduler = ScanScheduler::new(obj_store, config);

        let file_scheduler = scheduler
            .open_file(&tmp_file, &CachedFileSize::unknown())
            .await
            .unwrap();

        // Read it back 4KiB at a time
        const READ_SIZE: u64 = 4 * 1024;
        let mut reqs = VecDeque::new();
        let mut offset = 0;
        while offset < DATA_SIZE {
            reqs.push_back(
                #[allow(clippy::single_range_in_vec_init)]
                file_scheduler
                    .submit_request(vec![offset..offset + READ_SIZE], 0)
                    .await
                    .unwrap(),
            );
            offset += READ_SIZE;
        }

        offset = 0;
        while offset < DATA_SIZE {
            let data = reqs.pop_front().unwrap();
            let actual = &data[0];
            let expected = &some_data[offset as usize..(offset + READ_SIZE) as usize];
            assert_eq!(expected, actual);
            offset += READ_SIZE;
        }
    }

    #[tokio::test]
    async fn test_split_coalesce() {
        let tmpdir = tempdir().unwrap();
        let tmp_path = tmpdir.path().to_str().unwrap();
        let tmp_path = Path::parse(tmp_path).unwrap();
        let tmp_file = tmp_path.child("foo.file");

        let obj_store = Arc::new(ObjectStore::local());

        // Write 75MiB of random data (larger than the maximum I/O size)
        const DATA_SIZE: u64 = 75 * 1024 * 1024;
        let mut some_data = vec![0; DATA_SIZE as usize];
        rand::thread_rng().fill_bytes(&mut some_data);
        obj_store.put(&tmp_file, &some_data).await.unwrap();

        let config = SchedulerConfig::default_for_testing();

        let scheduler = ScanScheduler::new(obj_store, config);

        let file_scheduler = scheduler
            .open_file(&tmp_file, &CachedFileSize::unknown())
            .await
            .unwrap();

        // These three ranges are close together and should coalesce into one I/O
        let req =
            file_scheduler.submit_request(vec![50_000..51_000, 52_000..53_000, 54_000..55_000], 0);

        let bytes = req.await.unwrap();

        assert_eq!(bytes[0], &some_data[50_000..51_000]);
        assert_eq!(bytes[1], &some_data[52_000..53_000]);
        assert_eq!(bytes[2], &some_data[54_000..55_000]);

        assert_eq!(1, scheduler.stats().iops);

        // A read bigger than the maximum I/O size should be split
        let req = file_scheduler.submit_request(vec![0..DATA_SIZE], 0);
        let bytes = req.await.unwrap();
        assert!(bytes[0] == some_data, "data is not the same");

        assert_eq!(6, scheduler.stats().iops);

        // Ranges that straddle the split boundaries should still be reassembled
        let chunk_size = *DEFAULT_MAX_IOP_SIZE;
        let req = file_scheduler.submit_request(
            vec![
                10..chunk_size,
                chunk_size + 10..(chunk_size * 2) - 20,
                chunk_size * 2..(chunk_size * 2) + 10,
            ],
            0,
        );

        let bytes = req.await.unwrap();
        let chunk_size = chunk_size as usize;
        assert!(
            bytes[0] == some_data[10..chunk_size],
            "data is not the same"
        );
        assert!(
            bytes[1] == some_data[chunk_size + 10..(chunk_size * 2) - 20],
            "data is not the same"
        );
        assert!(
            bytes[2] == some_data[chunk_size * 2..(chunk_size * 2) + 10],
            "data is not the same"
        );
        assert_eq!(8, scheduler.stats().iops);

        // Many adjacent small reads coalesce and then split into a few large I/Os
        let reads = (0..44)
            .map(|i| (i * 1_000_000..(i + 1) * 1_000_000))
            .collect::<Vec<_>>();
        let req = file_scheduler.submit_request(reads, 0);
        let bytes = req.await.unwrap();
        for (i, bytes) in bytes.iter().enumerate() {
            assert!(
                bytes == &some_data[i * 1_000_000..(i + 1) * 1_000_000],
                "data is not the same"
            );
        }
        assert_eq!(11, scheduler.stats().iops);
    }

    #[tokio::test]
    async fn test_priority() {
        let some_path = Path::parse("foo").unwrap();
        let base_store = Arc::new(InMemory::new());
        base_store
            .put(&some_path, vec![0; 1000].into())
            .await
            .unwrap();

        // Every read waits on this semaphore, letting the test control exactly
        // when each I/O is allowed to complete
        let semaphore = Arc::new(tokio::sync::Semaphore::new(0));
        let mut obj_store = MockObjectStore::default();
        let semaphore_copy = semaphore.clone();
        obj_store
            .expect_get_opts()
            .returning(move |location, options| {
                let semaphore = semaphore.clone();
                let base_store = base_store.clone();
                let location = location.clone();
                async move {
                    semaphore.acquire().await.unwrap().forget();
                    base_store.get_opts(&location, options).await
                }
                .boxed()
            });
        let obj_store = Arc::new(ObjectStore::new(
            Arc::new(obj_store),
            Url::parse("mem://").unwrap(),
            Some(500),
            None,
            false,
            false,
            1,
            DEFAULT_DOWNLOAD_RETRY_COUNT,
        ));

        let config = SchedulerConfig {
            io_buffer_size_bytes: 1024 * 1024,
        };

        let scan_scheduler = ScanScheduler::new(obj_store, config);

        let file_scheduler = scan_scheduler
            .open_file(&Path::parse("foo").unwrap(), &CachedFileSize::new(1000))
            .await
            .unwrap();

        // Submit a request (priority 0 is the most urgent)
        let first_fut = timeout(
            Duration::from_secs(10),
            file_scheduler.submit_single(0..10, 0),
        )
        .boxed();

        // Submit a second request with a less urgent priority
        let mut second_fut = timeout(
            Duration::from_secs(10),
            file_scheduler.submit_single(0..20, 100),
        )
        .boxed();

        // Submit a third request that should jump ahead of the second
        let mut third_fut = timeout(
            Duration::from_secs(10),
            file_scheduler.submit_single(0..30, 0),
        )
        .boxed();

        semaphore_copy.add_permits(1);
        assert!(first_fut.await.unwrap().unwrap().len() == 10);
        // The other requests should still be pending
        assert!(poll!(&mut second_fut).is_pending());
        assert!(poll!(&mut third_fut).is_pending());

        semaphore_copy.add_permits(1);
        // The more urgent request finishes before the less urgent one
        assert!(third_fut.await.unwrap().unwrap().len() == 30);
        assert!(poll!(&mut second_fut).is_pending());

        semaphore_copy.add_permits(1);
        assert!(second_fut.await.unwrap().unwrap().len() == 20);
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_backpressure() {
        let some_path = Path::parse("foo").unwrap();
        let base_store = Arc::new(InMemory::new());
        base_store
            .put(&some_path, vec![0; 100000].into())
            .await
            .unwrap();

        let bytes_read = Arc::new(AtomicU64::from(0));
        let mut obj_store = MockObjectStore::default();
        let bytes_read_copy = bytes_read.clone();
        // Wrap the store to keep track of how many bytes have been read
        obj_store
            .expect_get_opts()
            .returning(move |location, options| {
                let range = options.range.as_ref().unwrap();
                let num_bytes = match range {
                    GetRange::Bounded(bounded) => bounded.end - bounded.start,
                    _ => panic!(),
                };
                bytes_read_copy.fetch_add(num_bytes as u64, Ordering::Release);
                let location = location.clone();
                let base_store = base_store.clone();
                async move { base_store.get_opts(&location, options).await }.boxed()
            });
        let obj_store = Arc::new(ObjectStore::new(
            Arc::new(obj_store),
            Url::parse("mem://").unwrap(),
            Some(500),
            None,
            false,
            false,
            1,
            DEFAULT_DOWNLOAD_RETRY_COUNT,
        ));

        let config = SchedulerConfig {
            io_buffer_size_bytes: 10,
        };

        let scan_scheduler = ScanScheduler::new(obj_store.clone(), config);

        let file_scheduler = scan_scheduler
            .open_file(&Path::parse("foo").unwrap(), &CachedFileSize::new(100000))
            .await
            .unwrap();

        let wait_for_idle = || async move {
            let handle = Handle::current();
            while handle.metrics().num_alive_tasks() != 1 {
                tokio::time::sleep(Duration::from_millis(10)).await;
            }
        };
        let wait_for_bytes_read_and_idle = |target_bytes: u64| {
            let bytes_read = &bytes_read;
            async move {
                let bytes_read_copy = bytes_read.clone();
                while bytes_read_copy.load(Ordering::Acquire) < target_bytes {
                    tokio::time::sleep(Duration::from_millis(10)).await;
                }
                wait_for_idle().await;
            }
        };

        // The first two reads fill the 10-byte buffer; the third must wait
        let first_fut = file_scheduler.submit_single(0..5, 0);
        let second_fut = file_scheduler.submit_single(0..5, 0);
        let third_fut = file_scheduler.submit_single(0..3, 0);
        wait_for_bytes_read_and_idle(10).await;

        // Consuming the first read frees enough space for the third
        assert_eq!(first_fut.await.unwrap().len(), 5);
        wait_for_bytes_read_and_idle(13).await;

        // The fourth read does not fit in the remaining space yet
        let fourth_fut = file_scheduler.submit_single(0..5, 0);
        wait_for_bytes_read_and_idle(13).await;

        assert_eq!(third_fut.await.unwrap().len(), 3);
        wait_for_bytes_read_and_idle(18).await;

        assert_eq!(second_fut.await.unwrap().len(), 5);
        // These two ranges are too far apart to coalesce, so this becomes two
        // I/Os totalling 10 bytes.  The second part exceeds the remaining buffer
        // but is allowed through because it shares the lowest priority in flight
        let fifth_fut = file_scheduler.submit_request(vec![0..3, 90000..90007], 0);
        wait_for_bytes_read_and_idle(21).await;

        let fifth_bytes = tokio::time::timeout(Duration::from_secs(10), fifth_fut)
            .await
            .unwrap();
        assert_eq!(
            fifth_bytes.unwrap().iter().map(|b| b.len()).sum::<usize>(),
            10
        );

        assert_eq!(fourth_fut.await.unwrap().len(), 5);
        wait_for_bytes_read_and_idle(28).await;

        // Ensure deadlock is not possible when a single request fills the
        // entire buffer
        let config = SchedulerConfig {
            io_buffer_size_bytes: 10,
        };

        let scan_scheduler = ScanScheduler::new(obj_store, config);
        let file_scheduler = scan_scheduler
            .open_file(&Path::parse("foo").unwrap(), &CachedFileSize::new(100000))
            .await
            .unwrap();

        let first_fut = file_scheduler.submit_single(0..10, 0);
        let second_fut = file_scheduler.submit_single(0..10, 0);

        std::thread::sleep(Duration::from_millis(100));
        assert_eq!(first_fut.await.unwrap().len(), 10);
        assert_eq!(second_fut.await.unwrap().len(), 10);
    }

    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn stress_backpressure() {
        // An I/O buffer of a single byte guarantees the throttle is always full,
        // so progress relies entirely on the priority bypass in can_deliver
        let some_path = Path::parse("foo").unwrap();
        let obj_store = Arc::new(ObjectStore::memory());
        obj_store
            .put(&some_path, vec![0; 100000].as_slice())
            .await
            .unwrap();

        let config = SchedulerConfig {
            io_buffer_size_bytes: 1,
        };
        let scan_scheduler = ScanScheduler::new(obj_store.clone(), config);
        let file_scheduler = scan_scheduler
            .open_file(&some_path, &CachedFileSize::unknown())
            .await
            .unwrap();

        // Submit 10,000 tiny reads in priority order
        let mut futs = Vec::with_capacity(10000);
        for idx in 0..10000 {
            futs.push(file_scheduler.submit_single(idx..idx + 1, idx));
        }

        for fut in futs {
            fut.await.unwrap();
        }
    }
}