use bytes::Bytes;
use futures::channel::oneshot;
use futures::{FutureExt, TryFutureExt};
use object_store::path::Path;
use std::collections::BinaryHeap;
use std::fmt::Debug;
use std::future::Future;
use std::num::NonZero;
use std::ops::Range;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Instant;
use tokio::sync::{Notify, Semaphore, SemaphorePermit};
use lance_core::{Error, Result};
use crate::object_store::ObjectStore;
use crate::traits::Reader;
use crate::utils::CachedFileSize;
mod lite;
/// Minimum number of seconds that must elapse before the first backpressure
/// warning is emitted (avoids warning during normal startup ramp-up).
const BACKPRESSURE_MIN: u64 = 5;
/// Minimum number of seconds between repeated backpressure warnings.
const BACKPRESSURE_DEBOUNCE: u64 = 60;
/// Process-wide count of I/O operations issued (exposed via [`iops_counter`]).
static IOPS_COUNTER: AtomicU64 = AtomicU64::new(0);
/// Process-wide count of bytes read (exposed via [`bytes_read_counter`]).
static BYTES_READ_COUNTER: AtomicU64 = AtomicU64::new(0);
/// Default process-wide limit on concurrent I/O operations, used when
/// `LANCE_PROCESS_IO_THREADS_LIMIT` is unset or unparseable.
// `const` instead of `static`: the value is never mutated and needs no
// identity, so a compile-time constant is the idiomatic declaration.
const DEFAULT_PROCESS_IOPS_LIMIT: i32 = 128;

/// Returns the total number of I/O operations issued by this process.
pub fn iops_counter() -> u64 {
    IOPS_COUNTER.load(Ordering::Acquire)
}

/// Returns the total number of bytes read by this process.
pub fn bytes_read_counter() -> u64 {
    BYTES_READ_COUNTER.load(Ordering::Acquire)
}
/// Process-wide throttle on the number of concurrently running I/O
/// operations, configured from `LANCE_PROCESS_IO_THREADS_LIMIT`.
struct IopsQuota {
    // `Some(semaphore)` when a limit is configured; `None` means unlimited.
    iops_avail: Option<Semaphore>,
}
/// One reserved unit of process-wide I/O capacity, held while an I/O
/// operation is in flight.
struct IopsReservation<'a> {
    // `None` when no process-wide limit is configured (nothing to hold).
    value: Option<SemaphorePermit<'a>>,
}
impl IopsReservation<'_> {
    /// Permanently consumes the reservation so the permit is NOT returned to
    /// the semaphore on drop.  The capacity must later be restored
    /// explicitly via `IopsQuota::release` once the I/O finishes.
    fn forget(&mut self) {
        match self.value.take() {
            Some(permit) => permit.forget(),
            None => {}
        }
    }
}
impl IopsQuota {
    /// Builds the quota from the `LANCE_PROCESS_IO_THREADS_LIMIT`
    /// environment variable, falling back to `DEFAULT_PROCESS_IOPS_LIMIT`
    /// when the variable is unset or unparseable.  A non-positive limit
    /// disables the quota entirely (unlimited concurrent IOPS).
    fn new() -> Self {
        let configured = match std::env::var("LANCE_PROCESS_IO_THREADS_LIMIT") {
            Ok(raw) => raw.parse::<i32>().unwrap_or_else(|_| {
                log::warn!("Ignoring invalid LANCE_PROCESS_IO_THREADS_LIMIT: {}", raw);
                DEFAULT_PROCESS_IOPS_LIMIT
            }),
            Err(_) => DEFAULT_PROCESS_IOPS_LIMIT,
        };
        if configured <= 0 {
            // Zero or negative means "no limit".
            Self { iops_avail: None }
        } else {
            Self {
                iops_avail: Some(Semaphore::new(configured as usize)),
            }
        }
    }

    /// Returns one unit of I/O capacity to the quota.  Pairs with a
    /// reservation that was previously `forget`-ed.
    fn release(&self) {
        if let Some(semaphore) = self.iops_avail.as_ref() {
            semaphore.add_permits(1);
        }
    }

    /// Waits until a unit of I/O capacity is available and reserves it.
    /// Returns immediately with an empty reservation when no limit is set.
    async fn acquire(&self) -> IopsReservation<'_> {
        match self.iops_avail.as_ref() {
            // The semaphore is never closed, so `acquire` cannot fail.
            Some(semaphore) => IopsReservation {
                value: Some(semaphore.acquire().await.unwrap()),
            },
            None => IopsReservation { value: None },
        }
    }
}
static IOPS_QUOTA: std::sync::LazyLock<IopsQuota> = std::sync::LazyLock::new(IopsQuota::new);
/// Sorted multiset of the priorities of all I/O requests currently in
/// flight.  Lets the queue decide whether a candidate task is at least as
/// urgent as anything already running (and so may bypass backpressure).
struct PrioritiesInFlight {
    // Maintained in ascending order so the minimum is the first element.
    in_flight: Vec<u128>,
}

impl PrioritiesInFlight {
    /// Creates an empty tracker pre-sized for roughly twice the I/O depth.
    fn new(capacity: u32) -> Self {
        Self {
            in_flight: Vec::with_capacity(2 * capacity as usize),
        }
    }

    /// Smallest priority currently in flight, or `u128::MAX` when idle.
    fn min_in_flight(&self) -> u128 {
        match self.in_flight.first() {
            Some(&lowest) => lowest,
            None => u128::MAX,
        }
    }

    /// Records that a request with the given priority has started.
    fn push(&mut self, prio: u128) {
        // First index whose element is >= prio; inserting there keeps the
        // vector sorted (duplicate priorities are allowed).
        let insert_at = self.in_flight.partition_point(|&p| p < prio);
        self.in_flight.insert(insert_at, prio);
    }

    /// Records that one request with the given priority has finished.
    /// A priority that is not present is silently ignored.
    fn remove(&mut self, prio: u128) {
        if let Ok(found_at) = self.in_flight.binary_search(&prio) {
            self.in_flight.remove(found_at);
        }
    }
}
/// Mutable state behind the `IoQueue` lock: remaining capacity, queued
/// requests, and bookkeeping for throttle warnings.
struct IoQueueState {
    // How many more I/O operations may be started right now.
    iops_avail: u32,
    // Remaining backpressure buffer, in bytes.  Signed because priority
    // requests are allowed to push it temporarily negative.
    bytes_avail: i64,
    // Waiting requests; `IoTask`'s reversed `Ord` makes this max-heap pop
    // the numerically smallest priority (most urgent) first.
    pending_requests: BinaryHeap<IoTask>,
    // Priorities of requests currently executing.
    priorities_in_flight: PrioritiesInFlight,
    // Set when the scheduler is dropped; tells the I/O loop to exit.
    done_scheduling: bool,
    // Queue creation time; used to rate-limit backpressure warnings.
    start: Instant,
    // Seconds-since-start of the most recent warning (0 = never warned).
    last_warn: AtomicU64,
}

