use bytes::Bytes;
use futures::channel::oneshot;
use futures::{FutureExt, TryFutureExt};
use object_store::path::Path;
use std::collections::BinaryHeap;
use std::fmt::Debug;
use std::future::Future;
use std::num::NonZero;
use std::ops::Range;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Instant;
use tokio::sync::{Notify, Semaphore, SemaphorePermit};

use lance_core::{Error, Result};

use crate::object_store::ObjectStore;
use crate::traits::Reader;
use crate::utils::CachedFileSize;

mod lite;

// Don't log a backpressure warning until the scheduler has been running for at
// least this many seconds
const BACKPRESSURE_MIN: u64 = 5;
// Don't repeat backpressure warnings more often than once per this many seconds
const BACKPRESSURE_DEBOUNCE: u64 = 60;

// Process-wide counters of IOPs issued and bytes read, exposed for metrics
static IOPS_COUNTER: AtomicU64 = AtomicU64::new(0);
static BYTES_READ_COUNTER: AtomicU64 = AtomicU64::new(0);
// Default cap on concurrent IOPs per process; LANCE_PROCESS_IO_THREADS_LIMIT
// overrides it, and a value <= 0 disables the cap entirely
const DEFAULT_PROCESS_IOPS_LIMIT: i32 = 128;

/// Returns the total number of IOPs issued by this process
pub fn iops_counter() -> u64 {
    IOPS_COUNTER.load(Ordering::Acquire)
}

/// Returns the total number of bytes read by this process
pub fn bytes_read_counter() -> u64 {
    BYTES_READ_COUNTER.load(Ordering::Acquire)
}

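// Hedged usage sketch: sample the process-wide counters around a workload to
// estimate the I/O it performed; `run_scan()` is a hypothetical placeholder.
//
//     let (iops0, bytes0) = (iops_counter(), bytes_read_counter());
//     run_scan().await;
//     println!("iops={} bytes={}", iops_counter() - iops0, bytes_read_counter() - bytes0);
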
// A process-wide quota on concurrent IOPs. If the semaphore is `None` the
// quota is disabled and IOPs are unlimited.
struct IopsQuota {
    iops_avail: Option<Semaphore>,
}

// A reservation against the quota, released back on drop unless forgotten
struct IopsReservation<'a> {
    value: Option<SemaphorePermit<'a>>,
}

impl IopsReservation<'_> {
    // Forget the reservation so it is not released on drop; the permit is
    // instead returned manually via `IopsQuota::release` once the I/O finishes
    fn forget(&mut self) {
        if let Some(value) = self.value.take() {
            value.forget();
        }
    }
}

impl IopsQuota {
    fn new() -> Self {
        let initial_capacity = std::env::var("LANCE_PROCESS_IO_THREADS_LIMIT")
            .map(|s| {
                s.parse::<i32>().unwrap_or_else(|_| {
                    log::warn!("Ignoring invalid LANCE_PROCESS_IO_THREADS_LIMIT: {}", s);
                    DEFAULT_PROCESS_IOPS_LIMIT
                })
            })
            .unwrap_or(DEFAULT_PROCESS_IOPS_LIMIT);
        // A non-positive value disables the quota entirely
        let iops_avail = if initial_capacity <= 0 {
            None
        } else {
            Some(Semaphore::new(initial_capacity as usize))
        };
        Self { iops_avail }
    }

    // Return a permit to the quota after an I/O operation completes
    fn release(&self) {
        if let Some(iops_avail) = self.iops_avail.as_ref() {
            iops_avail.add_permits(1);
        }
    }

    // Wait until a permit is available
    async fn acquire(&self) -> IopsReservation<'_> {
        if let Some(iops_avail) = self.iops_avail.as_ref() {
            IopsReservation {
                // The semaphore is never closed, so acquire cannot fail
                value: Some(iops_avail.acquire().await.unwrap()),
            }
        } else {
            IopsReservation { value: None }
        }
    }
}

static IOPS_QUOTA: std::sync::LazyLock<IopsQuota> = std::sync::LazyLock::new(IopsQuota::new);
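
// Hedged example: the quota is read from the environment once, at first use,
// so it must be set before any I/O is issued. A shell-level sketch (`my-app`
// is a placeholder for any binary embedding this crate):
//
//     LANCE_PROCESS_IO_THREADS_LIMIT=64 ./my-app   # cap at 64 concurrent IOPs
//     LANCE_PROCESS_IO_THREADS_LIMIT=-1 ./my-app   # disable the cap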

// A sorted list of the priorities of I/O requests currently in flight
struct PrioritiesInFlight {
    in_flight: Vec<u128>,
}

impl PrioritiesInFlight {
    fn new(capacity: u32) -> Self {
        Self {
            in_flight: Vec::with_capacity(capacity as usize * 2),
        }
    }

    // The priority of the most urgent request in flight (lower is more urgent)
    fn min_in_flight(&self) -> u128 {
        self.in_flight.first().copied().unwrap_or(u128::MAX)
    }

    fn push(&mut self, prio: u128) {
        let pos = match self.in_flight.binary_search(&prio) {
            Ok(pos) => pos,
            Err(pos) => pos,
        };
        self.in_flight.insert(pos, prio);
    }

    fn remove(&mut self, prio: u128) {
        if let Ok(pos) = self.in_flight.binary_search(&prio) {
            self.in_flight.remove(pos);
        }
    }
}

struct IoQueueState {
    // IOPs still available before this scheduler's parallelism limit is hit
    iops_avail: u32,
    // Bytes remaining in the backpressure buffer (may briefly go negative)
    bytes_avail: i64,
    // Pending I/O requests, ordered so the lowest priority value pops first
    pending_requests: BinaryHeap<IoTask>,
    // Priorities of the requests currently in flight
    priorities_in_flight: PrioritiesInFlight,
    // Set once the scheduler is dropped and no more requests will arrive
    done_scheduling: bool,
    // When the scheduler started, used to debounce backpressure warnings
    start: Instant,
    // Seconds-since-start of the last backpressure warning
    last_warn: AtomicU64,
}

impl IoQueueState {
    fn new(io_capacity: u32, io_buffer_size: u64) -> Self {
        Self {
            iops_avail: io_capacity,
            bytes_avail: io_buffer_size as i64,
            pending_requests: BinaryHeap::new(),
            priorities_in_flight: PrioritiesInFlight::new(io_capacity),
            done_scheduling: false,
            start: Instant::now(),
            last_warn: AtomicU64::from(0),
        }
    }

    fn warn_if_needed(&self) {
        let seconds_elapsed = self.start.elapsed().as_secs();
        let last_warn = self.last_warn.load(Ordering::Acquire);
        let since_last_warn = seconds_elapsed - last_warn;
        if (last_warn == 0
            && seconds_elapsed > BACKPRESSURE_MIN
            && seconds_elapsed < BACKPRESSURE_DEBOUNCE)
            || since_last_warn > BACKPRESSURE_DEBOUNCE
        {
            tracing::event!(tracing::Level::DEBUG, "Backpressure throttle exceeded");
            log::debug!(
                "Backpressure throttle is full; I/O will pause until the buffer drains. Maximum I/O bandwidth will not be achieved because the CPU is falling behind"
            );
            // Store at least 1 so that `last_warn == 0` always means "never warned"
            self.last_warn
                .store(seconds_elapsed.max(1), Ordering::Release);
        }
    }

    fn can_deliver(&self, task: &IoTask) -> bool {
        if self.iops_avail == 0 {
            // No IOPs available
            false
        } else if task.priority <= self.priorities_in_flight.min_in_flight() {
            // Always let the most urgent request through, even if it would
            // overrun the backpressure buffer; otherwise a large high-priority
            // read could wait forever behind data that is never consumed
            true
        } else if task.num_bytes() as i64 > self.bytes_avail {
            // The buffer is full; defer until bytes are consumed
            self.warn_if_needed();
            false
        } else {
            true
        }
    }

    fn next_task(&mut self) -> Option<IoTask> {
        let task = self.pending_requests.peek()?;
        if self.can_deliver(task) {
            self.priorities_in_flight.push(task.priority);
            self.iops_avail -= 1;
            self.bytes_avail -= task.num_bytes() as i64;
            if self.bytes_avail < 0 {
                // Possible when the priority rule above lets a task overrun
                // the buffer
                log::debug!(
                    "Backpressure throttle temporarily exceeded by {} bytes due to priority I/O",
                    -self.bytes_avail
                );
            }
            Some(self.pending_requests.pop().unwrap())
        } else {
            None
        }
    }
}
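
// Hedged walk-through of the delivery rules above, assuming 2 IOPs of
// capacity, 100 bytes of buffer remaining, and one request in flight at
// priority 5:
//
//   - a task at priority <= 5 is delivered regardless of its size,
//   - a task at priority 9 needing 80 bytes is delivered (it fits),
//   - a task at priority 9 needing 200 bytes waits until bytes are consumed.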

// The shared I/O queue: a priority heap of pending requests plus bookkeeping
// for available IOPs and backpressure bytes
struct IoQueue {
    state: Mutex<IoQueueState>,
    // Signalled whenever the state changes in a way that might unblock `pop`
    notify: Notify,
}

impl IoQueue {
    fn new(io_capacity: u32, io_buffer_size: u64) -> Self {
        Self {
            state: Mutex::new(IoQueueState::new(io_capacity, io_buffer_size)),
            notify: Notify::new(),
        }
    }

    fn push(&self, task: IoTask) {
        log::trace!(
            "Inserting I/O request for {} bytes with priority ({},{}) into I/O queue",
            task.num_bytes(),
            task.priority >> 64,
            task.priority & 0xFFFF_FFFF_FFFF_FFFF
        );
        let mut state = self.state.lock().unwrap();
        state.pending_requests.push(task);
        drop(state);

        self.notify.notify_one();
    }

    async fn pop(&self) -> Option<IoTask> {
        loop {
            {
                // Reserve a process-wide IOP before looking at the queue so a
                // task is only popped once it can actually run
                let mut iop_res = IOPS_QUOTA.acquire().await;
                let mut state = self.state.lock().unwrap();
                if let Some(task) = state.next_task() {
                    // Transfer ownership of the permit to the task; it is
                    // returned via IOPS_QUOTA.release() when the task finishes
                    iop_res.forget();
                    return Some(task);
                }

                if state.done_scheduling {
                    return None;
                }
                // An unused reservation is dropped (and released) here
            }

            self.notify.notified().await;
        }
    }

    fn on_iop_complete(&self) {
        let mut state = self.state.lock().unwrap();
        state.iops_avail += 1;
        drop(state);

        self.notify.notify_one();
    }

    fn on_bytes_consumed(&self, bytes: u64, priority: u128, num_reqs: usize) {
        let mut state = self.state.lock().unwrap();
        state.bytes_avail += bytes as i64;
        for _ in 0..num_reqs {
            state.priorities_in_flight.remove(priority);
        }
        drop(state);

        self.notify.notify_one();
    }

    fn close(&self) {
        let mut state = self.state.lock().unwrap();
        state.done_scheduling = true;
        // Fail any tasks that never got a chance to run
        let pending_requests = std::mem::take(&mut state.pending_requests);
        drop(state);
        for request in pending_requests {
            request.cancel();
        }

        self.notify.notify_one();
    }
}

// Accumulates the buffers for the individual IOPs that make up one request;
// when the last IOP finishes (dropping the batch) the caller's `when_done`
// callback receives the combined response
struct MutableBatch<F: FnOnce(Response) + Send> {
    when_done: Option<F>,
    data_buffers: Vec<Bytes>,
    num_bytes: u64,
    priority: u128,
    num_reqs: usize,
    err: Option<Box<dyn std::error::Error + Send + Sync + 'static>>,
}

impl<F: FnOnce(Response) + Send> MutableBatch<F> {
    fn new(when_done: F, num_data_buffers: u32, priority: u128, num_reqs: usize) -> Self {
        Self {
            when_done: Some(when_done),
            data_buffers: vec![Bytes::default(); num_data_buffers as usize],
            num_bytes: 0,
            priority,
            num_reqs,
            err: None,
        }
    }
}

impl<F: FnOnce(Response) + Send> Drop for MutableBatch<F> {
    fn drop(&mut self) {
        // Report the first error encountered, or the collected buffers if
        // every IOP succeeded
        let result = if self.err.is_some() {
            Err(Error::wrapped(self.err.take().unwrap()))
        } else {
            let mut data = Vec::new();
            std::mem::swap(&mut data, &mut self.data_buffers);
            Ok(data)
        };
        let response = Response {
            data: result,
            num_bytes: self.num_bytes,
            priority: self.priority,
            num_reqs: self.num_reqs,
        };
        (self.when_done.take().unwrap())(response);
    }
}

struct DataChunk {
    task_idx: usize,
    num_bytes: u64,
    data: Result<Bytes>,
}

trait DataSink: Send {
    fn deliver_data(&mut self, data: DataChunk);
}

impl<F: FnOnce(Response) + Send> DataSink for MutableBatch<F> {
    fn deliver_data(&mut self, data: DataChunk) {
        self.num_bytes += data.num_bytes;
        match data.data {
            Ok(data_bytes) => {
                self.data_buffers[data.task_idx] = data_bytes;
            }
            Err(err) => {
                // Keep only the first error
                self.err.get_or_insert(Box::new(err));
            }
        }
    }
}

struct IoTask {
    reader: Arc<dyn Reader>,
    to_read: Range<u64>,
    when_done: Box<dyn FnOnce(Result<Bytes>) + Send>,
    priority: u128,
}

impl Eq for IoTask {}

impl PartialEq for IoTask {
    fn eq(&self, other: &Self) -> bool {
        self.priority == other.priority
    }
}

impl PartialOrd for IoTask {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for IoTask {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        // Deliberately reversed so that BinaryHeap, a max-heap, pops the task
        // with the *lowest* priority value (the most urgent) first
        other.priority.cmp(&self.priority)
    }
}

impl IoTask {
    fn num_bytes(&self) -> u64 {
        self.to_read.end - self.to_read.start
    }

    // Fail the request; used when the scheduler shuts down with work queued
    fn cancel(self) {
        (self.when_done)(Err(Error::internal(
            "Scheduler closed before I/O was completed".to_string(),
        )));
    }

    async fn run(self) {
        let file_path = self.reader.path().as_ref();
        let num_bytes = self.num_bytes();
        let bytes = if self.to_read.start == self.to_read.end {
            // An empty range; no need to hit the store
            Ok(Bytes::new())
        } else {
            let bytes_fut = self
                .reader
                .get_range(self.to_read.start as usize..self.to_read.end as usize);
            IOPS_COUNTER.fetch_add(1, Ordering::Release);
            let num_bytes = self.num_bytes();
            bytes_fut
                .inspect(move |_| {
                    BYTES_READ_COUNTER.fetch_add(num_bytes, Ordering::Release);
                })
                .await
                .map_err(Error::from)
        };
        tracing::trace!(
            file = file_path,
            bytes_read = num_bytes,
            requests = 1,
            range_start = self.to_read.start,
            range_end = self.to_read.end,
            "File I/O completed"
        );
        // Return the process-wide permit that `IoQueue::pop` forgot on our behalf
        IOPS_QUOTA.release();
        (self.when_done)(bytes);
    }
}

// Pull tasks off the queue and spawn them until the queue closes
async fn run_io_loop(tasks: Arc<IoQueue>) {
    loop {
        let next_task = tasks.pop().await;
        match next_task {
            Some(task) => {
                tokio::spawn(task.run());
            }
            None => {
                // The queue has closed; shut down
                return;
            }
        }
    }
}

// Per-scheduler I/O statistics
#[derive(Debug)]
struct StatsCollector {
    iops: AtomicU64,
    requests: AtomicU64,
    bytes_read: AtomicU64,
}

impl StatsCollector {
    fn new() -> Self {
        Self {
            iops: AtomicU64::new(0),
            requests: AtomicU64::new(0),
            bytes_read: AtomicU64::new(0),
        }
    }

    fn iops(&self) -> u64 {
        self.iops.load(Ordering::Relaxed)
    }

    fn bytes_read(&self) -> u64 {
        self.bytes_read.load(Ordering::Relaxed)
    }

    fn requests(&self) -> u64 {
        self.requests.load(Ordering::Relaxed)
    }

    fn record_request(&self, request: &[Range<u64>]) {
        self.requests.fetch_add(1, Ordering::Relaxed);
        self.iops.fetch_add(request.len() as u64, Ordering::Relaxed);
        self.bytes_read.fetch_add(
            request.iter().map(|r| r.end - r.start).sum::<u64>(),
            Ordering::Relaxed,
        );
    }
}

/// Statistics describing the I/O performed by a scheduler
pub struct ScanStats {
    /// Number of IOPs issued, after coalescing and splitting
    pub iops: u64,
    /// Number of read requests submitted by callers
    pub requests: u64,
    /// Total number of bytes read
    pub bytes_read: u64,
}

impl ScanStats {
    fn new(stats: &StatsCollector) -> Self {
        Self {
            iops: stats.iops(),
            requests: stats.requests(),
            bytes_read: stats.bytes_read(),
        }
    }
}

enum IoQueueType {
    Standard(Arc<IoQueue>),
    Lite(Arc<lite::IoQueue>),
}

/// A scheduler for I/O operations
///
/// Enforces the store's IOPs limit and a backpressure buffer across all files
/// opened through it
pub struct ScanScheduler {
    object_store: Arc<ObjectStore>,
    io_queue: IoQueueType,
    stats: Arc<StatsCollector>,
}

impl Debug for ScanScheduler {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ScanScheduler")
            .field("object_store", &self.object_store)
            .finish()
    }
}

struct Response {
    data: Result<Vec<Bytes>>,
    priority: u128,
    num_reqs: usize,
    num_bytes: u64,
}

/// Configuration for a [`ScanScheduler`]
#[derive(Debug, Clone, Copy)]
pub struct SchedulerConfig {
    /// Maximum number of bytes the scheduler will buffer before pausing I/O to
    /// apply backpressure
    pub io_buffer_size_bytes: u64,
    /// Use the lite scheduler instead of the standard one
    pub use_lite_scheduler: bool,
}

impl SchedulerConfig {
    pub fn new(io_buffer_size_bytes: u64) -> Self {
        Self {
            io_buffer_size_bytes,
            use_lite_scheduler: std::env::var("LANCE_USE_LITE_SCHEDULER").is_ok(),
        }
    }

    // A generous 256MiB buffer, plenty for most tests
    pub fn default_for_testing() -> Self {
        Self {
            io_buffer_size_bytes: 256 * 1024 * 1024,
            use_lite_scheduler: false,
        }
    }

    // Buffer 32MiB for every unit of the store's I/O parallelism
    pub fn max_bandwidth(store: &ObjectStore) -> Self {
        Self::new(32 * 1024 * 1024 * store.io_parallelism() as u64)
    }

    pub fn with_lite_scheduler(self) -> Self {
        Self {
            use_lite_scheduler: true,
            ..self
        }
    }
}
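
// Hedged usage sketch: pick an explicit buffer size, or derive one from the
// store's parallelism; `store` is an assumed `ObjectStore` value.
//
//     let config = SchedulerConfig::new(128 * 1024 * 1024);
//     let config = SchedulerConfig::max_bandwidth(&store).with_lite_scheduler();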

impl ScanScheduler {
    /// Create a new scheduler with the given configuration
    ///
    /// The IOPs capacity comes from the object store's I/O parallelism
    pub fn new(object_store: Arc<ObjectStore>, config: SchedulerConfig) -> Arc<Self> {
        let io_capacity = object_store.io_parallelism();
        let io_queue = if config.use_lite_scheduler {
            let io_queue = Arc::new(lite::IoQueue::new(
                io_capacity as u64,
                config.io_buffer_size_bytes,
            ));
            IoQueueType::Lite(io_queue)
        } else {
            let io_queue = Arc::new(IoQueue::new(
                io_capacity as u32,
                config.io_buffer_size_bytes,
            ));
            let io_queue_clone = io_queue.clone();
            // The I/O loop runs in the background until the queue is closed
            tokio::task::spawn(async move { run_io_loop(io_queue_clone).await });
            IoQueueType::Standard(io_queue)
        };
        Arc::new(Self {
            object_store,
            io_queue,
            stats: Arc::new(StatsCollector::new()),
        })
    }

    /// Open a file for reading
    ///
    /// The base priority occupies the high 64 bits of every request's
    /// priority; the per-request priority occupies the low 64 bits
    pub async fn open_file_with_priority(
        self: &Arc<Self>,
        path: &Path,
        base_priority: u64,
        file_size_bytes: &CachedFileSize,
    ) -> Result<FileScheduler> {
        // Use the cached file size when available; otherwise stat the file and
        // cache the result
        let file_size_bytes = if let Some(size) = file_size_bytes.get() {
            u64::from(size)
        } else {
            let size = self.object_store.size(path).await?;
            if let Some(size) = NonZero::new(size) {
                file_size_bytes.set(size);
            }
            size
        };
        let reader = self
            .object_store
            .open_with_size(path, file_size_bytes as usize)
            .await?;
        let block_size = self.object_store.block_size() as u64;
        let max_iop_size = self.object_store.max_iop_size();
        Ok(FileScheduler {
            reader: reader.into(),
            block_size,
            root: self.clone(),
            base_priority,
            max_iop_size,
        })
    }

    /// Open a file for reading with a base priority of 0
    pub async fn open_file(
        self: &Arc<Self>,
        path: &Path,
        file_size_bytes: &CachedFileSize,
    ) -> Result<FileScheduler> {
        self.open_file_with_priority(path, 0, file_size_bytes).await
    }
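
    // Hedged end-to-end sketch (inside an async context; `object_store` and
    // `path` are assumptions):
    //
    //     let scheduler = ScanScheduler::new(object_store, SchedulerConfig::default_for_testing());
    //     let file = scheduler.open_file(&path, &CachedFileSize::unknown()).await?;
    //     let buffers = file.submit_request(vec![0..1024, 4096..8192], 0).await?;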

    fn do_submit_request(
        &self,
        reader: Arc<dyn Reader>,
        request: Vec<Range<u64>>,
        tx: oneshot::Sender<Response>,
        priority: u128,
        io_queue: &Arc<IoQueue>,
    ) {
        let num_iops = request.len() as u32;

        let when_all_io_done = move |bytes_and_permits| {
            // The receiver is dropped if the caller no longer cares; ignore
            // the send error in that case
            let _ = tx.send(bytes_and_permits);
        };

        // The batch collects each IOP's bytes; when the last IOP finishes the
        // batch drops and sends the combined response
        let dest = Arc::new(Mutex::new(Box::new(MutableBatch::new(
            when_all_io_done,
            num_iops,
            priority,
            request.len(),
        ))));

        for (task_idx, iop) in request.into_iter().enumerate() {
            let dest = dest.clone();
            let io_queue_clone = io_queue.clone();
            let num_bytes = iop.end - iop.start;
            let task = IoTask {
                reader: reader.clone(),
                to_read: iop,
                priority,
                when_done: Box::new(move |data| {
                    io_queue_clone.on_iop_complete();
                    let mut dest = dest.lock().unwrap();
                    let chunk = DataChunk {
                        data,
                        task_idx,
                        num_bytes,
                    };
                    dest.deliver_data(chunk);
                }),
            };
            io_queue.push(task);
        }
    }

    fn submit_request_standard(
        &self,
        reader: Arc<dyn Reader>,
        request: Vec<Range<u64>>,
        priority: u128,
        io_queue: &Arc<IoQueue>,
    ) -> impl Future<Output = Result<Vec<Bytes>>> + Send + use<> {
        let (tx, rx) = oneshot::channel::<Response>();

        self.do_submit_request(reader, request, tx, priority, io_queue);

        let io_queue_clone = io_queue.clone();

        rx.map(move |wrapped_rsp| {
            // The batch always sends a response when it drops, so the channel
            // cannot be cancelled
            let rsp = wrapped_rsp.unwrap();
            // This closure runs when the caller awaits the result, which is
            // the point at which the bytes count as "consumed" for
            // backpressure purposes
            io_queue_clone.on_bytes_consumed(rsp.num_bytes, rsp.priority, rsp.num_reqs);
            rsp.data
        })
    }

    fn submit_request_lite(
        &self,
        reader: Arc<dyn Reader>,
        request: Vec<Range<u64>>,
        priority: u128,
        io_queue: &Arc<lite::IoQueue>,
    ) -> impl Future<Output = Result<Vec<Bytes>>> + Send + use<> {
        // The lite queue submits each read eagerly and returns one task handle
        // per range
        let maybe_tasks = request
            .into_iter()
            .map(|task| {
                let reader = reader.clone();
                let queue = io_queue.clone();
                let run_fn = Box::new(move || {
                    reader
                        .get_range(task.start as usize..task.end as usize)
                        .map_err(Error::from)
                        .boxed()
                });
                queue.submit(task, priority, run_fn)
            })
            .collect::<Result<Vec<_>>>();
        match maybe_tasks {
            Ok(tasks) => async move {
                let mut results = Vec::with_capacity(tasks.len());
                for task in tasks {
                    results.push(task.await?);
                }
                Ok(results)
            }
            .boxed(),
            Err(e) => async move { Err(e) }.boxed(),
        }
    }

    /// Submit a set of reads against the given reader
    ///
    /// Lower priority values are scheduled first; the request is routed to
    /// whichever queue (standard or lite) this scheduler was built with
    pub fn submit_request(
        &self,
        reader: Arc<dyn Reader>,
        request: Vec<Range<u64>>,
        priority: u128,
    ) -> impl Future<Output = Result<Vec<Bytes>>> + Send + use<> {
        match &self.io_queue {
            IoQueueType::Standard(io_queue) => futures::future::Either::Left(
                self.submit_request_standard(reader, request, priority, io_queue),
            ),
            IoQueueType::Lite(io_queue) => futures::future::Either::Right(
                self.submit_request_lite(reader, request, priority, io_queue),
            ),
        }
    }

    /// Returns the I/O statistics recorded by this scheduler
    pub fn stats(&self) -> ScanStats {
        ScanStats::new(self.stats.as_ref())
    }
}

impl Drop for ScanScheduler {
    fn drop(&mut self) {
        // Close the queue so the I/O loop shuts down; any reads still queued
        // are cancelled
        match &self.io_queue {
            IoQueueType::Standard(io_queue) => io_queue.close(),
            IoQueueType::Lite(io_queue) => io_queue.close(),
        }
    }
}

/// A scheduler for reads of a single file
///
/// Wraps a [`ScanScheduler`], combining its base priority with per-request
/// priorities and coalescing/splitting requests to fit the store's limits
#[derive(Clone, Debug)]
pub struct FileScheduler {
    reader: Arc<dyn Reader>,
    root: Arc<ScanScheduler>,
    block_size: u64,
    base_priority: u64,
    max_iop_size: u64,
}

// Two ranges are "close together" if the gap between them is at most
// `block_size` bytes; assumes `range1` starts at or before `range2`
fn is_close_together(range1: &Range<u64>, range2: &Range<u64>, block_size: u64) -> bool {
    range2.start <= (range1.end + block_size)
}

fn is_overlapping(range1: &Range<u64>, range2: &Range<u64>) -> bool {
    range1.start < range2.end && range2.start < range1.end
}
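
// Worked example: with block_size = 4096, the ranges 0..1000 and 2000..3000
// are close together (gap of 1000 <= 4096) and will be coalesced into a single
// read of 0..3000, while 0..1000 and 10_000..11_000 remain separate IOPs.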

impl FileScheduler {
    /// Submit a batch of reads
    ///
    /// The reads may be coalesced (neighbors within `block_size` of each other
    /// are merged) or split (ranges larger than `max_iop_size` are divided)
    /// before being issued; the returned buffers always match the originally
    /// requested ranges.
    ///
    /// The incoming ranges are assumed to be sorted by start offset; both the
    /// merging pass and the reassembly below rely on that order.
    pub fn submit_request(
        &self,
        request: Vec<Range<u64>>,
        priority: u64,
    ) -> impl Future<Output = Result<Vec<Bytes>>> + Send + use<> {
        // The base priority forms the high 64 bits, the per-request priority
        // the low 64 bits
        let priority = ((self.base_priority as u128) << 64) + priority as u128;

        let mut merged_requests = Vec::with_capacity(request.len());

        if !request.is_empty() {
            let mut curr_interval = request[0].clone();

            for req in request.iter().skip(1) {
                if is_close_together(&curr_interval, req, self.block_size) {
                    curr_interval.end = curr_interval.end.max(req.end);
                } else {
                    merged_requests.push(curr_interval);
                    curr_interval = req.clone();
                }
            }

            merged_requests.push(curr_interval);
        }

        // Split any merged request that exceeds max_iop_size into roughly
        // equal chunks
        let mut updated_requests = Vec::with_capacity(merged_requests.len());
        for req in merged_requests {
            if req.is_empty() {
                updated_requests.push(req);
            } else {
                let num_requests = (req.end - req.start).div_ceil(self.max_iop_size);
                let bytes_per_request = (req.end - req.start) / num_requests;
                for i in 0..num_requests {
                    let start = req.start + i * bytes_per_request;
                    let end = if i == num_requests - 1 {
                        // The last chunk absorbs any remainder
                        req.end
                    } else {
                        start + bytes_per_request
                    };
                    updated_requests.push(start..end);
                }
            }
        }

        self.root.stats.record_request(&updated_requests);

        let bytes_vec_fut =
            self.root
                .submit_request(self.reader.clone(), updated_requests.clone(), priority);

        let mut updated_index = 0;
        let mut final_bytes = Vec::with_capacity(request.len());

        async move {
            let bytes_vec = bytes_vec_fut.await?;

            // Slice the coalesced/split buffers back into the originally
            // requested ranges
            let mut orig_index = 0;
            while (updated_index < updated_requests.len()) && (orig_index < request.len()) {
                let updated_range = &updated_requests[updated_index];
                let orig_range = &request[orig_index];
                let byte_offset = updated_range.start as usize;

                if is_overlapping(updated_range, orig_range) {
                    let start = orig_range.start as usize - byte_offset;
                    if orig_range.end <= updated_range.end {
                        // The original range is fully contained in this buffer
                        let end = orig_range.end as usize - byte_offset;
                        final_bytes.push(bytes_vec[updated_index].slice(start..end));
                    } else {
                        // The original range spans several split buffers;
                        // stitch them back together
                        let orig_size = orig_range.end - orig_range.start;
                        let mut merged_bytes = Vec::with_capacity(orig_size as usize);
                        merged_bytes.extend_from_slice(&bytes_vec[updated_index].slice(start..));
                        let mut copy_offset = merged_bytes.len() as u64;
                        while copy_offset < orig_size {
                            updated_index += 1;
                            let next_range = &updated_requests[updated_index];
                            let bytes_to_take =
                                (orig_size - copy_offset).min(next_range.end - next_range.start);
                            merged_bytes.extend_from_slice(
                                &bytes_vec[updated_index].slice(0..bytes_to_take as usize),
                            );
                            copy_offset += bytes_to_take;
                        }
                        final_bytes.push(Bytes::from(merged_bytes));
                    }
                    orig_index += 1;
                } else {
                    updated_index += 1;
                }
            }

            Ok(final_bytes)
        }
    }

    /// Create a copy of this scheduler with a different base priority
    pub fn with_priority(&self, priority: u64) -> Self {
        Self {
            reader: self.reader.clone(),
            root: self.root.clone(),
            block_size: self.block_size,
            max_iop_size: self.max_iop_size,
            base_priority: priority,
        }
    }

    /// Submit a single read and receive the bytes directly
    pub fn submit_single(
        &self,
        range: Range<u64>,
        priority: u64,
    ) -> impl Future<Output = Result<Bytes>> + Send {
        self.submit_request(vec![range], priority)
            .map_ok(|vec_bytes| vec_bytes.into_iter().next().unwrap())
    }
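
    // Hedged usage sketch for `submit_single`: fetch one range and receive the
    // bytes directly rather than a one-element vec.
    //
    //     let header = file_scheduler.submit_single(0..64, 0).await?;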

    /// Returns the underlying reader for this file
    pub fn reader(&self) -> &Arc<dyn Reader> {
        &self.reader
    }
}

#[cfg(test)]
mod tests {
    use std::{collections::VecDeque, time::Duration};

    use futures::poll;
    use lance_core::utils::tempfile::TempObjFile;
    use rand::RngCore;

    use object_store::{GetRange, ObjectStore as OSObjectStore, memory::InMemory};
    use tokio::{runtime::Handle, time::timeout};
    use url::Url;

    use crate::{
        object_store::{DEFAULT_DOWNLOAD_RETRY_COUNT, DEFAULT_MAX_IOP_SIZE},
        testing::MockObjectStore,
    };

    use super::*;

    #[tokio::test]
    async fn test_full_seq_read() {
        let tmp_file = TempObjFile::default();

        let obj_store = Arc::new(ObjectStore::local());

        // Write 1MiB of random data to the file
        const DATA_SIZE: u64 = 1024 * 1024;
        let mut some_data = vec![0; DATA_SIZE as usize];
        rand::rng().fill_bytes(&mut some_data);
        obj_store.put(&tmp_file, &some_data).await.unwrap();

        let config = SchedulerConfig::default_for_testing();

        let scheduler = ScanScheduler::new(obj_store, config);

        let file_scheduler = scheduler
            .open_file(&tmp_file, &CachedFileSize::unknown())
            .await
            .unwrap();

        // Read it back 4KiB at a time
        const READ_SIZE: u64 = 4 * 1024;
        let mut reqs = VecDeque::new();
        let mut offset = 0;
        while offset < DATA_SIZE {
            reqs.push_back(
                #[allow(clippy::single_range_in_vec_init)]
                file_scheduler
                    .submit_request(vec![offset..offset + READ_SIZE], 0)
                    .await
                    .unwrap(),
            );
            offset += READ_SIZE;
        }

        offset = 0;
        // Consume the results in order and verify the data
        while offset < DATA_SIZE {
            let data = reqs.pop_front().unwrap();
            let actual = &data[0];
            let expected = &some_data[offset as usize..(offset + READ_SIZE) as usize];
            assert_eq!(expected, actual);
            offset += READ_SIZE;
        }
    }

    #[tokio::test]
    async fn test_split_coalesce() {
        let tmp_file = TempObjFile::default();

        let obj_store = Arc::new(ObjectStore::local());

        // Write 75MiB of random data so reads can span several max-size IOPs
        const DATA_SIZE: u64 = 75 * 1024 * 1024;
        let mut some_data = vec![0; DATA_SIZE as usize];
        rand::rng().fill_bytes(&mut some_data);
        obj_store.put(&tmp_file, &some_data).await.unwrap();

        let config = SchedulerConfig::default_for_testing();

        let scheduler = ScanScheduler::new(obj_store, config);

        let file_scheduler = scheduler
            .open_file(&tmp_file, &CachedFileSize::unknown())
            .await
            .unwrap();

        // Three small nearby reads coalesce into a single IOP
        let req =
            file_scheduler.submit_request(vec![50_000..51_000, 52_000..53_000, 54_000..55_000], 0);

        let bytes = req.await.unwrap();

        assert_eq!(bytes[0], &some_data[50_000..51_000]);
        assert_eq!(bytes[1], &some_data[52_000..53_000]);
        assert_eq!(bytes[2], &some_data[54_000..55_000]);

        assert_eq!(1, scheduler.stats().iops);

        // One huge read splits into multiple IOPs
        let req = file_scheduler.submit_request(vec![0..DATA_SIZE], 0);
        let bytes = req.await.unwrap();
        assert!(bytes[0] == some_data, "data is not the same");

        assert_eq!(6, scheduler.stats().iops);

        // Reads that straddle max-IOP-size boundaries are split and then
        // sliced back to the requested ranges
        let chunk_size = *DEFAULT_MAX_IOP_SIZE;
        let req = file_scheduler.submit_request(
            vec![
                10..chunk_size,
                chunk_size + 10..(chunk_size * 2) - 20,
                chunk_size * 2..(chunk_size * 2) + 10,
            ],
            0,
        );

        let bytes = req.await.unwrap();
        let chunk_size = chunk_size as usize;
        assert!(
            bytes[0] == some_data[10..chunk_size],
            "data is not the same"
        );
        assert!(
            bytes[1] == some_data[chunk_size + 10..(chunk_size * 2) - 20],
            "data is not the same"
        );
        assert!(
            bytes[2] == some_data[chunk_size * 2..(chunk_size * 2) + 10],
            "data is not the same"
        );
        assert_eq!(8, scheduler.stats().iops);

        // Many adjacent reads coalesce and then split along max-IOP boundaries
        let reads = (0..44)
            .map(|i| i * 1_000_000..(i + 1) * 1_000_000)
            .collect::<Vec<_>>();
        let req = file_scheduler.submit_request(reads, 0);
        let bytes = req.await.unwrap();
        for (i, bytes) in bytes.iter().enumerate() {
            assert!(
                bytes == &some_data[i * 1_000_000..(i + 1) * 1_000_000],
                "data is not the same"
            );
        }
        assert_eq!(11, scheduler.stats().iops);
    }

    #[tokio::test]
    async fn test_priority() {
        let some_path = Path::parse("foo").unwrap();
        let base_store = Arc::new(InMemory::new());
        base_store
            .put(&some_path, vec![0; 1000].into())
            .await
            .unwrap();

        let semaphore = Arc::new(tokio::sync::Semaphore::new(0));
        let mut obj_store = MockObjectStore::default();
        let semaphore_copy = semaphore.clone();
        // Every get blocks on the semaphore so the test can release reads one
        // at a time
        obj_store
            .expect_get_opts()
            .returning(move |location, options| {
                let semaphore = semaphore.clone();
                let base_store = base_store.clone();
                let location = location.clone();
                async move {
                    semaphore.acquire().await.unwrap().forget();
                    base_store.get_opts(&location, options).await
                }
                .boxed()
            });
        let obj_store = Arc::new(ObjectStore::new(
            Arc::new(obj_store),
            Url::parse("mem://").unwrap(),
            Some(500),
            None,
            false,
            false,
            1,
            DEFAULT_DOWNLOAD_RETRY_COUNT,
            None,
        ));

        let config = SchedulerConfig {
            io_buffer_size_bytes: 1024 * 1024,
            use_lite_scheduler: false,
        };

        let scan_scheduler = ScanScheduler::new(obj_store, config);

        let file_scheduler = scan_scheduler
            .open_file(&Path::parse("foo").unwrap(), &CachedFileSize::new(1000))
            .await
            .unwrap();

        // The first request runs as soon as a permit is available
        let first_fut = timeout(
            Duration::from_secs(10),
            file_scheduler.submit_single(0..10, 0),
        )
        .boxed();

        // The second request has a larger priority value, so it is less urgent
        let mut second_fut = timeout(
            Duration::from_secs(10),
            file_scheduler.submit_single(0..20, 100),
        )
        .boxed();

        // The third request is more urgent than the second and should jump
        // ahead of it even though it was submitted later
        let mut third_fut = timeout(
            Duration::from_secs(10),
            file_scheduler.submit_single(0..30, 0),
        )
        .boxed();

        semaphore_copy.add_permits(1);
        assert!(first_fut.await.unwrap().unwrap().len() == 10);
        assert!(poll!(&mut second_fut).is_pending());
        assert!(poll!(&mut third_fut).is_pending());

        semaphore_copy.add_permits(1);
        // The third request completes before the second
        assert!(third_fut.await.unwrap().unwrap().len() == 30);
        assert!(poll!(&mut second_fut).is_pending());

        semaphore_copy.add_permits(1);
        assert!(second_fut.await.unwrap().unwrap().len() == 20);
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_backpressure() {
        let some_path = Path::parse("foo").unwrap();
        let base_store = Arc::new(InMemory::new());
        base_store
            .put(&some_path, vec![0; 100000].into())
            .await
            .unwrap();

        let bytes_read = Arc::new(AtomicU64::from(0));
        let mut obj_store = MockObjectStore::default();
        let bytes_read_copy = bytes_read.clone();
        // Track the number of bytes requested from the underlying store
        obj_store
            .expect_get_opts()
            .returning(move |location, options| {
                let range = options.range.as_ref().unwrap();
                let num_bytes = match range {
                    GetRange::Bounded(bounded) => bounded.end - bounded.start,
                    _ => panic!(),
                };
                bytes_read_copy.fetch_add(num_bytes, Ordering::Release);
                let location = location.clone();
                let base_store = base_store.clone();
                async move { base_store.get_opts(&location, options).await }.boxed()
            });
        let obj_store = Arc::new(ObjectStore::new(
            Arc::new(obj_store),
            Url::parse("mem://").unwrap(),
            Some(500),
            None,
            false,
            false,
            1,
            DEFAULT_DOWNLOAD_RETRY_COUNT,
            None,
        ));

        // A tiny 10-byte backpressure buffer so reads queue up quickly
        let config = SchedulerConfig {
            io_buffer_size_bytes: 10,
            use_lite_scheduler: false,
        };

        let scan_scheduler = ScanScheduler::new(obj_store.clone(), config);

        let file_scheduler = scan_scheduler
            .open_file(&Path::parse("foo").unwrap(), &CachedFileSize::new(100000))
            .await
            .unwrap();

        let wait_for_idle = || async move {
            let handle = Handle::current();
            while handle.metrics().num_alive_tasks() != 1 {
                tokio::time::sleep(Duration::from_millis(10)).await;
            }
        };
        // Wait until the store has served `target_bytes` and all spawned I/O
        // tasks have finished
        let wait_for_bytes_read_and_idle = |target_bytes: u64| {
            let bytes_read = &bytes_read;
            async move {
                let bytes_read_copy = bytes_read.clone();
                while bytes_read_copy.load(Ordering::Acquire) < target_bytes {
                    tokio::time::sleep(Duration::from_millis(10)).await;
                }
                wait_for_idle().await;
            }
        };

        // The first two 5-byte reads fill the 10-byte buffer
        let first_fut = file_scheduler.submit_single(0..5, 0);
        let second_fut = file_scheduler.submit_single(0..5, 0);
        // The third read must wait for buffer space
        let third_fut = file_scheduler.submit_single(0..3, 0);
        wait_for_bytes_read_and_idle(10).await;

        // Consuming the first read frees 5 bytes, releasing the third read
        assert_eq!(first_fut.await.unwrap().len(), 5);
        wait_for_bytes_read_and_idle(13).await;

        // The fourth read (5 bytes) does not fit in the remaining space
        let fourth_fut = file_scheduler.submit_single(0..5, 0);
        wait_for_bytes_read_and_idle(13).await;

        assert_eq!(third_fut.await.unwrap().len(), 3);
        wait_for_bytes_read_and_idle(18).await;

        assert_eq!(second_fut.await.unwrap().len(), 5);
        // Two distant ranges do not coalesce: two IOPs totaling 10 bytes, of
        // which only part fits at first
        let fifth_fut = file_scheduler.submit_request(vec![0..3, 90000..90007], 0);
        wait_for_bytes_read_and_idle(21).await;

        // The request still completes in full (3 + 7 bytes)
        let fifth_bytes = tokio::time::timeout(Duration::from_secs(10), fifth_fut)
            .await
            .unwrap();
        assert_eq!(
            fifth_bytes.unwrap().iter().map(|b| b.len()).sum::<usize>(),
            10
        );

        assert_eq!(fourth_fut.await.unwrap().len(), 5);
        wait_for_bytes_read_and_idle(28).await;

        // Ensure there is no deadlock when a single request consumes the
        // entire buffer
        let config = SchedulerConfig {
            io_buffer_size_bytes: 10,
            use_lite_scheduler: false,
        };

        let scan_scheduler = ScanScheduler::new(obj_store, config);
        let file_scheduler = scan_scheduler
            .open_file(&Path::parse("foo").unwrap(), &CachedFileSize::new(100000))
            .await
            .unwrap();

        let first_fut = file_scheduler.submit_single(0..10, 0);
        let second_fut = file_scheduler.submit_single(0..10, 0);

        // Give the I/O loop a chance to run before awaiting
        std::thread::sleep(Duration::from_millis(100));
        assert_eq!(first_fut.await.unwrap().len(), 10);
        assert_eq!(second_fut.await.unwrap().len(), 10);
    }

    // A reader that counts get_range calls and returns zeroed bytes
    #[derive(Debug)]
    struct TrackingReader {
        get_range_count: Arc<AtomicU64>,
        path: Path,
    }

    impl deepsize::DeepSizeOf for TrackingReader {
        fn deep_size_of_children(&self, _context: &mut deepsize::Context) -> usize {
            0
        }
    }

    impl Reader for TrackingReader {
        fn path(&self) -> &Path {
            &self.path
        }

        fn block_size(&self) -> usize {
            4096
        }

        fn io_parallelism(&self) -> usize {
            1
        }

        fn size(&self) -> futures::future::BoxFuture<'_, object_store::Result<usize>> {
            Box::pin(async { Ok(1_000_000) })
        }

        fn get_range(
            &self,
            range: Range<usize>,
        ) -> futures::future::BoxFuture<'static, object_store::Result<Bytes>> {
            self.get_range_count.fetch_add(1, Ordering::Release);
            let num_bytes = range.end - range.start;
            Box::pin(async move { Ok(Bytes::from(vec![0u8; num_bytes])) })
        }

        fn get_all(&self) -> futures::future::BoxFuture<'_, object_store::Result<Bytes>> {
            Box::pin(async { Ok(Bytes::from(vec![0u8; 1_000_000])) })
        }
    }

    #[tokio::test]
    async fn test_lite_scheduler_submits_eagerly() {
        let obj_store = Arc::new(ObjectStore::memory());
        let config = SchedulerConfig::default_for_testing().with_lite_scheduler();
        let scheduler = ScanScheduler::new(obj_store, config);

        let get_range_count = Arc::new(AtomicU64::new(0));
        let reader: Arc<dyn Reader> = Arc::new(TrackingReader {
            get_range_count: get_range_count.clone(),
            path: Path::parse("test").unwrap(),
        });

        // The lite scheduler issues I/O as soon as requests are submitted,
        // without waiting for the futures to be polled
        let fut1 = scheduler.submit_request(reader.clone(), vec![0..100], 0);
        let fut2 = scheduler.submit_request(reader.clone(), vec![100..200], 10);
        let fut3 = scheduler.submit_request(reader.clone(), vec![200..300], 20);

        // All three reads were started eagerly
        assert_eq!(get_range_count.load(Ordering::Acquire), 3);

        assert_eq!(fut1.await.unwrap()[0].len(), 100);
        assert_eq!(fut2.await.unwrap()[0].len(), 100);
        assert_eq!(fut3.await.unwrap()[0].len(), 100);
    }

    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn stress_backpressure() {
        // Stress test: submit far more I/O than the buffer allows and make
        // sure the scheduler still drains the queue
        let some_path = Path::parse("foo").unwrap();
        let obj_store = Arc::new(ObjectStore::memory());
        obj_store
            .put(&some_path, vec![0; 100000].as_slice())
            .await
            .unwrap();

        // A 1-byte buffer guarantees every read overruns it
        let config = SchedulerConfig {
            io_buffer_size_bytes: 1,
            use_lite_scheduler: false,
        };
        let scan_scheduler = ScanScheduler::new(obj_store.clone(), config);
        let file_scheduler = scan_scheduler
            .open_file(&some_path, &CachedFileSize::unknown())
            .await
            .unwrap();

        // Submit in priority order so the scheduler can always make progress
        let mut futs = Vec::with_capacity(10000);
        for idx in 0..10000 {
            futs.push(file_scheduler.submit_single(idx..idx + 1, idx));
        }

        for fut in futs {
            fut.await.unwrap();
        }
    }
}