use std::sync::Arc;
use futures::FutureExt;
use futures::future::BoxFuture;
use vortex_array::buffer::BufferHandle;
use vortex_buffer::Alignment;
use vortex_buffer::ByteBuffer;
use vortex_error::VortexExpect;
use vortex_error::VortexResult;
use vortex_error::vortex_bail;
use vortex_metrics::Counter;
use vortex_metrics::Histogram;
use vortex_metrics::Label;
use vortex_metrics::MetricBuilder;
use vortex_metrics::MetricsRegistry;
use vortex_metrics::Timer;
#[derive(Clone, Copy, Debug)]
pub struct CoalesceConfig {
pub distance: u64,
pub max_size: u64,
}
impl CoalesceConfig {
pub const fn new(distance: u64, max_size: u64) -> Self {
Self { distance, max_size }
}
pub const fn in_memory() -> Self {
Self::new(8 * 1024, 8 * 1024) }
pub const fn file() -> Self {
Self::new(1 << 20, 4 << 20) }
pub const fn object_storage() -> Self {
Self::new(1 << 20, 16 << 20) }
}
pub trait VortexReadAt: Send + Sync + 'static {
fn uri(&self) -> Option<&Arc<str>> {
None
}
fn coalesce_config(&self) -> Option<CoalesceConfig> {
None
}
fn concurrency(&self) -> usize;
fn size(&self) -> BoxFuture<'static, VortexResult<u64>>;
fn read_at(
&self,
offset: u64,
length: usize,
alignment: Alignment,
) -> BoxFuture<'static, VortexResult<BufferHandle>>;
}
impl VortexReadAt for Arc<dyn VortexReadAt> {
fn uri(&self) -> Option<&Arc<str>> {
self.as_ref().uri()
}
fn coalesce_config(&self) -> Option<CoalesceConfig> {
self.as_ref().coalesce_config()
}
fn concurrency(&self) -> usize {
self.as_ref().concurrency()
}
fn size(&self) -> BoxFuture<'static, VortexResult<u64>> {
self.as_ref().size()
}
fn read_at(
&self,
offset: u64,
length: usize,
alignment: Alignment,
) -> BoxFuture<'static, VortexResult<BufferHandle>> {
self.as_ref().read_at(offset, length, alignment)
}
}
impl<R: VortexReadAt> VortexReadAt for Arc<R> {
fn uri(&self) -> Option<&Arc<str>> {
self.as_ref().uri()
}
fn coalesce_config(&self) -> Option<CoalesceConfig> {
self.as_ref().coalesce_config()
}
fn concurrency(&self) -> usize {
self.as_ref().concurrency()
}
fn size(&self) -> BoxFuture<'static, VortexResult<u64>> {
self.as_ref().size()
}
fn read_at(
&self,
offset: u64,
length: usize,
alignment: Alignment,
) -> BoxFuture<'static, VortexResult<BufferHandle>> {
self.as_ref().read_at(offset, length, alignment)
}
}
impl VortexReadAt for ByteBuffer {
fn size(&self) -> BoxFuture<'static, VortexResult<u64>> {
let length = self.len() as u64;
async move { Ok(length) }.boxed()
}
fn concurrency(&self) -> usize {
16
}
fn read_at(
&self,
offset: u64,
length: usize,
alignment: Alignment,
) -> BoxFuture<'static, VortexResult<BufferHandle>> {
let buffer = self.clone();
async move {
let start = usize::try_from(offset).vortex_expect("start too big for usize");
let end =
usize::try_from(offset + length as u64).vortex_expect("end too big for usize");
if end > buffer.len() {
vortex_bail!(
"Requested range {}..{} out of bounds for buffer of length {}",
start,
end,
buffer.len()
);
}
Ok(BufferHandle::new_host(
buffer.slice_unaligned(start..end).aligned(alignment),
))
}
.boxed()
}
}
#[derive(Clone)]
pub struct InstrumentedReadAt<T: VortexReadAt + Clone> {
read: T,
metrics: Arc<InnerMetrics>,
}
struct InnerMetrics {
sizes: Histogram,
total_size: Counter,
durations: Timer,
}
impl<T: VortexReadAt + Clone> InstrumentedReadAt<T> {
pub fn new(read: T, metrics_registry: &dyn MetricsRegistry) -> Self {
Self::new_with_labels(read, metrics_registry, Vec::<Label>::default())
}
pub fn new_with_labels<I, L>(read: T, metrics_registry: &dyn MetricsRegistry, labels: I) -> Self
where
I: IntoIterator<Item = L>,
L: Into<Label>,
{
let labels = labels.into_iter().map(|l| l.into()).collect::<Vec<Label>>();
let sizes = MetricBuilder::new(metrics_registry)
.add_labels(labels.clone())
.histogram("vortex.io.read.size");
let total_size = MetricBuilder::new(metrics_registry)
.add_labels(labels.clone())
.counter("vortex.io.read.total_size");
let durations = MetricBuilder::new(metrics_registry)
.add_labels(labels)
.timer("vortex.io.read.duration");
Self {
read,
metrics: Arc::new(InnerMetrics {
sizes,
total_size,
durations,
}),
}
}
}
impl Drop for InnerMetrics {
#[allow(clippy::cognitive_complexity)]
fn drop(&mut self) {
tracing::debug!("Reads: {}", self.sizes.count());
if !self.sizes.is_empty() {
tracing::debug!(
"Read size: p50={} p95={} p99={} p999={}",
self.sizes.quantile(0.5).vortex_expect("must not be empty"),
self.sizes.quantile(0.95).vortex_expect("must not be empty"),
self.sizes.quantile(0.99).vortex_expect("must not be empty"),
self.sizes
.quantile(0.999)
.vortex_expect("must not be empty"),
);
}
let total_size = self.total_size.value();
tracing::debug!("Total read size: {total_size}");
if !self.durations.is_empty() {
tracing::debug!(
"Read duration: p50={}ms p95={}ms p99={}ms p999={}ms",
self.durations
.quantile(0.5)
.vortex_expect("must not be empty")
.as_millis(),
self.durations
.quantile(0.95)
.vortex_expect("must not be empty")
.as_millis(),
self.durations
.quantile(0.99)
.vortex_expect("must not be empty")
.as_millis(),
self.durations
.quantile(0.999)
.vortex_expect("must not be empty")
.as_millis(),
);
}
}
}
impl<T: VortexReadAt + Clone> VortexReadAt for InstrumentedReadAt<T> {
fn uri(&self) -> Option<&Arc<str>> {
self.read.uri()
}
fn coalesce_config(&self) -> Option<CoalesceConfig> {
self.read.coalesce_config()
}
fn concurrency(&self) -> usize {
self.read.concurrency()
}
fn size(&self) -> BoxFuture<'static, VortexResult<u64>> {
self.read.size()
}
fn read_at(
&self,
offset: u64,
length: usize,
alignment: Alignment,
) -> BoxFuture<'static, VortexResult<BufferHandle>> {
let durations = self.metrics.durations.clone();
let sizes = self.metrics.sizes.clone();
let total_size = self.metrics.total_size.clone();
let read_fut = self.read.read_at(offset, length, alignment);
async move {
let _timer = durations.time();
let buf = read_fut.await;
sizes.update(length as f64);
total_size.add(length as u64);
buf
}
.boxed()
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use vortex_buffer::Alignment;
use vortex_buffer::ByteBuffer;
use super::*;
#[test]
fn test_coalesce_config_in_memory() {
let config = CoalesceConfig::in_memory();
assert_eq!(config.distance, 8 * 1024);
assert_eq!(config.max_size, 8 * 1024);
}
#[test]
fn test_coalesce_config_file() {
let config = CoalesceConfig::file();
assert_eq!(config.distance, 1 << 20); assert_eq!(config.max_size, 4 << 20); }
#[test]
fn test_coalesce_config_object_storage() {
let config = CoalesceConfig::object_storage();
assert_eq!(config.distance, 1 << 20); assert_eq!(config.max_size, 16 << 20); }
#[tokio::test]
async fn test_byte_buffer_read_at() {
let data = ByteBuffer::from(vec![1, 2, 3, 4, 5]);
let result = data.read_at(1, 3, Alignment::none()).await.unwrap();
assert_eq!(result.to_host().await.as_ref(), &[2, 3, 4]);
}
#[tokio::test]
async fn test_byte_buffer_read_out_of_bounds() {
let data = ByteBuffer::from(vec![1, 2, 3]);
let result = data.read_at(1, 9, Alignment::none()).await;
assert!(result.is_err());
}
#[tokio::test]
async fn test_arc_read_at() {
let data = Arc::new(ByteBuffer::from(vec![1, 2, 3, 4, 5]));
let result = data.read_at(2, 3, Alignment::none()).await.unwrap();
assert_eq!(result.to_host().await.as_ref(), &[3, 4, 5]);
let size = data.size().await.unwrap();
assert_eq!(size, 5);
}
}