impl IoQueueState {
    /// Starts with full IOPS capacity and a full byte buffer.
    fn new(io_capacity: u32, io_buffer_size: u64) -> Self {
        Self {
            iops_avail: io_capacity,
            bytes_avail: io_buffer_size as i64,
            pending_requests: BinaryHeap::new(),
            priorities_in_flight: PrioritiesInFlight::new(io_capacity),
            done_scheduling: false,
            start: Instant::now(),
            last_warn: AtomicU64::from(0),
        }
    }

    /// Logs a (debounced) warning that the backpressure throttle is full.
    /// First warning only after `BACKPRESSURE_MIN` seconds of uptime; later
    /// warnings at most once per `BACKPRESSURE_DEBOUNCE` seconds.
    fn warn_if_needed(&self) {
        let seconds_elapsed = self.start.elapsed().as_secs();
        let last_warn = self.last_warn.load(Ordering::Acquire);
        let since_last_warn = seconds_elapsed - last_warn;
        if (last_warn == 0
            && seconds_elapsed > BACKPRESSURE_MIN
            && seconds_elapsed < BACKPRESSURE_DEBOUNCE)
            || since_last_warn > BACKPRESSURE_DEBOUNCE
        {
            tracing::event!(tracing::Level::DEBUG, "Backpressure throttle exceeded");
            log::debug!(
                "Backpressure throttle is full, I/O will pause until buffer is drained. Max I/O bandwidth will not be achieved because CPU is falling behind"
            );
            // `.max(1)` so a warning in second 0 is not mistaken for "never".
            self.last_warn
                .store(seconds_elapsed.max(1), Ordering::Release);
        }
    }

    /// Whether `task` may start now.  Requires a free IOPS slot; the byte
    /// budget is only enforced for tasks that are NOT at least as urgent as
    /// everything already in flight (urgent tasks bypass backpressure to
    /// avoid priority inversion deadlocks).
    fn can_deliver(&self, task: &IoTask) -> bool {
        if self.iops_avail == 0 {
            false
        } else if task.priority <= self.priorities_in_flight.min_in_flight() {
            true
        } else if task.num_bytes() as i64 > self.bytes_avail {
            self.warn_if_needed();
            false
        } else {
            true
        }
    }

    /// Pops the highest-priority pending task if it is currently
    /// deliverable, charging its IOPS slot and byte budget up front.
    fn next_task(&mut self) -> Option<IoTask> {
        let task = self.pending_requests.peek()?;
        if self.can_deliver(task) {
            self.priorities_in_flight.push(task.priority);
            self.iops_avail -= 1;
            self.bytes_avail -= task.num_bytes() as i64;
            if self.bytes_avail < 0 {
                // Deliberate over-subscription by a bypassing priority task.
                log::debug!(
                    "Backpressure throttle temporarily exceeded by {} bytes due to priority I/O",
                    -self.bytes_avail
                );
            }
            Some(self.pending_requests.pop().unwrap())
        } else {
            None
        }
    }
}
/// Thread-safe priority queue of I/O tasks with IOPS and byte-based
/// backpressure.  Producers `push`, the single I/O loop `pop`s.
struct IoQueue {
    // All scheduling state lives behind one mutex.
    state: Mutex<IoQueueState>,
    // Wakes the I/O loop when new work or new capacity arrives.  `Notify`
    // stores a single permit, so a notify that races ahead of `notified()`
    // is not lost.
    notify: Notify,
}

impl IoQueue {
    fn new(io_capacity: u32, io_buffer_size: u64) -> Self {
        Self {
            state: Mutex::new(IoQueueState::new(io_capacity, io_buffer_size)),
            notify: Notify::new(),
        }
    }

    /// Enqueues a task and wakes the I/O loop.
    fn push(&self, task: IoTask) {
        log::trace!(
            "Inserting I/O request for {} bytes with priority ({},{}) into I/O queue",
            task.num_bytes(),
            // The 128-bit priority packs (base_priority, request priority).
            task.priority >> 64,
            task.priority & 0xFFFFFFFFFFFFFFFF
        );
        let mut state = self.state.lock().unwrap();
        state.pending_requests.push(task);
        // Release the lock before notifying to avoid waking into contention.
        drop(state);
        self.notify.notify_one();
    }

    /// Waits for the next deliverable task, or `None` once the scheduler is
    /// closed.  Also acquires the process-wide IOPS quota; the permit is
    /// forgotten on success (released later by `IoTask::run`) and returned
    /// automatically when no task is ready.
    async fn pop(&self) -> Option<IoTask> {
        loop {
            {
                // Acquire the global quota BEFORE taking the lock so we
                // never hold the lock across an await.
                let mut iop_res = IOPS_QUOTA.acquire().await;
                let mut state = self.state.lock().unwrap();
                if let Some(task) = state.next_task() {
                    iop_res.forget();
                    return Some(task);
                }
                if state.done_scheduling {
                    return None;
                }
            }
            self.notify.notified().await;
        }
    }

    /// Called when an individual I/O finishes: frees its IOPS slot.
    fn on_iop_complete(&self) {
        let mut state = self.state.lock().unwrap();
        state.iops_avail += 1;
        drop(state);
        self.notify.notify_one();
    }

    /// Called when the caller has consumed a batch's data: returns the
    /// bytes to the backpressure budget and clears the in-flight
    /// priorities (one entry per underlying request).
    fn on_bytes_consumed(&self, bytes: u64, priority: u128, num_reqs: usize) {
        let mut state = self.state.lock().unwrap();
        state.bytes_avail += bytes as i64;
        for _ in 0..num_reqs {
            state.priorities_in_flight.remove(priority);
        }
        drop(state);
        self.notify.notify_one();
    }

    /// Marks the queue closed and cancels every still-pending task so their
    /// callers observe an error instead of hanging.
    fn close(&self) {
        let mut state = self.state.lock().unwrap();
        state.done_scheduling = true;
        let pending_requests = std::mem::take(&mut state.pending_requests);
        drop(state);
        for request in pending_requests {
            request.cancel();
        }
        self.notify.notify_one();
    }
}
/// Accumulates the results of the individual reads that make up one batched
/// request.  The completion callback fires from `Drop`, i.e. once every
/// I/O task has delivered its chunk and released its clone of the batch.
struct MutableBatch<F: FnOnce(Response) + Send> {
    // Completion callback; `Some` until consumed by `Drop`.
    when_done: Option<F>,
    // One slot per sub-read, filled in by task index.
    data_buffers: Vec<Bytes>,
    // Total bytes delivered so far (used for backpressure accounting).
    num_bytes: u64,
    // Priority shared by every sub-read in the batch.
    priority: u128,
    // Number of sub-reads in the batch.
    num_reqs: usize,
    // First error encountered, if any; later errors are dropped.
    err: Option<Box<dyn std::error::Error + Send + Sync + 'static>>,
}

