use std::sync::Arc;
use async_trait::async_trait;
use futures::FutureExt;
use futures::future::BoxFuture;
use vortex_buffer::{Alignment, ByteBuffer};
use vortex_error::{VortexExpect, VortexResult, vortex_bail};
use vortex_metrics::{Histogram, Timer, VortexMetrics};
/// A source of bytes that can be read asynchronously at arbitrary positions.
///
/// Implementors must be `Send + Sync + 'static`, and every future returned by
/// this trait is `'static`, so it cannot borrow from `&self`.
pub trait VortexReadAt: Send + Sync + 'static {
/// Starts a read of `length` bytes beginning at byte position `offset`.
///
/// The returned future resolves to the bytes that were read, or to an error.
/// `alignment` is forwarded to the implementation; the `ByteBuffer`
/// implementation in this file applies it to the returned buffer.
fn read_at(
&self,
offset: u64,
length: usize,
alignment: Alignment,
) -> BoxFuture<'static, VortexResult<ByteBuffer>>;
/// Returns a future resolving to the size of the underlying source as a `u64`
/// (presumably in bytes, matching the `offset` unit of `read_at` — confirm).
fn size(&self) -> BoxFuture<'static, VortexResult<u64>>;
/// Returns tuning hints for reads against this source.
///
/// The default implementation returns [`PerformanceHint::local`].
fn performance_hint(&self) -> PerformanceHint {
PerformanceHint::local()
}
}
/// Tuning hints that a [`VortexReadAt`] implementation reports about itself.
///
/// NOTE(review): both values look like byte counts, and the names suggest that
/// `coalescing_window` is the gap within which neighbouring reads get merged
/// while `max_read` caps the size of a single read — confirm against the code
/// that consumes these hints.
#[derive(Debug, Clone)]
pub struct PerformanceHint {
    /// Value reported by [`PerformanceHint::coalescing_window`].
    coalescing_window: u64,
    /// Value reported by [`PerformanceHint::max_read`]; `None` means no limit was given.
    max_read: Option<u64>,
}

impl PerformanceHint {
    /// Builds a hint from an explicit coalescing window and optional maximum read size.
    pub fn new(coalescing_window: u64, max_read: Option<u64>) -> Self {
        Self {
            coalescing_window,
            max_read,
        }
    }

    /// Preset used as the default hint: a window of 8192 and a maximum read of 8192.
    pub fn local() -> Self {
        const LOCAL_SIZE: u64 = 8192;
        Self {
            coalescing_window: LOCAL_SIZE,
            max_read: Some(LOCAL_SIZE),
        }
    }

    /// Preset with a window of `1 << 20` (1 MiB) and a maximum read of `8 << 20` (8 MiB).
    pub fn object_storage() -> Self {
        const MIB: u64 = 1 << 20;
        Self {
            coalescing_window: MIB,
            max_read: Some(8 * MIB),
        }
    }

    /// Returns the configured coalescing window.
    pub fn coalescing_window(&self) -> u64 {
        self.coalescing_window
    }

