use bytes::Bytes;
use futures::channel::oneshot;
use futures::{FutureExt, TryFutureExt};
use object_store::path::Path;
use snafu::location;
use std::collections::BinaryHeap;
use std::fmt::Debug;
use std::future::Future;
use std::num::NonZero;
use std::ops::Range;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Instant;
use tokio::sync::{Notify, Semaphore, SemaphorePermit};

use lance_core::{Error, Result};

use crate::object_store::ObjectStore;
use crate::traits::Reader;
use crate::utils::CachedFileSize;

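// Thresholds (in seconds) used by IoQueueState::warn_if_needed to rate-limit log messages
// emitted when the backpressure throttle is saturated.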
const BACKPRESSURE_MIN: u64 = 5;
const BACKPRESSURE_DEBOUNCE: u64 = 60;

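// Process-wide counters for the number of read operations issued and bytes read, exposed
// via iops_counter() and bytes_read_counter() below.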
static IOPS_COUNTER: AtomicU64 = AtomicU64::new(0);
static BYTES_READ_COUNTER: AtomicU64 = AtomicU64::new(0);
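// Default process-wide cap on concurrent I/O operations.  A value <= 0 disables the cap;
// the default can be overridden with the LANCE_PROCESS_IO_THREADS_LIMIT environment
// variable (see IopsQuota::new).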
static DEFAULT_PROCESS_IOPS_LIMIT: i32 = 128;

pub fn iops_counter() -> u64 {
    IOPS_COUNTER.load(Ordering::Acquire)
}

pub fn bytes_read_counter() -> u64 {
    BYTES_READ_COUNTER.load(Ordering::Acquire)
}

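/// Process-wide quota on the number of concurrent I/O operations.
///
/// `None` means the number of concurrent operations is unbounded; otherwise the semaphore
/// holds one permit per allowed in-flight operation.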
struct IopsQuota {
    iops_avail: Option<Semaphore>,
}

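/// A reservation against the process-wide I/O quota.
///
/// Dropping the reservation returns its permit to the semaphore.  `forget` leaks the permit
/// instead so it can be returned explicitly via `IopsQuota::release` once the I/O completes.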
struct IopsReservation<'a> {
    value: Option<SemaphorePermit<'a>>,
}

impl IopsReservation<'_> {
    fn forget(&mut self) {
        if let Some(value) = self.value.take() {
            value.forget();
        }
    }
}

impl IopsQuota {
    fn new() -> Self {
        let initial_capacity = std::env::var("LANCE_PROCESS_IO_THREADS_LIMIT")
            .map(|s| {
                s.parse::<i32>().unwrap_or_else(|_| {
                    log::warn!("Ignoring invalid LANCE_PROCESS_IO_THREADS_LIMIT: {}", s);
                    DEFAULT_PROCESS_IOPS_LIMIT
                })
            })
            .unwrap_or(DEFAULT_PROCESS_IOPS_LIMIT);
        let iops_avail = if initial_capacity <= 0 {
            None
        } else {
            Some(Semaphore::new(initial_capacity as usize))
        };
        Self { iops_avail }
    }

    fn release(&self) {
        if let Some(iops_avail) = self.iops_avail.as_ref() {
            iops_avail.add_permits(1);
        }
    }

    async fn acquire(&self) -> IopsReservation<'_> {
        if let Some(iops_avail) = self.iops_avail.as_ref() {
            IopsReservation {
                value: Some(iops_avail.acquire().await.unwrap()),
            }
        } else {
            IopsReservation { value: None }
        }
    }
}

static IOPS_QUOTA: std::sync::LazyLock<IopsQuota> = std::sync::LazyLock::new(IopsQuota::new);

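/// Sorted priorities of the I/O requests currently in flight.
///
/// The minimum value is used to let reads that are at least as urgent as anything already
/// running bypass the backpressure byte limit.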
struct PrioritiesInFlight {
    in_flight: Vec<u128>,
}

impl PrioritiesInFlight {
    fn new(capacity: u32) -> Self {
        Self {
            in_flight: Vec::with_capacity(capacity as usize * 2),
        }
    }

    fn min_in_flight(&self) -> u128 {
        self.in_flight.first().copied().unwrap_or(u128::MAX)
    }

    fn push(&mut self, prio: u128) {
        let pos = match self.in_flight.binary_search(&prio) {
            Ok(pos) => pos,
            Err(pos) => pos,
        };
        self.in_flight.insert(pos, prio);
    }

    fn remove(&mut self, prio: u128) {
        if let Ok(pos) = self.in_flight.binary_search(&prio) {
            self.in_flight.remove(pos);
        }
    }
}

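/// State behind the I/O queue's mutex: the remaining IOPS and byte budgets, the heap of
/// pending requests, the priorities of requests already dispatched, and bookkeeping for
/// backpressure warnings.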
struct IoQueueState {
    iops_avail: u32,
    bytes_avail: i64,
    pending_requests: BinaryHeap<IoTask>,
    priorities_in_flight: PrioritiesInFlight,
    done_scheduling: bool,
    start: Instant,
    last_warn: AtomicU64,
}

impl IoQueueState {
    fn new(io_capacity: u32, io_buffer_size: u64) -> Self {
        Self {
            iops_avail: io_capacity,
            bytes_avail: io_buffer_size as i64,
            pending_requests: BinaryHeap::new(),
            priorities_in_flight: PrioritiesInFlight::new(io_capacity),
            done_scheduling: false,
            start: Instant::now(),
            last_warn: AtomicU64::from(0),
        }
    }

    fn warn_if_needed(&self) {
        let seconds_elapsed = self.start.elapsed().as_secs();
        let last_warn = self.last_warn.load(Ordering::Acquire);
        let since_last_warn = seconds_elapsed - last_warn;
        if (last_warn == 0
            && seconds_elapsed > BACKPRESSURE_MIN
            && seconds_elapsed < BACKPRESSURE_DEBOUNCE)
            || since_last_warn > BACKPRESSURE_DEBOUNCE
        {
            tracing::event!(tracing::Level::DEBUG, "Backpressure throttle exceeded");
            log::debug!("Backpressure throttle is full; I/O will pause until the buffer is drained. Maximum I/O bandwidth will not be achieved because the CPU is falling behind");
            self.last_warn
                .store(seconds_elapsed.max(1), Ordering::Release);
        }
    }

    fn can_deliver(&self, task: &IoTask) -> bool {
        if self.iops_avail == 0 {
            false
        } else if task.priority <= self.priorities_in_flight.min_in_flight() {
            true
        } else if task.num_bytes() as i64 > self.bytes_avail {
            self.warn_if_needed();
            false
        } else {
            true
        }
    }

    fn next_task(&mut self) -> Option<IoTask> {
        let task = self.pending_requests.peek()?;
        if self.can_deliver(task) {
            self.priorities_in_flight.push(task.priority);
            self.iops_avail -= 1;
            self.bytes_avail -= task.num_bytes() as i64;
            if self.bytes_avail < 0 {
                log::debug!(
                    "Backpressure throttle temporarily exceeded by {} bytes due to priority I/O",
                    -self.bytes_avail
                );
            }
            Some(self.pending_requests.pop().unwrap())
        } else {
            None
        }
    }
}

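/// Priority queue of pending I/O requests shared between the scheduler and the I/O loop.
///
/// A `Notify` wakes the I/O loop whenever a request is pushed or capacity is returned.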
struct IoQueue {
    state: Mutex<IoQueueState>,
    notify: Notify,
}

impl IoQueue {
    fn new(io_capacity: u32, io_buffer_size: u64) -> Self {
        Self {
            state: Mutex::new(IoQueueState::new(io_capacity, io_buffer_size)),
            notify: Notify::new(),
        }
    }

    fn push(&self, task: IoTask) {
        log::trace!(
            "Inserting I/O request for {} bytes with priority ({},{}) into I/O queue",
            task.num_bytes(),
            task.priority >> 64,
            task.priority & 0xFFFFFFFFFFFFFFFF
        );
        let mut state = self.state.lock().unwrap();
        state.pending_requests.push(task);
        drop(state);

        self.notify.notify_one();
    }

    async fn pop(&self) -> Option<IoTask> {
        loop {
            {
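                // Reserve a slot in the process-wide I/O quota before looking for a task.
                // If a task is dispatched the permit is forgotten here and returned later via
                // IOPS_QUOTA.release() when the task finishes; otherwise the reservation is
                // dropped (returning the permit) before we wait for more work.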
                let mut iop_res = IOPS_QUOTA.acquire().await;
                let mut state = self.state.lock().unwrap();
                if let Some(task) = state.next_task() {
                    iop_res.forget();
                    return Some(task);
                }

                if state.done_scheduling {
                    return None;
                }
            }

            self.notify.notified().await;
        }
    }

    fn on_iop_complete(&self) {
        let mut state = self.state.lock().unwrap();
        state.iops_avail += 1;
        drop(state);

        self.notify.notify_one();
    }

    fn on_bytes_consumed(&self, bytes: u64, priority: u128, num_reqs: usize) {
        let mut state = self.state.lock().unwrap();
        state.bytes_avail += bytes as i64;
        for _ in 0..num_reqs {
            state.priorities_in_flight.remove(priority);
        }
        drop(state);

        self.notify.notify_one();
    }

    fn close(&self) {
        let mut state = self.state.lock().unwrap();
        state.done_scheduling = true;
        let pending_requests = std::mem::take(&mut state.pending_requests);
        drop(state);
        for request in pending_requests {
            request.cancel();
        }

        self.notify.notify_one();
    }
}

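/// Accumulates the results of the individual reads that make up one submitted request.
///
/// Each read stores its bytes (or error) here as it completes.  When the batch is dropped
/// (the last reference is released once every read has been delivered), `when_done` is
/// invoked with the assembled `Response`.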
struct MutableBatch<F: FnOnce(Response) + Send> {
    when_done: Option<F>,
    data_buffers: Vec<Bytes>,
    num_bytes: u64,
    priority: u128,
    num_reqs: usize,
    err: Option<Box<dyn std::error::Error + Send + Sync + 'static>>,
}

impl<F: FnOnce(Response) + Send> MutableBatch<F> {
    fn new(when_done: F, num_data_buffers: u32, priority: u128, num_reqs: usize) -> Self {
        Self {
            when_done: Some(when_done),
            data_buffers: vec![Bytes::default(); num_data_buffers as usize],
            num_bytes: 0,
            priority,
            num_reqs,
            err: None,
        }
    }
}

impl<F: FnOnce(Response) + Send> Drop for MutableBatch<F> {
    fn drop(&mut self) {
        let result = if self.err.is_some() {
            Err(Error::Wrapped {
                error: self.err.take().unwrap(),
                location: location!(),
            })
        } else {
            let mut data = Vec::new();
            std::mem::swap(&mut data, &mut self.data_buffers);
            Ok(data)
        };
        let response = Response {
            data: result,
            num_bytes: self.num_bytes,
            priority: self.priority,
            num_reqs: self.num_reqs,
        };
        (self.when_done.take().unwrap())(response);
    }
}

struct DataChunk {
    task_idx: usize,
    num_bytes: u64,
    data: Result<Bytes>,
}

trait DataSink: Send {
    fn deliver_data(&mut self, data: DataChunk);
}

impl<F: FnOnce(Response) + Send> DataSink for MutableBatch<F> {
    fn deliver_data(&mut self, data: DataChunk) {
        self.num_bytes += data.num_bytes;
        match data.data {
            Ok(data_bytes) => {
                self.data_buffers[data.task_idx] = data_bytes;
            }
            Err(err) => {
                self.err.get_or_insert(Box::new(err));
            }
        }
    }
}

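/// A single range read against one reader.
///
/// Tasks are ordered by `priority` (lower values are more urgent).  `Ord` is reversed because
/// `BinaryHeap` is a max-heap and we want the lowest priority value to pop first.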
struct IoTask {
    reader: Arc<dyn Reader>,
    to_read: Range<u64>,
    when_done: Box<dyn FnOnce(Result<Bytes>) + Send>,
    priority: u128,
}

impl Eq for IoTask {}

impl PartialEq for IoTask {
    fn eq(&self, other: &Self) -> bool {
        self.priority == other.priority
    }
}

impl PartialOrd for IoTask {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for IoTask {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        other.priority.cmp(&self.priority)
    }
}

impl IoTask {
    fn num_bytes(&self) -> u64 {
        self.to_read.end - self.to_read.start
    }

    fn cancel(self) {
        (self.when_done)(Err(Error::Internal {
            message: "Scheduler closed before I/O was completed".to_string(),
            location: location!(),
        }));
    }

    async fn run(self) {
        let file_path = self.reader.path().as_ref();
        let num_bytes = self.num_bytes();
        let bytes = if self.to_read.start == self.to_read.end {
            Ok(Bytes::new())
        } else {
            let bytes_fut = self
                .reader
                .get_range(self.to_read.start as usize..self.to_read.end as usize);
            IOPS_COUNTER.fetch_add(1, Ordering::Release);
            let num_bytes = self.num_bytes();
            bytes_fut
                .inspect(move |_| {
                    BYTES_READ_COUNTER.fetch_add(num_bytes, Ordering::Release);
                })
                .await
                .map_err(Error::from)
        };
        tracing::trace!(
            file = file_path,
            bytes_read = num_bytes,
            requests = 1,
            range_start = self.to_read.start,
            range_end = self.to_read.end,
            "File I/O completed"
        );
        IOPS_QUOTA.release();
        (self.when_done)(bytes);
    }
}

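/// Drains the I/O queue, spawning each dispatched task onto the tokio runtime.  Returns once
/// the queue has been closed and emptied.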
async fn run_io_loop(tasks: Arc<IoQueue>) {
    loop {
        let next_task = tasks.pop().await;
        match next_task {
            Some(task) => {
                tokio::spawn(task.run());
            }
            None => {
                return;
            }
        }
    }
}

#[derive(Debug)]
struct StatsCollector {
    iops: AtomicU64,
    requests: AtomicU64,
    bytes_read: AtomicU64,
}

impl StatsCollector {
    fn new() -> Self {
        Self {
            iops: AtomicU64::new(0),
            requests: AtomicU64::new(0),
            bytes_read: AtomicU64::new(0),
        }
    }

    fn iops(&self) -> u64 {
        self.iops.load(Ordering::Relaxed)
    }

    fn bytes_read(&self) -> u64 {
        self.bytes_read.load(Ordering::Relaxed)
    }

    fn requests(&self) -> u64 {
        self.requests.load(Ordering::Relaxed)
    }

    fn record_request(&self, request: &[Range<u64>]) {
        self.requests.fetch_add(1, Ordering::Relaxed);
        self.iops.fetch_add(request.len() as u64, Ordering::Relaxed);
        self.bytes_read.fetch_add(
            request.iter().map(|r| r.end - r.start).sum::<u64>(),
            Ordering::Relaxed,
        );
    }
}

pub struct ScanStats {
    pub iops: u64,
    pub requests: u64,
    pub bytes_read: u64,
}

impl ScanStats {
    fn new(stats: &StatsCollector) -> Self {
        Self {
            iops: stats.iops(),
            requests: stats.requests(),
            bytes_read: stats.bytes_read(),
        }
    }
}

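/// Top-level I/O scheduler, shared by all files in a scan.
///
/// Owns the I/O queue and statistics; individual files are read through `FileScheduler`
/// handles obtained from `open_file` / `open_file_with_priority`.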
pub struct ScanScheduler {
    object_store: Arc<ObjectStore>,
    io_queue: Arc<IoQueue>,
    stats: Arc<StatsCollector>,
}

impl Debug for ScanScheduler {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ScanScheduler")
            .field("object_store", &self.object_store)
            .finish()
    }
}

struct Response {
    data: Result<Vec<Bytes>>,
    priority: u128,
    num_reqs: usize,
    num_bytes: u64,
}

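/// Configuration for a `ScanScheduler`.
///
/// `io_buffer_size_bytes` is a soft cap on the bytes buffered in flight: once it is exceeded,
/// only reads at least as urgent as the most urgent read already in flight are scheduled
/// (the backpressure throttle).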
#[derive(Debug, Clone, Copy)]
pub struct SchedulerConfig {
    pub io_buffer_size_bytes: u64,
}

impl SchedulerConfig {
    pub fn default_for_testing() -> Self {
        Self {
            io_buffer_size_bytes: 256 * 1024 * 1024,
        }
    }

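    /// Sizes the I/O buffer to 32 MiB per parallel request supported by the store.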
    pub fn max_bandwidth(store: &ObjectStore) -> Self {
        Self {
            io_buffer_size_bytes: 32 * 1024 * 1024 * store.io_parallelism() as u64,
        }
    }
}

impl ScanScheduler {
    pub fn new(object_store: Arc<ObjectStore>, config: SchedulerConfig) -> Arc<Self> {
        let io_capacity = object_store.io_parallelism();
        let io_queue = Arc::new(IoQueue::new(
            io_capacity as u32,
            config.io_buffer_size_bytes,
        ));
        let slf = Arc::new(Self {
            object_store,
            io_queue: io_queue.clone(),
            stats: Arc::new(StatsCollector::new()),
        });
        tokio::task::spawn(async move { run_io_loop(io_queue).await });
        slf
    }

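    /// Opens a file for scheduled reads.
    ///
    /// `base_priority` fills the upper 64 bits of every request priority, ordering this
    /// file's reads relative to other files on the same scheduler.  The cached file size is
    /// used when available; otherwise the size is fetched from the store and cached.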
    pub async fn open_file_with_priority(
        self: &Arc<Self>,
        path: &Path,
        base_priority: u64,
        file_size_bytes: &CachedFileSize,
    ) -> Result<FileScheduler> {
        let file_size_bytes = if let Some(size) = file_size_bytes.get() {
            u64::from(size)
        } else {
            let size = self.object_store.size(path).await?;
            if let Some(size) = NonZero::new(size) {
                file_size_bytes.set(size);
            }
            size
        };
        let reader = self
            .object_store
            .open_with_size(path, file_size_bytes as usize)
            .await?;
        let block_size = self.object_store.block_size() as u64;
        let max_iop_size = self.object_store.max_iop_size();
        Ok(FileScheduler {
            reader: reader.into(),
            block_size,
            root: self.clone(),
            base_priority,
            max_iop_size,
        })
    }

    pub async fn open_file(
        self: &Arc<Self>,
        path: &Path,
        file_size_bytes: &CachedFileSize,
    ) -> Result<FileScheduler> {
        self.open_file_with_priority(path, 0, file_size_bytes).await
    }

    fn do_submit_request(
        &self,
        reader: Arc<dyn Reader>,
        request: Vec<Range<u64>>,
        tx: oneshot::Sender<Response>,
        priority: u128,
    ) {
        let num_iops = request.len() as u32;

        let when_all_io_done = move |bytes_and_permits| {
            let _ = tx.send(bytes_and_permits);
        };

        let dest = Arc::new(Mutex::new(Box::new(MutableBatch::new(
            when_all_io_done,
            num_iops,
            priority,
            request.len(),
        ))));

        for (task_idx, iop) in request.into_iter().enumerate() {
            let dest = dest.clone();
            let io_queue = self.io_queue.clone();
            let num_bytes = iop.end - iop.start;
            let task = IoTask {
                reader: reader.clone(),
                to_read: iop,
                priority,
                when_done: Box::new(move |data| {
                    io_queue.on_iop_complete();
                    let mut dest = dest.lock().unwrap();
                    let chunk = DataChunk {
                        data,
                        task_idx,
                        num_bytes,
                    };
                    dest.deliver_data(chunk);
                }),
            };
            self.io_queue.push(task);
        }
    }

    fn submit_request(
        &self,
        reader: Arc<dyn Reader>,
        request: Vec<Range<u64>>,
        priority: u128,
    ) -> impl Future<Output = Result<Vec<Bytes>>> + Send {
        let (tx, rx) = oneshot::channel::<Response>();

        self.do_submit_request(reader, request, tx, priority);

        let io_queue = self.io_queue.clone();

        rx.map(move |wrapped_rsp| {
            let rsp = wrapped_rsp.unwrap();
            io_queue.on_bytes_consumed(rsp.num_bytes, rsp.priority, rsp.num_reqs);
            rsp.data
        })
    }

    pub fn stats(&self) -> ScanStats {
        ScanStats::new(self.stats.as_ref())
    }
}

impl Drop for ScanScheduler {
    fn drop(&mut self) {
        self.io_queue.close();
    }
}

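/// A per-file handle for submitting reads through the scan scheduler.
///
/// Nearby ranges in a request are coalesced (based on the store's block size) and ranges
/// larger than the store's maximum I/O size are split before being queued; the results are
/// sliced back into the caller's original ranges.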
#[derive(Clone, Debug)]
pub struct FileScheduler {
    reader: Arc<dyn Reader>,
    root: Arc<ScanScheduler>,
    block_size: u64,
    base_priority: u64,
    max_iop_size: u64,
}

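/// Returns true if `range2` begins within `block_size` bytes of the end of `range1`, in which
/// case the two can be coalesced into a single read.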
fn is_close_together(range1: &Range<u64>, range2: &Range<u64>, block_size: u64) -> bool {
    range2.start <= (range1.end + block_size)
}

fn is_overlapping(range1: &Range<u64>, range2: &Range<u64>) -> bool {
    range1.start < range2.end && range2.start < range1.end
}

impl FileScheduler {
    pub fn submit_request(
        &self,
        request: Vec<Range<u64>>,
        priority: u64,
    ) -> impl Future<Output = Result<Vec<Bytes>>> + Send {
        let priority = ((self.base_priority as u128) << 64) + priority as u128;

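        // Coalesce ranges that begin within `block_size` bytes of the end of the current
        // interval into a single larger read.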
        let mut merged_requests = Vec::with_capacity(request.len());

        if !request.is_empty() {
            let mut curr_interval = request[0].clone();

            for req in request.iter().skip(1) {
                if is_close_together(&curr_interval, req, self.block_size) {
                    curr_interval.end = curr_interval.end.max(req.end);
                } else {
                    merged_requests.push(curr_interval);
                    curr_interval = req.clone();
                }
            }

            merged_requests.push(curr_interval);
        }

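        // Split any merged range that exceeds the store's maximum I/O size into roughly
        // equal-sized chunks.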
        let mut updated_requests = Vec::with_capacity(merged_requests.len());
        for req in merged_requests {
            if req.is_empty() {
                updated_requests.push(req);
            } else {
                let num_requests = (req.end - req.start).div_ceil(self.max_iop_size);
                let bytes_per_request = (req.end - req.start) / num_requests;
                for i in 0..num_requests {
                    let start = req.start + i * bytes_per_request;
                    let end = if i == num_requests - 1 {
                        req.end
                    } else {
                        start + bytes_per_request
                    };
                    updated_requests.push(start..end);
                }
            }
        }

        self.root.stats.record_request(&updated_requests);

        let bytes_vec_fut =
            self.root
                .submit_request(self.reader.clone(), updated_requests.clone(), priority);

        let mut updated_index = 0;
        let mut final_bytes = Vec::with_capacity(request.len());

        async move {
            let bytes_vec = bytes_vec_fut.await?;

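            // Walk the original and the merged/split ranges in tandem, slicing (or, when an
            // original range spans several split reads, stitching) the returned buffers back
            // into the shapes the caller asked for.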
            let mut orig_index = 0;
            while (updated_index < updated_requests.len()) && (orig_index < request.len()) {
                let updated_range = &updated_requests[updated_index];
                let orig_range = &request[orig_index];
                let byte_offset = updated_range.start as usize;

                if is_overlapping(updated_range, orig_range) {
                    let start = orig_range.start as usize - byte_offset;
                    if orig_range.end <= updated_range.end {
                        let end = orig_range.end as usize - byte_offset;
                        final_bytes.push(bytes_vec[updated_index].slice(start..end));
                    } else {
                        let orig_size = orig_range.end - orig_range.start;
                        let mut merged_bytes = Vec::with_capacity(orig_size as usize);
                        merged_bytes.extend_from_slice(&bytes_vec[updated_index].slice(start..));
                        let mut copy_offset = merged_bytes.len() as u64;
                        while copy_offset < orig_size {
                            updated_index += 1;
                            let next_range = &updated_requests[updated_index];
                            let bytes_to_take =
                                (orig_size - copy_offset).min(next_range.end - next_range.start);
                            merged_bytes.extend_from_slice(
                                &bytes_vec[updated_index].slice(0..bytes_to_take as usize),
                            );
                            copy_offset += bytes_to_take;
                        }
                        final_bytes.push(Bytes::from(merged_bytes));
                    }
                    orig_index += 1;
                } else {
                    updated_index += 1;
                }
            }

            Ok(final_bytes)
        }
    }

    pub fn with_priority(&self, priority: u64) -> Self {
        Self {
            reader: self.reader.clone(),
            root: self.root.clone(),
            block_size: self.block_size,
            max_iop_size: self.max_iop_size,
            base_priority: priority,
        }
    }

    pub fn submit_single(
        &self,
        range: Range<u64>,
        priority: u64,
    ) -> impl Future<Output = Result<Bytes>> + Send {
        self.submit_request(vec![range], priority)
            .map_ok(|vec_bytes| vec_bytes.into_iter().next().unwrap())
    }

    pub fn reader(&self) -> &Arc<dyn Reader> {
        &self.reader
    }
}

#[cfg(test)]
mod tests {
    use std::{collections::VecDeque, time::Duration};

    use futures::poll;
    use lance_core::utils::tempfile::TempObjFile;
    use rand::RngCore;

    use object_store::{memory::InMemory, GetRange, ObjectStore as OSObjectStore};
    use tokio::{runtime::Handle, time::timeout};
    use url::Url;

    use crate::{
        object_store::{DEFAULT_DOWNLOAD_RETRY_COUNT, DEFAULT_MAX_IOP_SIZE},
        testing::MockObjectStore,
    };

    use super::*;

    #[tokio::test]
    async fn test_full_seq_read() {
        let tmp_file = TempObjFile::default();

        let obj_store = Arc::new(ObjectStore::local());

        const DATA_SIZE: u64 = 1024 * 1024;
        let mut some_data = vec![0; DATA_SIZE as usize];
        rand::rng().fill_bytes(&mut some_data);
        obj_store.put(&tmp_file, &some_data).await.unwrap();

        let config = SchedulerConfig::default_for_testing();

        let scheduler = ScanScheduler::new(obj_store, config);

        let file_scheduler = scheduler
            .open_file(&tmp_file, &CachedFileSize::unknown())
            .await
            .unwrap();

        const READ_SIZE: u64 = 4 * 1024;
        let mut reqs = VecDeque::new();
        let mut offset = 0;
        while offset < DATA_SIZE {
            reqs.push_back(
                #[allow(clippy::single_range_in_vec_init)]
                file_scheduler
                    .submit_request(vec![offset..offset + READ_SIZE], 0)
                    .await
                    .unwrap(),
            );
            offset += READ_SIZE;
        }

        offset = 0;
        while offset < DATA_SIZE {
            let data = reqs.pop_front().unwrap();
            let actual = &data[0];
            let expected = &some_data[offset as usize..(offset + READ_SIZE) as usize];
            assert_eq!(expected, actual);
            offset += READ_SIZE;
        }
    }

    #[tokio::test]
    async fn test_split_coalesce() {
        let tmp_file = TempObjFile::default();

        let obj_store = Arc::new(ObjectStore::local());

        const DATA_SIZE: u64 = 75 * 1024 * 1024;
        let mut some_data = vec![0; DATA_SIZE as usize];
        rand::rng().fill_bytes(&mut some_data);
        obj_store.put(&tmp_file, &some_data).await.unwrap();

        let config = SchedulerConfig::default_for_testing();

        let scheduler = ScanScheduler::new(obj_store, config);

        let file_scheduler = scheduler
            .open_file(&tmp_file, &CachedFileSize::unknown())
            .await
            .unwrap();

        let req =
            file_scheduler.submit_request(vec![50_000..51_000, 52_000..53_000, 54_000..55_000], 0);

        let bytes = req.await.unwrap();

        assert_eq!(bytes[0], &some_data[50_000..51_000]);
        assert_eq!(bytes[1], &some_data[52_000..53_000]);
        assert_eq!(bytes[2], &some_data[54_000..55_000]);

        assert_eq!(1, scheduler.stats().iops);

        let req = file_scheduler.submit_request(vec![0..DATA_SIZE], 0);
        let bytes = req.await.unwrap();
        assert!(bytes[0] == some_data, "data is not the same");

        assert_eq!(6, scheduler.stats().iops);

        let chunk_size = *DEFAULT_MAX_IOP_SIZE;
        let req = file_scheduler.submit_request(
            vec![
                10..chunk_size,
                chunk_size + 10..(chunk_size * 2) - 20,
                chunk_size * 2..(chunk_size * 2) + 10,
            ],
            0,
        );

        let bytes = req.await.unwrap();
        let chunk_size = chunk_size as usize;
        assert!(
            bytes[0] == some_data[10..chunk_size],
            "data is not the same"
        );
        assert!(
            bytes[1] == some_data[chunk_size + 10..(chunk_size * 2) - 20],
            "data is not the same"
        );
        assert!(
            bytes[2] == some_data[chunk_size * 2..(chunk_size * 2) + 10],
            "data is not the same"
        );
        assert_eq!(8, scheduler.stats().iops);

        let reads = (0..44)
            .map(|i| i * 1_000_000..(i + 1) * 1_000_000)
            .collect::<Vec<_>>();
        let req = file_scheduler.submit_request(reads, 0);
        let bytes = req.await.unwrap();
        for (i, bytes) in bytes.iter().enumerate() {
            assert!(
                bytes == &some_data[i * 1_000_000..(i + 1) * 1_000_000],
                "data is not the same"
            );
        }
        assert_eq!(11, scheduler.stats().iops);
    }

    #[tokio::test]
    async fn test_priority() {
        let some_path = Path::parse("foo").unwrap();
        let base_store = Arc::new(InMemory::new());
        base_store
            .put(&some_path, vec![0; 1000].into())
            .await
            .unwrap();

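        // Gate every read behind a zero-permit semaphore so the test controls exactly when
        // each I/O is allowed to complete.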
        let semaphore = Arc::new(tokio::sync::Semaphore::new(0));
        let mut obj_store = MockObjectStore::default();
        let semaphore_copy = semaphore.clone();
        obj_store
            .expect_get_opts()
            .returning(move |location, options| {
                let semaphore = semaphore.clone();
                let base_store = base_store.clone();
                let location = location.clone();
                async move {
                    semaphore.acquire().await.unwrap().forget();
                    base_store.get_opts(&location, options).await
                }
                .boxed()
            });
        let obj_store = Arc::new(ObjectStore::new(
            Arc::new(obj_store),
            Url::parse("mem://").unwrap(),
            Some(500),
            None,
            false,
            false,
            1,
            DEFAULT_DOWNLOAD_RETRY_COUNT,
            None,
        ));

        let config = SchedulerConfig {
            io_buffer_size_bytes: 1024 * 1024,
        };

        let scan_scheduler = ScanScheduler::new(obj_store, config);

        let file_scheduler = scan_scheduler
            .open_file(&Path::parse("foo").unwrap(), &CachedFileSize::new(1000))
            .await
            .unwrap();

        let first_fut = timeout(
            Duration::from_secs(10),
            file_scheduler.submit_single(0..10, 0),
        )
        .boxed();

        let mut second_fut = timeout(
            Duration::from_secs(10),
            file_scheduler.submit_single(0..20, 100),
        )
        .boxed();

        let mut third_fut = timeout(
            Duration::from_secs(10),
            file_scheduler.submit_single(0..30, 0),
        )
        .boxed();

        semaphore_copy.add_permits(1);
        assert!(first_fut.await.unwrap().unwrap().len() == 10);
        assert!(poll!(&mut second_fut).is_pending());
        assert!(poll!(&mut third_fut).is_pending());

        semaphore_copy.add_permits(1);
        assert!(third_fut.await.unwrap().unwrap().len() == 30);
        assert!(poll!(&mut second_fut).is_pending());

        semaphore_copy.add_permits(1);
        assert!(second_fut.await.unwrap().unwrap().len() == 20);
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_backpressure() {
        let some_path = Path::parse("foo").unwrap();
        let base_store = Arc::new(InMemory::new());
        base_store
            .put(&some_path, vec![0; 100000].into())
            .await
            .unwrap();

        let bytes_read = Arc::new(AtomicU64::from(0));
        let mut obj_store = MockObjectStore::default();
        let bytes_read_copy = bytes_read.clone();
        obj_store
            .expect_get_opts()
            .returning(move |location, options| {
                let range = options.range.as_ref().unwrap();
                let num_bytes = match range {
                    GetRange::Bounded(bounded) => bounded.end - bounded.start,
                    _ => panic!(),
                };
                bytes_read_copy.fetch_add(num_bytes, Ordering::Release);
                let location = location.clone();
                let base_store = base_store.clone();
                async move { base_store.get_opts(&location, options).await }.boxed()
            });
        let obj_store = Arc::new(ObjectStore::new(
            Arc::new(obj_store),
            Url::parse("mem://").unwrap(),
            Some(500),
            None,
            false,
            false,
            1,
            DEFAULT_DOWNLOAD_RETRY_COUNT,
            None,
        ));

        let config = SchedulerConfig {
            io_buffer_size_bytes: 10,
        };

        let scan_scheduler = ScanScheduler::new(obj_store.clone(), config);

        let file_scheduler = scan_scheduler
            .open_file(&Path::parse("foo").unwrap(), &CachedFileSize::new(100000))
            .await
            .unwrap();

        let wait_for_idle = || async move {
            let handle = Handle::current();
            while handle.metrics().num_alive_tasks() != 1 {
                tokio::time::sleep(Duration::from_millis(10)).await;
            }
        };
        let wait_for_bytes_read_and_idle = |target_bytes: u64| {
            let bytes_read = &bytes_read;
            async move {
                let bytes_read_copy = bytes_read.clone();
                while bytes_read_copy.load(Ordering::Acquire) < target_bytes {
                    tokio::time::sleep(Duration::from_millis(10)).await;
                }
                wait_for_idle().await;
            }
        };

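        // The buffer is 10 bytes: the first two 5-byte reads fill it, so the third (3-byte)
        // read cannot be scheduled until one of the earlier buffers is consumed.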
        let first_fut = file_scheduler.submit_single(0..5, 0);
        let second_fut = file_scheduler.submit_single(0..5, 0);
        let third_fut = file_scheduler.submit_single(0..3, 0);
        wait_for_bytes_read_and_idle(10).await;

        assert_eq!(first_fut.await.unwrap().len(), 5);
        wait_for_bytes_read_and_idle(13).await;

        let fourth_fut = file_scheduler.submit_single(0..5, 0);
        wait_for_bytes_read_and_idle(13).await;

        assert_eq!(third_fut.await.unwrap().len(), 3);
        wait_for_bytes_read_and_idle(18).await;

        assert_eq!(second_fut.await.unwrap().len(), 5);
        let fifth_fut = file_scheduler.submit_request(vec![0..3, 90000..90007], 0);
        wait_for_bytes_read_and_idle(21).await;

        let fifth_bytes = tokio::time::timeout(Duration::from_secs(10), fifth_fut)
            .await
            .unwrap();
        assert_eq!(
            fifth_bytes.unwrap().iter().map(|b| b.len()).sum::<usize>(),
            10
        );

        assert_eq!(fourth_fut.await.unwrap().len(), 5);
        wait_for_bytes_read_and_idle(28).await;

        let config = SchedulerConfig {
            io_buffer_size_bytes: 10,
        };

        let scan_scheduler = ScanScheduler::new(obj_store, config);
        let file_scheduler = scan_scheduler
            .open_file(&Path::parse("foo").unwrap(), &CachedFileSize::new(100000))
            .await
            .unwrap();

        let first_fut = file_scheduler.submit_single(0..10, 0);
        let second_fut = file_scheduler.submit_single(0..10, 0);

        std::thread::sleep(Duration::from_millis(100));
        assert_eq!(first_fut.await.unwrap().len(), 10);
        assert_eq!(second_fut.await.unwrap().len(), 10);
    }

    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn stress_backpressure() {
        let some_path = Path::parse("foo").unwrap();
        let obj_store = Arc::new(ObjectStore::memory());
        obj_store
            .put(&some_path, vec![0; 100000].as_slice())
            .await
            .unwrap();

        let config = SchedulerConfig {
            io_buffer_size_bytes: 1,
        };
        let scan_scheduler = ScanScheduler::new(obj_store.clone(), config);
        let file_scheduler = scan_scheduler
            .open_file(&some_path, &CachedFileSize::unknown())
            .await
            .unwrap();

        let mut futs = Vec::with_capacity(10000);
        for idx in 0..10000 {
            futs.push(file_scheduler.submit_single(idx..idx + 1, idx));
        }

        for fut in futs {
            fut.await.unwrap();
        }
    }
}