impl<F: FnOnce(Response) + Send> MutableBatch<F> {
    fn new(when_done: F, num_data_buffers: u32, priority: u128, num_reqs: usize) -> Self {
        Self {
            when_done: Some(when_done),
            data_buffers: vec![Bytes::default(); num_data_buffers as usize],
            num_bytes: 0,
            priority,
            num_reqs,
            err: None,
        }
    }
}
impl<F: FnOnce(Response) + Send> Drop for MutableBatch<F> {
fn drop(&mut self) {
let result = if self.err.is_some() {
Err(Error::wrapped(self.err.take().unwrap()))
} else {
let mut data = Vec::new();
std::mem::swap(&mut data, &mut self.data_buffers);
Ok(data)
};
let response = Response {
data: result,
num_bytes: self.num_bytes,
priority: self.priority,
num_reqs: self.num_reqs,
};
(self.when_done.take().unwrap())(response);
}
}
/// The result of one sub-read, routed back to its slot in the batch.
struct DataChunk {
    // Index of this read within the batch's `data_buffers`.
    task_idx: usize,
    // Size of the read in bytes (counted even when the read failed).
    num_bytes: u64,
    // The bytes, or the error that occurred.
    data: Result<Bytes>,
}
/// Receiver for completed sub-reads.
trait DataSink: Send {
    fn deliver_data(&mut self, data: DataChunk);
}
impl<F: FnOnce(Response) + Send> DataSink for MutableBatch<F> {
    /// Stores one completed read in its slot, or records the first error.
    fn deliver_data(&mut self, chunk: DataChunk) {
        // Bytes are counted even on failure so accounting stays balanced.
        self.num_bytes += chunk.num_bytes;
        match chunk.data {
            Err(err) => {
                // Only the first error is kept; later ones are discarded.
                self.err.get_or_insert(Box::new(err));
            }
            Ok(bytes) => {
                self.data_buffers[chunk.task_idx] = bytes;
            }
        }
    }
}
/// One scheduled read against a single file.
struct IoTask {
    reader: Arc<dyn Reader>,
    // Byte range to read from the file.
    to_read: Range<u64>,
    // Callback invoked with the bytes (or error) once the read finishes.
    when_done: Box<dyn FnOnce(Result<Bytes>) + Send>,
    // Packed priority: high 64 bits = file base priority, low 64 bits =
    // per-request priority.  Lower values are more urgent.
    priority: u128,
}
// Ordering/equality exist solely for the priority heap.  Equality compares
// only `priority` (two distinct tasks with the same priority are "equal"),
// and `Ord` is intentionally REVERSED so that `BinaryHeap` — a max-heap —
// pops the numerically smallest priority (the most urgent task) first.
impl Eq for IoTask {}
impl PartialEq for IoTask {
    fn eq(&self, other: &Self) -> bool {
        self.priority == other.priority
    }
}
impl PartialOrd for IoTask {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}
impl Ord for IoTask {
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        // Reversed on purpose: see the comment above.
        other.priority.cmp(&self.priority)
    }
}
impl IoTask {
    /// Length of the requested range in bytes.
    fn num_bytes(&self) -> u64 {
        self.to_read.end - self.to_read.start
    }

    /// Fails the task without performing any I/O (scheduler shut down).
    fn cancel(self) {
        (self.when_done)(Err(Error::internal(
            "Scheduler closed before I/O was completed".to_string(),
        )));
    }

    /// Performs the read, updates the process-wide counters, releases the
    /// process-wide IOPS quota, and invokes the completion callback.
    async fn run(self) {
        let file_path = self.reader.path().as_ref();
        let num_bytes = self.num_bytes();
        let bytes = if self.to_read.start == self.to_read.end {
            // Empty range: skip the I/O (and the IOPS counter) entirely.
            Ok(Bytes::new())
        } else {
            let bytes_fut = self
                .reader
                .get_range(self.to_read.start as usize..self.to_read.end as usize);
            IOPS_COUNTER.fetch_add(1, Ordering::Release);
            let num_bytes = self.num_bytes();
            // Bytes are only counted once the read actually completes.
            bytes_fut
                .inspect(move |_| {
                    BYTES_READ_COUNTER.fetch_add(num_bytes, Ordering::Release);
                })
                .await
                .map_err(Error::from)
        };
        tracing::trace!(
            file = file_path,
            bytes_read = num_bytes,
            requests = 1,
            range_start = self.to_read.start,
            range_end = self.to_read.end,
            "File I/O completed"
        );
        // Return the quota unit that `IoQueue::pop` forgot, BEFORE invoking
        // the callback, so capacity frees up even if the callback is slow.
        IOPS_QUOTA.release();
        (self.when_done)(bytes);
    }
}
/// Drives the I/O queue: repeatedly pops the next schedulable task and
/// spawns it onto the runtime, until the queue is closed and drained.
async fn run_io_loop(tasks: Arc<IoQueue>) {
    // `pop` yields `None` only after the scheduler has been closed.
    while let Some(task) = tasks.pop().await {
        // Tasks execute concurrently; ordering was already decided by `pop`.
        tokio::spawn(task.run());
    }
}
/// Per-scheduler I/O statistics (distinct from the process-wide counters).
#[derive(Debug)]
struct StatsCollector {
    // Individual I/O operations issued (one per range after splitting).
    iops: AtomicU64,
    // Logical read requests submitted.
    requests: AtomicU64,
    // Total bytes requested.
    bytes_read: AtomicU64,
}

impl StatsCollector {
    /// Creates a collector with every counter at zero.
    fn new() -> Self {
        Self {
            iops: AtomicU64::new(0),
            requests: AtomicU64::new(0),
            bytes_read: AtomicU64::new(0),
        }
    }

    /// Records one logical request that was issued as `ranges.len()` IOPS.
    fn record_request(&self, ranges: &[Range<u64>]) {
        let total_bytes: u64 = ranges.iter().map(|r| r.end - r.start).sum();
        self.requests.fetch_add(1, Ordering::Relaxed);
        self.iops.fetch_add(ranges.len() as u64, Ordering::Relaxed);
        self.bytes_read.fetch_add(total_bytes, Ordering::Relaxed);
    }

    /// Individual I/O operations issued so far.
    fn iops(&self) -> u64 {
        self.iops.load(Ordering::Relaxed)
    }