    /// Returns the configured maximum read size, if any.
    pub fn max_read(&self) -> Option<u64> {
        self.max_read
    }
}
/// Lets an `Arc<R>` be used wherever a [`VortexReadAt`] is expected by
/// forwarding every method to the `R` it points at.
impl<R: VortexReadAt> VortexReadAt for Arc<R> {
    /// Forwards to `R::read_at` with the arguments unchanged.
    fn read_at(
        &self,
        offset: u64,
        length: usize,
        alignment: Alignment,
    ) -> BoxFuture<'static, VortexResult<ByteBuffer>> {
        // `**self` goes from `&Arc<R>` to the shared `R`.
        (**self).read_at(offset, length, alignment)
    }

    /// Forwards to `R::size`.
    fn size(&self) -> BoxFuture<'static, VortexResult<u64>> {
        (**self).size()
    }

    /// Forwards to `R::performance_hint`, so an override on `R` is preserved.
    fn performance_hint(&self) -> PerformanceHint {
        (**self).performance_hint()
    }
}
/// In-memory [`VortexReadAt`] implementation that serves reads from a [`ByteBuffer`].
impl VortexReadAt for ByteBuffer {
    /// Returns the bytes `offset..offset + length` of this buffer, aligned to `alignment`.
    ///
    /// # Errors
    ///
    /// The returned future resolves to an error when `offset + length` overflows
    /// `u64`, or when the requested range extends past the end of the buffer.
    fn read_at(
        &self,
        offset: u64,
        length: usize,
        alignment: Alignment,
    ) -> BoxFuture<'static, VortexResult<ByteBuffer>> {
        // The future is `'static`, so it has to own its handle to the buffer.
        let buffer = self.clone();
        async move {
            // Compute the end in `u64` with overflow detection: a wrapped sum could
            // slip past the bounds check below and produce a reversed slice range.
            let Some(end) = offset.checked_add(length as u64) else {
                vortex_bail!(
                    "Requested range starting at {} with length {} overflows u64",
                    offset,
                    length
                );
            };

            // Validate against the buffer length *before* narrowing to `usize`, so
            // an oversized request is reported as an error instead of a panic.
            let buffer_len = buffer.len();
            if end > buffer_len as u64 {
                vortex_bail!(
                    "Requested range {}..{} out of bounds for buffer of length {}",
                    offset,
                    end,
                    buffer_len
                );
            }

            // `offset <= end <= buffer_len`, and `buffer_len` is a `usize`, so these
            // conversions cannot fail; a failure here would be a logic bug.
            let start = usize::try_from(offset).vortex_expect("start too big for usize");
            let end = usize::try_from(end).vortex_expect("end too big for usize");

            Ok(buffer.slice_unaligned(start..end).aligned(alignment))
        }
        .boxed()
    }

    /// Resolves immediately to the length of the buffer.
    fn size(&self) -> BoxFuture<'static, VortexResult<u64>> {
        let length = self.len() as u64;
        async move { Ok(length) }.boxed()
    }

    /// Reports the [`PerformanceHint::local`] preset.
    fn performance_hint(&self) -> PerformanceHint {
        PerformanceHint::local()
    }
}
/// A [`VortexReadAt`] wrapper that records metrics about the reads it forwards.
///
/// Summary statistics of the recorded metrics are written to the debug log when
/// a value of this type is dropped.
#[derive(Clone)]
pub struct InstrumentedReadAt<T: VortexReadAt> {
// The wrapped reader that every call is forwarded to.
read: Arc<T>,
// Histogram updated with the requested `length` of each read.
sizes: Arc<Histogram>,
// Timer started around each forwarded read.
durations: Arc<Timer>,
}
impl<T: VortexReadAt> InstrumentedReadAt<T> {
    /// Wraps `read`, obtaining the instruments from `metrics`.
    ///
    /// Read sizes go to the histogram named `vortex.io.read.size` and read
    /// durations to the timer named `vortex.io.read.duration`.
    pub fn new(read: Arc<T>, metrics: &VortexMetrics) -> Self {
        let sizes = metrics.histogram("vortex.io.read.size");
        let durations = metrics.timer("vortex.io.read.duration");
        Self {
            read,
            sizes,
            durations,
        }
    }
}
/// Emits a summary of the collected read metrics at debug level on drop.
impl<T: VortexReadAt> Drop for InstrumentedReadAt<T> {
    fn drop(&mut self) {
        // Read count and size percentiles.
        let size_snapshot = self.sizes.snapshot();
        log::debug!("Reads: {}", self.sizes.count());
        log::debug!(
            "Read size: p50={} p95={} p99={} p999={}",
            size_snapshot.value(0.5),
            size_snapshot.value(0.95),
            size_snapshot.value(0.99),
            size_snapshot.value(0.999),
        );

        // Duration percentiles, scaled down by 1e6 and labelled as milliseconds
        // (presumably the timer records nanoseconds — confirm).
        let duration_snapshot = self.durations.snapshot();
        let as_millis = |quantile| duration_snapshot.value(quantile) / 1_000_000.0;
        log::debug!(
            "Read duration: p50={}ms p95={}ms p99={}ms p999={}ms",
            as_millis(0.5),
            as_millis(0.95),
            as_millis(0.99),
            as_millis(0.999)
        );
    }
}
#[async_trait]
impl<T: VortexReadAt> VortexReadAt for InstrumentedReadAt<T> {
fn read_at(
&self,
offset: u64,
length: usize,
alignment: Alignment,
) -> BoxFuture<'static, VortexResult<ByteBuffer>> {
let durations = self.durations.clone();
let sizes = self.sizes.clone();
let read_fut = self.read.read_at(offset, length, alignment);
async move {
let _timer = durations.time();
let buf = read_fut.await;
sizes.update(length as i64);
buf
}
.boxed()
}
#[inline]
fn size(&self) -> BoxFuture<'static, VortexResult<u64>> {
self.read.size()
}
fn performance_hint(&self) -> PerformanceHint {
self.read.performance_hint()
}
}
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use vortex_buffer::{Alignment, ByteBuffer};
    use super::*;

    // --- PerformanceHint presets and accessors ---

    #[test]
    fn test_performance_hint_local() {
        let local = PerformanceHint::local();
        assert_eq!(local.coalescing_window(), 8192);
        assert_eq!(local.max_read(), Some(8192));
    }

    #[test]
    fn test_performance_hint_object_storage() {
        let remote = PerformanceHint::object_storage();
        assert_eq!(remote.coalescing_window(), 1 << 20);
        assert_eq!(remote.max_read(), Some(8 << 20));
    }

    #[test]
    fn test_performance_hint_custom() {
        let custom = PerformanceHint::new(4096, Some(16384));
        assert_eq!(custom.coalescing_window(), 4096);
        assert_eq!(custom.max_read(), Some(16384));
    }

    #[test]
    fn test_performance_hint_no_max() {
        let unbounded = PerformanceHint::new(2048, None);
        assert_eq!(unbounded.coalescing_window(), 2048);
        assert_eq!(unbounded.max_read(), None);
    }

    // --- VortexReadAt implementations ---

    /// Reading an in-range window returns exactly those bytes.
    #[tokio::test]
    async fn test_byte_buffer_read_at() {
        let source = ByteBuffer::from(vec![1, 2, 3, 4, 5]);
        let window = source.read_at(1, 3, Alignment::none()).await.unwrap();
        assert_eq!(window.as_ref(), &[2, 3, 4]);
    }

    /// A range running past the end of the buffer is reported as an error.
    #[tokio::test]
    async fn test_byte_buffer_read_out_of_bounds() {
        let source = ByteBuffer::from(vec![1, 2, 3]);
        let outcome = source.read_at(1, 9, Alignment::none()).await;
        assert!(outcome.is_err());
    }

    /// The `Arc` wrapper forwards both reads and size lookups.
    #[tokio::test]
    async fn test_arc_read_at() {
        let shared = Arc::new(ByteBuffer::from(vec![1, 2, 3, 4, 5]));
        let tail = shared.read_at(2, 3, Alignment::none()).await.unwrap();
        assert_eq!(tail.as_ref(), &[3, 4, 5]);
        let total = shared.size().await.unwrap();
        assert_eq!(total, 5);
    }
}