    /// Total bytes requested so far.
    fn bytes_read(&self) -> u64 {
        self.bytes_read.load(Ordering::Relaxed)
    }

    /// Logical requests submitted so far.
    fn requests(&self) -> u64 {
        self.requests.load(Ordering::Relaxed)
    }
}
/// Snapshot of a scheduler's I/O statistics, returned by
/// `ScanScheduler::stats`.
pub struct ScanStats {
    // Individual I/O operations issued.
    pub iops: u64,
    // Logical read requests submitted.
    pub requests: u64,
    // Total bytes requested.
    pub bytes_read: u64,
}
impl ScanStats {
    /// Captures the collector's current counter values.
    fn new(collector: &StatsCollector) -> Self {
        let iops = collector.iops();
        let requests = collector.requests();
        let bytes_read = collector.bytes_read();
        Self {
            iops,
            requests,
            bytes_read,
        }
    }
}
/// Which queue implementation backs a scheduler: the standard
/// backpressure-aware queue, or the simpler eager "lite" queue.
enum IoQueueType {
    Standard(Arc<IoQueue>),
    Lite(Arc<lite::IoQueue>),
}
/// Top-level I/O scheduler for a scan: owns the queue, the object store,
/// and per-scheduler statistics.  Files are opened through it to obtain
/// `FileScheduler`s.
pub struct ScanScheduler {
    object_store: Arc<ObjectStore>,
    io_queue: IoQueueType,
    stats: Arc<StatsCollector>,
}
impl Debug for ScanScheduler {
    /// Debug output shows only the object store; queue and stats are
    /// intentionally omitted.
    fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut builder = fmt.debug_struct("ScanScheduler");
        builder.field("object_store", &self.object_store);
        builder.finish()
    }
}
/// Completed batch: the buffers plus the accounting needed to release
/// backpressure (`num_bytes`) and in-flight priorities (`num_reqs`).
struct Response {
    data: Result<Vec<Bytes>>,
    priority: u128,
    num_reqs: usize,
    num_bytes: u64,
}
/// Configuration for a `ScanScheduler`.
#[derive(Debug, Clone, Copy)]
pub struct SchedulerConfig {
    // Soft cap on bytes buffered awaiting consumption (backpressure).
    pub io_buffer_size_bytes: u64,
    // Use the simpler eager "lite" queue instead of the standard one.
    pub use_lite_scheduler: bool,
}

impl SchedulerConfig {
    /// Creates a config with the given buffer size.  The lite scheduler is
    /// enabled whenever LANCE_USE_LITE_SCHEDULER is set at all — the
    /// variable's value is ignored (presence-based flag, so even "0"
    /// enables it).
    pub fn new(io_buffer_size_bytes: u64) -> Self {
        Self {
            io_buffer_size_bytes,
            use_lite_scheduler: std::env::var("LANCE_USE_LITE_SCHEDULER").is_ok(),
        }
    }

    /// 256 MiB buffer, standard scheduler — convenient defaults for tests.
    pub fn default_for_testing() -> Self {
        Self {
            io_buffer_size_bytes: 256 * 1024 * 1024,
            use_lite_scheduler: false,
        }
    }

    /// Sizes the buffer for maximum bandwidth: 32 MiB per unit of the
    /// store's I/O parallelism.
    pub fn max_bandwidth(store: &ObjectStore) -> Self {
        Self::new(32 * 1024 * 1024 * store.io_parallelism() as u64)
    }

    /// Returns a copy with the lite scheduler force-enabled.
    pub fn with_lite_scheduler(self) -> Self {
        Self {
            use_lite_scheduler: true,
            ..self
        }
    }
}
impl ScanScheduler {
    /// Creates a scheduler over `object_store`.  With the standard queue a
    /// background task is spawned to drive I/O; the lite queue needs none.
    pub fn new(object_store: Arc<ObjectStore>, config: SchedulerConfig) -> Arc<Self> {
        let io_capacity = object_store.io_parallelism();
        let io_queue = if config.use_lite_scheduler {
            let io_queue = Arc::new(lite::IoQueue::new(
                io_capacity as u64,
                config.io_buffer_size_bytes,
            ));
            IoQueueType::Lite(io_queue)
        } else {
            let io_queue = Arc::new(IoQueue::new(
                io_capacity as u32,
                config.io_buffer_size_bytes,
            ));
            let io_queue_clone = io_queue.clone();
            // Exits once the queue is closed (when the scheduler drops).
            tokio::task::spawn(async move { run_io_loop(io_queue_clone).await });
            IoQueueType::Standard(io_queue)
        };
        Arc::new(Self {
            object_store,
            io_queue,
            stats: Arc::new(StatsCollector::new()),
        })
    }

    /// Opens a file for scheduled reads.  `base_priority` becomes the high
    /// 64 bits of every priority issued through the returned scheduler, so
    /// lower base priorities are serviced first.  The store is only asked
    /// for the file size when `file_size_bytes` has no cached value; a
    /// freshly fetched non-zero size is written back into the cache.
    pub async fn open_file_with_priority(
        self: &Arc<Self>,
        path: &Path,
        base_priority: u64,
        file_size_bytes: &CachedFileSize,
    ) -> Result<FileScheduler> {
        let file_size_bytes = if let Some(size) = file_size_bytes.get() {
            u64::from(size)
        } else {
            let size = self.object_store.size(path).await?;
            // Only non-zero sizes are cacheable (NonZero representation).
            if let Some(size) = NonZero::new(size) {
                file_size_bytes.set(size);
            }
            size
        };
        let reader = self
            .object_store
            .open_with_size(path, file_size_bytes as usize)
            .await?;
        let block_size = self.object_store.block_size() as u64;
        let max_iop_size = self.object_store.max_iop_size();
        Ok(FileScheduler {
            reader: reader.into(),
            block_size,
            root: self.clone(),
            base_priority,
            max_iop_size,
        })
    }

    /// Opens a file with base priority 0 (the most urgent tier).
    pub async fn open_file(
        self: &Arc<Self>,
        path: &Path,
        file_size_bytes: &CachedFileSize,
    ) -> Result<FileScheduler> {
        self.open_file_with_priority(path, 0, file_size_bytes).await
    }

    /// Splits one logical request into per-range `IoTask`s that all feed a
    /// shared `MutableBatch`; the batch fires `tx` once the last task
    /// drops its handle.
    fn do_submit_request(
        &self,
        reader: Arc<dyn Reader>,
        request: Vec<Range<u64>>,
        tx: oneshot::Sender<Response>,
        priority: u128,
        io_queue: &Arc<IoQueue>,
    ) {
        let num_iops = request.len() as u32;
        // Receiver may have been dropped (caller gave up); ignore failure.
        let when_all_io_done = move |bytes_and_permits| {
            let _ = tx.send(bytes_and_permits);
        };
        // NOTE(review): the `Box` inside `Arc<Mutex<..>>` looks redundant —
        // `Arc<Mutex<MutableBatch<_>>>` would work; confirm and simplify.
        let dest = Arc::new(Mutex::new(Box::new(MutableBatch::new(
            when_all_io_done,
            num_iops,
            priority,
            request.len(),
        ))));
        for (task_idx, iop) in request.into_iter().enumerate() {
            let dest = dest.clone();
            let io_queue_clone = io_queue.clone();
            let num_bytes = iop.end - iop.start;
            let task = IoTask {
                reader: reader.clone(),
                to_read: iop,
                priority,
                when_done: Box::new(move |data| {
                    // Free the IOPS slot first, then record the chunk.
                    io_queue_clone.on_iop_complete();
                    let mut dest = dest.lock().unwrap();
                    let chunk = DataChunk {
                        data,
                        task_idx,
                        num_bytes,
                    };
                    dest.deliver_data(chunk);
                }),
            };
            io_queue.push(task);
        }
    }

    /// Standard-queue path: submits the tasks and returns a future that
    /// resolves with the buffers, crediting backpressure once consumed.
    fn submit_request_standard(
        &self,
        reader: Arc<dyn Reader>,
        request: Vec<Range<u64>>,
        priority: u128,
        io_queue: &Arc<IoQueue>,
    ) -> impl Future<Output = Result<Vec<Bytes>>> + Send + use<> {
        let (tx, rx) = oneshot::channel::<Response>();
        self.do_submit_request(reader, request, tx, priority, io_queue);
        let io_queue_clone = io_queue.clone();
        rx.map(move |wrapped_rsp| {
            // The batch always sends before it is dropped, so the channel
            // cannot be cancelled; unwrap is safe.
            let rsp = wrapped_rsp.unwrap();
            // Awaiting the response is what frees the byte budget.
            io_queue_clone.on_bytes_consumed(rsp.num_bytes, rsp.priority, rsp.num_reqs);
            rsp.data
        })
    }

    /// Lite-queue path: submits every range eagerly and awaits each task's
    /// handle in order.
    fn submit_request_lite(
        &self,
        reader: Arc<dyn Reader>,
        request: Vec<Range<u64>>,
        priority: u128,
        io_queue: &Arc<lite::IoQueue>,
    ) -> impl Future<Output = Result<Vec<Bytes>>> + Send + use<> {
        let maybe_tasks = request
            .into_iter()
            .map(|task| {
                let reader = reader.clone();
                let queue = io_queue.clone();
                let run_fn = Box::new(move || {
                    reader
                        .get_range(task.start as usize..task.end as usize)
                        .map_err(Error::from)
                        .boxed()
                });
                queue.submit(task, priority, run_fn)
            })
            .collect::<Result<Vec<_>>>();
        match maybe_tasks {
            Ok(tasks) => async move {
                let mut results = Vec::with_capacity(tasks.len());
                for task in tasks {
                    results.push(task.await?);
                }
                Ok(results)
            }
            .boxed(),
            Err(e) => async move { Err(e) }.boxed(),
        }
    }

    /// Submits a read of several ranges from one file.  Lower `priority`
    /// values are serviced first; results arrive in request order.
    pub fn submit_request(
        &self,
        reader: Arc<dyn Reader>,
        request: Vec<Range<u64>>,
        priority: u128,
    ) -> impl Future<Output = Result<Vec<Bytes>>> + Send + use<> {
        match &self.io_queue {
            IoQueueType::Standard(io_queue) => futures::future::Either::Left(
                self.submit_request_standard(reader, request, priority, io_queue),
            ),
            IoQueueType::Lite(io_queue) => futures::future::Either::Right(
                self.submit_request_lite(reader, request, priority, io_queue),
            ),
        }
    }

    /// Snapshot of this scheduler's I/O statistics.
    pub fn stats(&self) -> ScanStats {
        ScanStats::new(self.stats.as_ref())
    }
}
impl Drop for ScanScheduler {
    /// Closes the queue so the I/O loop exits and any still-pending tasks
    /// are cancelled rather than left hanging.
    fn drop(&mut self) {
        match &self.io_queue {
            IoQueueType::Standard(queue) => queue.close(),
            IoQueueType::Lite(queue) => queue.close(),
        }
    }
}
/// Handle for scheduling reads against one opened file.  Cheap to clone.
#[derive(Clone, Debug)]
pub struct FileScheduler {
    reader: Arc<dyn Reader>,
    root: Arc<ScanScheduler>,
    // Gap threshold below which adjacent reads are coalesced.
    block_size: u64,
    // High 64 bits of every priority issued through this handle.
    base_priority: u64,
    // Reads larger than this are split into multiple IOPS.
    max_iop_size: u64,
}
/// True when `range2` begins within `block_size` bytes of the end of
/// `range1`, meaning the two reads are worth coalescing into one I/O.
/// Assumes `range2` does not start before `range1` (caller supplies ranges
/// in ascending order).
fn is_close_together(range1: &Range<u64>, range2: &Range<u64>, block_size: u64) -> bool {
    range1.end + block_size >= range2.start
}
/// True when the two half-open ranges share at least one byte.
fn is_overlapping(range1: &Range<u64>, range2: &Range<u64>) -> bool {
    // Disjoint exactly when one range ends at or before the other begins.
    !(range1.end <= range2.start || range2.end <= range1.start)
}
impl FileScheduler {
    /// Submits a multi-range read.  Nearby ranges are coalesced (gaps up to
    /// `block_size`), oversized reads are split at `max_iop_size`, and the
    /// results are re-sliced so the returned buffers line up one-to-one
    /// with the ORIGINAL `request` ranges, in order.
    ///
    /// NOTE(review): the merge pass assumes `request` is sorted by start
    /// offset — TODO confirm all callers uphold this.
    pub fn submit_request(
        &self,
        request: Vec<Range<u64>>,
        priority: u64,
    ) -> impl Future<Output = Result<Vec<Bytes>>> + Send + use<> {
        // Pack (file base priority, per-request priority) into 128 bits.
        let priority = ((self.base_priority as u128) << 64) + priority as u128;
        // Pass 1: coalesce ranges that are within block_size of each other.
        let mut merged_requests = Vec::with_capacity(request.len());
        if !request.is_empty() {
            let mut curr_interval = request[0].clone();
            for req in request.iter().skip(1) {
                if is_close_together(&curr_interval, req, self.block_size) {
                    curr_interval.end = curr_interval.end.max(req.end);
                } else {
                    merged_requests.push(curr_interval);
                    curr_interval = req.clone();
                }
            }
            merged_requests.push(curr_interval);
        }
        // Pass 2: split anything larger than max_iop_size into roughly
        // equal chunks (the last chunk absorbs the rounding remainder).
        let mut updated_requests = Vec::with_capacity(merged_requests.len());
        for req in merged_requests {
            if req.is_empty() {
                updated_requests.push(req);
            } else {
                let num_requests = (req.end - req.start).div_ceil(self.max_iop_size);
                let bytes_per_request = (req.end - req.start) / num_requests;
                for i in 0..num_requests {
                    let start = req.start + i * bytes_per_request;
                    let end = if i == num_requests - 1 {
                        req.end
                    } else {
                        start + bytes_per_request
                    };
                    updated_requests.push(start..end);
                }
            }
        }
        self.root.stats.record_request(&updated_requests);
        let bytes_vec_fut =
            self.root
                .submit_request(self.reader.clone(), updated_requests.clone(), priority);
        let mut updated_index = 0;
        let mut final_bytes = Vec::with_capacity(request.len());
        async move {
            let bytes_vec = bytes_vec_fut.await?;
            // Pass 3: walk the issued ranges and the original ranges in
            // lock-step, slicing (and, for split reads, stitching) buffers
            // back into the shape the caller asked for.
            let mut orig_index = 0;
            while (updated_index < updated_requests.len()) && (orig_index < request.len()) {
                let updated_range = &updated_requests[updated_index];
                let orig_range = &request[orig_index];
                let byte_offset = updated_range.start as usize;
                if is_overlapping(updated_range, orig_range) {
                    // Merging guarantees orig_range.start >= updated_range.start here.
                    let start = orig_range.start as usize - byte_offset;
                    if orig_range.end <= updated_range.end {
                        // Fully contained: a zero-copy slice suffices.
                        let end = orig_range.end as usize - byte_offset;
                        final_bytes.push(bytes_vec[updated_index].slice(start..end));
                    } else {
                        // Spans several issued chunks: copy them together.
                        let orig_size = orig_range.end - orig_range.start;
                        let mut merged_bytes = Vec::with_capacity(orig_size as usize);
                        merged_bytes.extend_from_slice(&bytes_vec[updated_index].slice(start..));
                        let mut copy_offset = merged_bytes.len() as u64;
                        while copy_offset < orig_size {
                            updated_index += 1;
                            let next_range = &updated_requests[updated_index];
                            let bytes_to_take =
                                (orig_size - copy_offset).min(next_range.end - next_range.start);
                            merged_bytes.extend_from_slice(
                                &bytes_vec[updated_index].slice(0..bytes_to_take as usize),
                            );
                            copy_offset += bytes_to_take;
                        }
                        final_bytes.push(Bytes::from(merged_bytes));
                    }
                    // Stay on the same issued chunk: it may serve the next
                    // original range too (they were coalesced).
                    orig_index += 1;
                } else {
                    updated_index += 1;
                }
            }
            Ok(final_bytes)
        }
    }

    /// Returns a clone of this handle with a different base priority.
    pub fn with_priority(&self, priority: u64) -> Self {
        Self {
            reader: self.reader.clone(),
            root: self.root.clone(),
            block_size: self.block_size,
            max_iop_size: self.max_iop_size,
            base_priority: priority,
        }
    }

    /// Convenience wrapper for reading a single range.
    pub fn submit_single(
        &self,
        range: Range<u64>,
        priority: u64,
    ) -> impl Future<Output = Result<Bytes>> + Send {
        self.submit_request(vec![range], priority)
            .map_ok(|vec_bytes| vec_bytes.into_iter().next().unwrap())
    }

    /// The underlying reader for this file.
    pub fn reader(&self) -> &Arc<dyn Reader> {
        &self.reader
    }
}
#[cfg(test)]
mod tests {
use std::{collections::VecDeque, time::Duration};
use futures::poll;
use lance_core::utils::tempfile::TempObjFile;
use rand::RngCore;
use object_store::{GetRange, ObjectStore as OSObjectStore, memory::InMemory};
use tokio::{runtime::Handle, time::timeout};
use url::Url;
use crate::{
object_store::{DEFAULT_DOWNLOAD_RETRY_COUNT, DEFAULT_MAX_IOP_SIZE},
testing::MockObjectStore,
};
use super::*;
    // Sequentially reads a 1 MiB file in 4 KiB chunks and verifies each
    // chunk matches the original random data.
    #[tokio::test]
    async fn test_full_seq_read() {
        let tmp_file = TempObjFile::default();
        let obj_store = Arc::new(ObjectStore::local());
        const DATA_SIZE: u64 = 1024 * 1024;
        let mut some_data = vec![0; DATA_SIZE as usize];
        rand::rng().fill_bytes(&mut some_data);
        obj_store.put(&tmp_file, &some_data).await.unwrap();
        let config = SchedulerConfig::default_for_testing();
        let scheduler = ScanScheduler::new(obj_store, config);
        let file_scheduler = scheduler
            .open_file(&tmp_file, &CachedFileSize::unknown())
            .await
            .unwrap();
        const READ_SIZE: u64 = 4 * 1024;
        let mut reqs = VecDeque::new();
        let mut offset = 0;
        // Submit all reads first so they can be in flight concurrently.
        while offset < DATA_SIZE {
            reqs.push_back(
                #[allow(clippy::single_range_in_vec_init)]
                file_scheduler
                    .submit_request(vec![offset..offset + READ_SIZE], 0)
                    .await
                    .unwrap(),
            );
            offset += READ_SIZE;
        }
        // Then verify every chunk in submission order.
        offset = 0;
        while offset < DATA_SIZE {
            let data = reqs.pop_front().unwrap();
            let actual = &data[0];
            let expected = &some_data[offset as usize..(offset + READ_SIZE) as usize];
            assert_eq!(expected, actual);
            offset += READ_SIZE;
        }
    }
    // Verifies the coalescing of nearby ranges and the splitting of large
    // reads at DEFAULT_MAX_IOP_SIZE, by checking both the returned data and
    // the scheduler's cumulative IOPS count after each request.
    #[tokio::test]
    async fn test_split_coalesce() {
        let tmp_file = TempObjFile::default();
        let obj_store = Arc::new(ObjectStore::local());
        const DATA_SIZE: u64 = 75 * 1024 * 1024;
        let mut some_data = vec![0; DATA_SIZE as usize];
        rand::rng().fill_bytes(&mut some_data);
        obj_store.put(&tmp_file, &some_data).await.unwrap();
        let config = SchedulerConfig::default_for_testing();
        let scheduler = ScanScheduler::new(obj_store, config);
        let file_scheduler = scheduler
            .open_file(&tmp_file, &CachedFileSize::unknown())
            .await
            .unwrap();
        // Three small nearby ranges coalesce into a single IOP.
        let req =
            file_scheduler.submit_request(vec![50_000..51_000, 52_000..53_000, 54_000..55_000], 0);
        let bytes = req.await.unwrap();
        assert_eq!(bytes[0], &some_data[50_000..51_000]);
        assert_eq!(bytes[1], &some_data[52_000..53_000]);
        assert_eq!(bytes[2], &some_data[54_000..55_000]);
        assert_eq!(1, scheduler.stats().iops);
        // One 75 MiB read splits into 5 max-size IOPS (total 6 so far).
        let req = file_scheduler.submit_request(vec![0..DATA_SIZE], 0);
        let bytes = req.await.unwrap();
        assert!(bytes[0] == some_data, "data is not the same");
        assert_eq!(6, scheduler.stats().iops);
        // Ranges spanning chunk boundaries coalesce then split again.
        let chunk_size = *DEFAULT_MAX_IOP_SIZE;
        let req = file_scheduler.submit_request(
            vec![
                10..chunk_size,
                chunk_size + 10..(chunk_size * 2) - 20,
                chunk_size * 2..(chunk_size * 2) + 10,
            ],
            0,
        );
        let bytes = req.await.unwrap();
        let chunk_size = chunk_size as usize;
        assert!(
            bytes[0] == some_data[10..chunk_size],
            "data is not the same"
        );
        assert!(
            bytes[1] == some_data[chunk_size + 10..(chunk_size * 2) - 20],
            "data is not the same"
        );
        assert!(
            bytes[2] == some_data[chunk_size * 2..(chunk_size * 2) + 10],
            "data is not the same"
        );
        assert_eq!(8, scheduler.stats().iops);
        // 44 adjacent 1 MB reads coalesce into one run, then re-split.
        let reads = (0..44)
            .map(|i| i * 1_000_000..(i + 1) * 1_000_000)
            .collect::<Vec<_>>();
        let req = file_scheduler.submit_request(reads, 0);
        let bytes = req.await.unwrap();
        for (i, bytes) in bytes.iter().enumerate() {
            assert!(
                bytes == &some_data[i * 1_000_000..(i + 1) * 1_000_000],
                "data is not the same"
            );
        }
        assert_eq!(11, scheduler.stats().iops);
    }
    // Verifies priority ordering: with io_parallelism = 1 and a mock store
    // gated by a semaphore, a later-submitted priority-0 request is served
    // before an earlier priority-100 request.
    #[tokio::test]
    async fn test_priority() {
        let some_path = Path::parse("foo").unwrap();
        let base_store = Arc::new(InMemory::new());
        base_store
            .put(&some_path, vec![0; 1000].into())
            .await
            .unwrap();
        // Each store GET must be unblocked by adding a semaphore permit.
        let semaphore = Arc::new(tokio::sync::Semaphore::new(0));
        let mut obj_store = MockObjectStore::default();
        let semaphore_copy = semaphore.clone();
        obj_store
            .expect_get_opts()
            .returning(move |location, options| {
                let semaphore = semaphore.clone();
                let base_store = base_store.clone();
                let location = location.clone();
                async move {
                    semaphore.acquire().await.unwrap().forget();
                    base_store.get_opts(&location, options).await
                }
                .boxed()
            });
        // io_parallelism = 1 forces strictly serial I/O.
        let obj_store = Arc::new(ObjectStore::new(
            Arc::new(obj_store),
            Url::parse("mem://").unwrap(),
            Some(500),
            None,
            false,
            false,
            1,
            DEFAULT_DOWNLOAD_RETRY_COUNT,
            None,
        ));
        let config = SchedulerConfig {
            io_buffer_size_bytes: 1024 * 1024,
            use_lite_scheduler: false,
        };
        let scan_scheduler = ScanScheduler::new(obj_store, config);
        let file_scheduler = scan_scheduler
            .open_file(&Path::parse("foo").unwrap(), &CachedFileSize::new(1000))
            .await
            .unwrap();
        let first_fut = timeout(
            Duration::from_secs(10),
            file_scheduler.submit_single(0..10, 0),
        )
        .boxed();
        let mut second_fut = timeout(
            Duration::from_secs(10),
            file_scheduler.submit_single(0..20, 100),
        )
        .boxed();
        let mut third_fut = timeout(
            Duration::from_secs(10),
            file_scheduler.submit_single(0..30, 0),
        )
        .boxed();
        // One permit at a time: priority 0 requests complete first even
        // though the priority-100 request was submitted earlier.
        semaphore_copy.add_permits(1);
        assert!(first_fut.await.unwrap().unwrap().len() == 10);
        assert!(poll!(&mut second_fut).is_pending());
        assert!(poll!(&mut third_fut).is_pending());
        semaphore_copy.add_permits(1);
        assert!(third_fut.await.unwrap().unwrap().len() == 30);
        assert!(poll!(&mut second_fut).is_pending());
        semaphore_copy.add_permits(1);
        assert!(second_fut.await.unwrap().unwrap().len() == 20);
    }
    // Verifies byte-based backpressure with a 10-byte buffer: reads only
    // proceed as earlier results are consumed, and a single over-sized
    // request is still allowed through (deadlock prevention).
    #[tokio::test(flavor = "multi_thread")]
    async fn test_backpressure() {
        let some_path = Path::parse("foo").unwrap();
        let base_store = Arc::new(InMemory::new());
        base_store
            .put(&some_path, vec![0; 100000].into())
            .await
            .unwrap();
        // Tracks how many bytes the store has actually been asked for.
        let bytes_read = Arc::new(AtomicU64::from(0));
        let mut obj_store = MockObjectStore::default();
        let bytes_read_copy = bytes_read.clone();
        obj_store
            .expect_get_opts()
            .returning(move |location, options| {
                let range = options.range.as_ref().unwrap();
                let num_bytes = match range {
                    GetRange::Bounded(bounded) => bounded.end - bounded.start,
                    _ => panic!(),
                };
                bytes_read_copy.fetch_add(num_bytes, Ordering::Release);
                let location = location.clone();
                let base_store = base_store.clone();
                async move { base_store.get_opts(&location, options).await }.boxed()
            });
        let obj_store = Arc::new(ObjectStore::new(
            Arc::new(obj_store),
            Url::parse("mem://").unwrap(),
            Some(500),
            None,
            false,
            false,
            1,
            DEFAULT_DOWNLOAD_RETRY_COUNT,
            None,
        ));
        // Tiny 10-byte buffer so backpressure engages immediately.
        let config = SchedulerConfig {
            io_buffer_size_bytes: 10,
            use_lite_scheduler: false,
        };
        let scan_scheduler = ScanScheduler::new(obj_store.clone(), config);
        let file_scheduler = scan_scheduler
            .open_file(&Path::parse("foo").unwrap(), &CachedFileSize::new(100000))
            .await
            .unwrap();
        // Waits until only this test's task remains alive on the runtime.
        let wait_for_idle = || async move {
            let handle = Handle::current();
            while handle.metrics().num_alive_tasks() != 1 {
                tokio::time::sleep(Duration::from_millis(10)).await;
            }
        };
        // Waits until the store has served `target_bytes`, then for idle —
        // i.e. the scheduler has stalled on backpressure.
        let wait_for_bytes_read_and_idle = |target_bytes: u64| {
            let bytes_read = &bytes_read;
            async move {
                let bytes_read_copy = bytes_read.clone();
                while bytes_read_copy.load(Ordering::Acquire) < target_bytes {
                    tokio::time::sleep(Duration::from_millis(10)).await;
                }
                wait_for_idle().await;
            }
        };
        let first_fut = file_scheduler.submit_single(0..5, 0);
        let second_fut = file_scheduler.submit_single(0..5, 0);
        let third_fut = file_scheduler.submit_single(0..3, 0);
        // Only the first two (5 + 5 = 10 bytes) fit the buffer.
        wait_for_bytes_read_and_idle(10).await;
        assert_eq!(first_fut.await.unwrap().len(), 5);
        wait_for_bytes_read_and_idle(13).await;
        let fourth_fut = file_scheduler.submit_single(0..5, 0);
        wait_for_bytes_read_and_idle(13).await;
        assert_eq!(third_fut.await.unwrap().len(), 3);
        wait_for_bytes_read_and_idle(18).await;
        assert_eq!(second_fut.await.unwrap().len(), 5);
        let fifth_fut = file_scheduler.submit_request(vec![0..3, 90000..90007], 0);
        wait_for_bytes_read_and_idle(21).await;
        let fifth_bytes = tokio::time::timeout(Duration::from_secs(10), fifth_fut)
            .await
            .unwrap();
        assert_eq!(
            fifth_bytes.unwrap().iter().map(|b| b.len()).sum::<usize>(),
            10
        );
        assert_eq!(fourth_fut.await.unwrap().len(), 5);
        wait_for_bytes_read_and_idle(28).await;
        // Fresh scheduler: a request larger than the whole buffer must
        // still run (one over-sized request is always admitted).
        let config = SchedulerConfig {
            io_buffer_size_bytes: 10,
            use_lite_scheduler: false,
        };
        let scan_scheduler = ScanScheduler::new(obj_store, config);
        let file_scheduler = scan_scheduler
            .open_file(&Path::parse("foo").unwrap(), &CachedFileSize::new(100000))
            .await
            .unwrap();
        let first_fut = file_scheduler.submit_single(0..10, 0);
        let second_fut = file_scheduler.submit_single(0..10, 0);
        std::thread::sleep(Duration::from_millis(100));
        assert_eq!(first_fut.await.unwrap().len(), 10);
        assert_eq!(second_fut.await.unwrap().len(), 10);
    }
    // Test double for `Reader` that counts `get_range` calls and returns
    // zero-filled buffers of the requested size.
    #[derive(Debug)]
    struct TrackingReader {
        // Incremented on every get_range call (i.e. when I/O is issued).
        get_range_count: Arc<AtomicU64>,
        path: Path,
    }

    impl deepsize::DeepSizeOf for TrackingReader {
        fn deep_size_of_children(&self, _context: &mut deepsize::Context) -> usize {
            0
        }
    }

    impl Reader for TrackingReader {
        fn path(&self) -> &Path {
            &self.path
        }

        fn block_size(&self) -> usize {
            4096
        }

        fn io_parallelism(&self) -> usize {
            1
        }

        fn size(&self) -> futures::future::BoxFuture<'_, object_store::Result<usize>> {
            Box::pin(async { Ok(1_000_000) })
        }

        fn get_range(
            &self,
            range: Range<usize>,
        ) -> futures::future::BoxFuture<'static, object_store::Result<Bytes>> {
            // Count at call time, not at await time: the lite scheduler
            // test relies on this to observe eager submission.
            self.get_range_count.fetch_add(1, Ordering::Release);
            let num_bytes = range.end - range.start;
            Box::pin(async move { Ok(Bytes::from(vec![0u8; num_bytes])) })
        }

        fn get_all(&self) -> futures::future::BoxFuture<'_, object_store::Result<Bytes>> {
            Box::pin(async { Ok(Bytes::from(vec![0u8; 1_000_000])) })
        }
    }
    // The lite scheduler should start every read at submission time (no
    // queuing): all three get_range calls happen before any await.
    #[tokio::test]
    async fn test_lite_scheduler_submits_eagerly() {
        let obj_store = Arc::new(ObjectStore::memory());
        let config = SchedulerConfig::default_for_testing().with_lite_scheduler();
        let scheduler = ScanScheduler::new(obj_store, config);
        let get_range_count = Arc::new(AtomicU64::new(0));
        let reader: Arc<dyn Reader> = Arc::new(TrackingReader {
            get_range_count: get_range_count.clone(),
            path: Path::parse("test").unwrap(),
        });
        let fut1 = scheduler.submit_request(reader.clone(), vec![0..100], 0);
        let fut2 = scheduler.submit_request(reader.clone(), vec![100..200], 10);
        let fut3 = scheduler.submit_request(reader.clone(), vec![200..300], 20);
        // All I/O was issued eagerly, before any future was awaited.
        assert_eq!(get_range_count.load(Ordering::Acquire), 3);
        assert_eq!(fut1.await.unwrap()[0].len(), 100);
        assert_eq!(fut2.await.unwrap()[0].len(), 100);
        assert_eq!(fut3.await.unwrap()[0].len(), 100);
    }
    // Stress test: 10k single-byte reads with an effectively zero-sized
    // (1 byte) buffer must all complete — the priority bypass prevents the
    // backpressure throttle from deadlocking.
    #[test_log::test(tokio::test(flavor = "multi_thread"))]
    async fn stress_backpressure() {
        let some_path = Path::parse("foo").unwrap();
        let obj_store = Arc::new(ObjectStore::memory());
        obj_store
            .put(&some_path, vec![0; 100000].as_slice())
            .await
            .unwrap();
        let config = SchedulerConfig {
            io_buffer_size_bytes: 1,
            use_lite_scheduler: false,
        };
        let scan_scheduler = ScanScheduler::new(obj_store.clone(), config);
        let file_scheduler = scan_scheduler
            .open_file(&some_path, &CachedFileSize::unknown())
            .await
            .unwrap();
        let mut futs = Vec::with_capacity(10000);
        for idx in 0..10000 {
            futs.push(file_scheduler.submit_single(idx..idx + 1, idx));
        }
        // Await in submission (= priority) order; every read must finish.
        for fut in futs {
            fut.await.unwrap();
        }
    }
}