use std::collections::{BTreeMap, BTreeSet};
use std::pin::Pin;
use std::task::{Context, Poll};
use futures::Stream;
use pin_project_lite::pin_project;
use vortex_error::VortexExpect;
use vortex_metrics::VortexMetrics;
use crate::file::read::{
CoalesceWindow, CoalescedRequest, IoRequest, ReadEvent, ReadRequest, RequestId,
};
pin_project! {
/// Stream adapter that consumes a stream of `ReadEvent`s and yields `IoRequest`s.
///
/// Incoming events are folded into an internal `State`; each call to `poll_next`
/// then asks that state for the next request to issue, optionally coalescing
/// nearby requests when a `CoalesceWindow` is configured.
pub(crate) struct IoRequestStream<S> {
// Upstream source of `ReadEvent`s; structurally pinned so it can be polled in place.
#[pin]
events: S,
// Set to `true` once `events` has yielded `None`.
inner_done: bool,
// When `Some`, requests are merged according to this window; when `None`,
// every request is issued individually.
coalesce_window: Option<CoalesceWindow>,
// Bookkeeping of registered / polled requests and the metrics handle.
state: State,
}
}
impl<S> IoRequestStream<S> {
    /// Wraps `events` in a new request stream.
    ///
    /// * `events` - upstream stream of `ReadEvent`s to consume.
    /// * `coalesce_window` - `Some` to enable coalescing of nearby requests,
    ///   `None` to emit each request on its own.
    /// * `metrics` - handle handed to the internal state for request counters.
    ///
    /// The stream starts with no tracked requests and the upstream marked as
    /// not yet finished.
    pub(crate) fn new(
        events: S,
        coalesce_window: Option<CoalesceWindow>,
        metrics: VortexMetrics,
    ) -> Self
    where
        S: Stream<Item = ReadEvent> + Unpin + Send + 'static,
    {
        let state = State::new(metrics);
        Self {
            events,
            inner_done: false,
            coalesce_window,
            state,
        }
    }
}
impl<S> Stream for IoRequestStream<S>
where
    S: Stream<Item = ReadEvent> + Unpin + Send + 'static,
{
    type Item = IoRequest;

    /// Produces the next I/O request to issue.
    ///
    /// Each call first drains every event that the upstream stream can deliver
    /// without blocking, applying them to the internal state, and then asks the
    /// state for the next (optionally coalesced) request.
    ///
    /// Returns:
    /// * `Poll::Ready(Some(_))` when a request is available,
    /// * `Poll::Ready(None)` once the upstream has finished and no polled
    ///   requests remain,
    /// * `Poll::Pending` otherwise.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let mut this = self.project();

        // Drain all immediately-available events. The upstream is never polled
        // again after it has reported completion: polling a finished stream is
        // outside the `Stream` contract and some implementations panic on it.
        // Without this guard the upstream would be re-polled on every call made
        // while leftover polled requests are still being handed out.
        while !*this.inner_done {
            match this.events.as_mut().poll_next(cx) {
                Poll::Ready(Some(event)) => this.state.on_event(event),
                Poll::Ready(None) => *this.inner_done = true,
                // The upstream registered `cx`'s waker; stop draining for now.
                Poll::Pending => break,
            }
        }

        if let Some(coalesced) = this.state.next(this.coalesce_window.as_ref()) {
            return Poll::Ready(Some(coalesced));
        }

        // Nothing to emit right now. Finish only when no further events can
        // arrive and no polled request is left waiting.
        if *this.inner_done && this.state.polled_requests.is_empty() {
            return Poll::Ready(None);
        }

        Poll::Pending
    }
}
/// Bookkeeping for all read requests the stream currently knows about.
struct State {
// Requests that were registered (`ReadEvent::Request`) but not yet polled.
requests: BTreeMap<RequestId, ReadRequest>,
// Requests moved here from `requests` when a `ReadEvent::Polled` arrived for them.
polled_requests: BTreeMap<RequestId, ReadRequest>,
// `(offset, id)` index over tracked requests (polled or not), used for
// offset-range scans when coalescing.
requests_by_offset: BTreeSet<(u64, RequestId)>,
// Handle used to record individual / coalesced request counters.
metrics: VortexMetrics,
}
impl State {
fn new(metrics: VortexMetrics) -> Self {
Self {
requests: BTreeMap::new(),
polled_requests: BTreeMap::new(),
requests_by_offset: BTreeSet::new(),
metrics,
}
}
fn on_event(&mut self, event: ReadEvent) {
log::debug!("Received ReadEvent: {:?}", event);
match event {
ReadEvent::Request(req) => {
self.requests_by_offset.insert((req.offset, req.id));
self.requests.insert(req.id, req);
}
ReadEvent::Polled(req_id) => {
if let Some(req) = self.requests.remove(&req_id) {
self.polled_requests.insert(req_id, req);
}
}
ReadEvent::Dropped(req_id) => {
if let Some(req) = self.requests.remove(&req_id) {
self.requests_by_offset.remove(&(req.offset, req_id));
log::debug!("ReadRequest dropped before poll: {:?}", req);
}
if let Some(req) = self.polled_requests.remove(&req_id) {
self.requests_by_offset.remove(&(req.offset, req_id));
log::debug!("ReadRequest dropped after poll: {:?}", req);
}
}
}
}
fn next(&mut self, coalesce_window: Option<&CoalesceWindow>) -> Option<IoRequest> {
match coalesce_window {
None => self.next_uncoalesced().map(|request| {
self.metrics.counter("io.requests.individual").inc();
IoRequest::new_single(request)
}),
Some(window) => self.next_coalesced(window).map(|request| {
match request.requests.len() {
1 => self.metrics.counter("io.requests.individual").inc(),
num_requests => {
self.metrics.counter("io.requests.coalesced").inc();
self.metrics
.histogram("io.requests.coalesced.num_coalesced")
.update(num_requests as i64);
}
};
IoRequest::new_coalesced(request)
}),
}
}
fn next_uncoalesced(&mut self) -> Option<ReadRequest> {
while let Some((req_id, req)) = self.polled_requests.pop_first() {
self.requests_by_offset.remove(&(req.offset, req_id));
if req.callback.is_closed() {
log::debug!("Dropping canceled request");
continue;
}
return Some(req);
}
None
}
fn next_coalesced(&mut self, window: &CoalesceWindow) -> Option<CoalescedRequest> {
let first_req = self.next_uncoalesced()?;
let mut requests = vec![first_req];
let mut current_start = requests[0].offset;
let mut current_end = requests[0].offset + requests[0].length as u64;
let alignment = requests[0].alignment;
let mut keys_to_remove = Vec::new();
let mut found_new_requests = true;
while found_new_requests {
found_new_requests = false;
let scan_start = current_start.saturating_sub(window.distance);
let scan_end = current_end.saturating_add(window.distance);
for &(req_offset, req_id) in self
.requests_by_offset
.range((scan_start, RequestId::MIN)..=(scan_end, RequestId::MAX))
{
if keys_to_remove.iter().any(|&(_, id)| id == req_id) {
continue;
}
let req = self
.polled_requests
.get(&req_id)
.or_else(|| self.requests.get(&req_id))
.vortex_expect("Missing request in requests_by_offset");
if req.callback.is_closed() {
keys_to_remove.push((req_offset, req_id));
continue;
}
let req_end = req_offset + req.length as u64;
if (req_offset <= current_end + window.distance && req_end >= current_start)
|| (req_end + window.distance >= current_start && req_offset <= current_end)
{
let new_start = current_start.min(req_offset);
let new_end = current_end.max(req_end);
let new_total_size = new_end - new_start;
if new_total_size <= window.max_size {
current_start = new_start;
current_end = new_end;
let req = self
.polled_requests
.remove(&req_id)
.or_else(|| self.requests.remove(&req_id))
.vortex_expect("Missing request in requests_by_offset");
requests.push(req);
keys_to_remove.push((req_offset, req_id));
found_new_requests = true;
}
}
}
}
for (req_offset, req_id) in keys_to_remove {
self.requests_by_offset.remove(&(req_offset, req_id));
self.polled_requests
.remove(&req_id)
.or_else(|| self.requests.remove(&req_id));
}
requests.sort_unstable_by_key(|r| r.offset);
log::debug!(
"Coalesced {} requests into range {}..{} (len={})",
requests.len(),
current_start,
current_end,
current_end - current_start,
);
Some(CoalescedRequest {
range: current_start..current_end,
alignment,
requests,
})
}
}
#[cfg(test)]
mod tests {
    use futures::{StreamExt, stream};
    use vortex_buffer::{Alignment, ByteBuffer};
    use vortex_error::VortexResult;

    use super::*;
    use crate::file::{IoRequestInner, ReadEvent, ReadRequest};

    /// Builds a `ReadRequest` with the given id, offset and length (no
    /// alignment) together with the receiving half of its callback channel.
    ///
    /// The receiver must be kept alive by the caller; dropping it closes the
    /// request's callback.
    fn create_request(
        id: usize,
        offset: u64,
        length: usize,
    ) -> (ReadRequest, oneshot::Receiver<VortexResult<ByteBuffer>>) {
        let (tx, rx) = oneshot::channel();
        (
            ReadRequest {
                id,
                offset,
                length,
                alignment: Alignment::none(),
                callback: tx,
            },
            rx,
        )
    }

    /// Feeds `events` through an `IoRequestStream` with default metrics and
    /// collects everything it emits.
    async fn collect_outputs(
        events: Vec<ReadEvent>,
        coalesce_window: Option<CoalesceWindow>,
    ) -> Vec<IoRequest> {
        let event_stream = stream::iter(events);
        let metrics = VortexMetrics::default();
        let io_stream = IoRequestStream::new(event_stream, coalesce_window, metrics);
        io_stream.collect().await
    }

    /// A single registered-and-polled request is emitted unchanged.
    #[tokio::test]
    async fn test_single_request() {
        let (req, _rx) = create_request(1, 100, 50);
        let events = vec![ReadEvent::Request(req), ReadEvent::Polled(1)];
        let outputs = collect_outputs(events, None).await;
        assert_eq!(outputs.len(), 1);
        assert_eq!(outputs[0].offset(), 100);
        assert_eq!(outputs[0].len(), 50);
    }

    /// Requests are emitted in request-id order, regardless of the order in
    /// which they were registered.
    #[tokio::test]
    async fn test_poll_order_priority() {
        let (req1, _rx1) = create_request(1, 0, 10);
        let (req2, _rx2) = create_request(2, 100, 10);
        let (req3, _rx3) = create_request(3, 200, 10);
        let events = vec![
            ReadEvent::Request(req2),
            ReadEvent::Request(req1),
            ReadEvent::Request(req3),
            ReadEvent::Polled(1),
            ReadEvent::Polled(2),
            ReadEvent::Polled(3),
        ];
        let outputs = collect_outputs(events, None).await;
        assert_eq!(outputs.len(), 3);
        let offsets: Vec<u64> = outputs.iter().map(|req| req.offset()).collect();
        assert_eq!(offsets, vec![0, 100, 200]);
    }

    /// Three back-to-back requests merge into one read even with distance 0.
    #[tokio::test]
    async fn test_coalesce_adjacent() {
        let (req1, _rx1) = create_request(1, 0, 10);
        let (req2, _rx2) = create_request(2, 10, 10);
        let (req3, _rx3) = create_request(3, 20, 10);
        let events = vec![
            ReadEvent::Request(req1),
            ReadEvent::Request(req2),
            ReadEvent::Request(req3),
            ReadEvent::Polled(1),
            ReadEvent::Polled(2),
            ReadEvent::Polled(3),
        ];
        let outputs = collect_outputs(
            events,
            Some(CoalesceWindow {
                distance: 0,
                max_size: 1024,
            }),
        )
        .await;
        assert_eq!(outputs.len(), 1);
        match outputs[0].inner() {
            IoRequestInner::Coalesced(coalesced) => {
                assert_eq!(coalesced.range, 0..30);
                assert_eq!(coalesced.requests.len(), 3);
            }
            _ => panic!("Expected coalesced request"),
        }
    }

    /// A 5-byte gap (0..10 and 15..25) is bridged by a distance of 6, and the
    /// second request is pulled in although it was never polled.
    #[tokio::test]
    async fn test_coalesce_with_gap() {
        let (req1, _rx1) = create_request(1, 0, 10);
        let (req2, _rx2) = create_request(2, 15, 10);
        let events = vec![
            ReadEvent::Request(req1),
            ReadEvent::Request(req2),
            ReadEvent::Polled(1),
        ];
        let outputs = collect_outputs(
            events,
            Some(CoalesceWindow {
                distance: 6,
                max_size: 1024,
            }),
        )
        .await;
        assert_eq!(outputs.len(), 1);
        match outputs[0].inner() {
            IoRequestInner::Coalesced(c) => assert_eq!(c.requests.len(), 2),
            _ => panic!("Expected coalesced"),
        }
    }

    /// Requests dropped before being polled (id 1) or after being polled
    /// (id 3) are never emitted; only request 2 survives.
    #[tokio::test]
    async fn test_dropped_requests() {
        let (req1, _rx1) = create_request(1, 0, 10);
        let (req2, _rx2) = create_request(2, 100, 10);
        let (req3, _rx3) = create_request(3, 200, 10);
        let events = vec![
            ReadEvent::Request(req1),
            ReadEvent::Request(req2),
            ReadEvent::Request(req3),
            ReadEvent::Dropped(1),
            ReadEvent::Polled(2),
            ReadEvent::Polled(3),
            ReadEvent::Dropped(3),
        ];
        let outputs = collect_outputs(events, None).await;
        assert_eq!(outputs.len(), 1);
        assert_eq!(outputs[0].offset(), 100);
    }

    /// A request whose callback receiver was dropped is skipped even though it
    /// was polled; only the live request is emitted.
    #[tokio::test]
    async fn test_cancelled_requests() {
        let (tx1, rx1) = oneshot::channel();
        let (tx2, _rx2) = oneshot::channel();
        // Dropping the receiver closes request 1's callback.
        drop(rx1);
        let req1 = ReadRequest {
            id: 1,
            offset: 0,
            length: 10,
            alignment: Alignment::none(),
            callback: tx1,
        };
        let req2 = ReadRequest {
            id: 2,
            offset: 100,
            length: 10,
            alignment: Alignment::none(),
            callback: tx2,
        };
        let events = vec![
            ReadEvent::Request(req1),
            ReadEvent::Request(req2),
            ReadEvent::Polled(1),
            ReadEvent::Polled(2),
        ];
        let outputs = collect_outputs(events, None).await;
        assert_eq!(outputs.len(), 1);
        assert_eq!(outputs[0].offset(), 100);
    }

    /// Requests that are registered but never polled produce no output.
    #[tokio::test]
    async fn test_unpolled_requests_ignored() {
        let (req1, _rx1) = create_request(1, 0, 10);
        let (req2, _rx2) = create_request(2, 100, 10);
        let events = vec![ReadEvent::Request(req1), ReadEvent::Request(req2)];
        let outputs = collect_outputs(events, None).await;
        assert_eq!(outputs.len(), 0);
    }

    /// Only the middle request is polled; with a distance of 60 the window
    /// grows in both directions and absorbs its unpolled neighbours.
    #[tokio::test]
    async fn test_coalesce_expansion_around_polled() {
        let (req1, _rx1) = create_request(1, 0, 10);
        let (req2, _rx2) = create_request(2, 50, 10);
        let (req3, _rx3) = create_request(3, 100, 10);
        let events = vec![
            ReadEvent::Request(req1),
            ReadEvent::Request(req2),
            ReadEvent::Request(req3),
            ReadEvent::Polled(2),
        ];
        let outputs = collect_outputs(
            events,
            Some(CoalesceWindow {
                distance: 60,
                max_size: 1024,
            }),
        )
        .await;
        assert_eq!(outputs.len(), 1);
        match outputs[0].inner() {
            IoRequestInner::Coalesced(coalesced) => {
                assert_eq!(coalesced.range, 0..110);
                assert_eq!(coalesced.requests.len(), 3);
                // Members are reported sorted by offset.
                assert_eq!(coalesced.requests[0].offset, 0);
                assert_eq!(coalesced.requests[1].offset, 50);
                assert_eq!(coalesced.requests[2].offset, 100);
            }
            _ => panic!("Expected coalesced request"),
        }
    }

    /// An empty event stream yields an empty output stream.
    #[tokio::test]
    async fn test_empty_stream() {
        let outputs = collect_outputs(vec![], None).await;
        assert_eq!(outputs.len(), 0);
    }

    /// Two adjacent requests are merged while a far-away one is emitted
    /// separately.
    #[tokio::test]
    async fn test_mixed_coalesced_and_single() {
        let (req1, _rx1) = create_request(1, 0, 10);
        let (req2, _rx2) = create_request(2, 10, 10);
        let (req3, _rx3) = create_request(3, 1000, 10);
        let events = vec![
            ReadEvent::Request(req1),
            ReadEvent::Request(req2),
            ReadEvent::Polled(1),
            ReadEvent::Request(req3),
            ReadEvent::Polled(3),
        ];
        let outputs = collect_outputs(
            events,
            Some(CoalesceWindow {
                distance: 5,
                max_size: 1024,
            }),
        )
        .await;
        assert_eq!(outputs.len(), 2);
        assert_eq!(outputs[0].range(), 0..20);
        assert_eq!(outputs[1].range(), 1000..1010);
    }

    /// With coalescing enabled, one merged read and one lone read are counted
    /// under the coalesced and individual metrics respectively.
    #[tokio::test]
    async fn test_metrics_tracking() {
        let (req1, _rx1) = create_request(1, 0, 10);
        let (req2, _rx2) = create_request(2, 10, 10);
        let (req3, _rx3) = create_request(3, 1000, 10);
        let events = vec![
            ReadEvent::Request(req1),
            ReadEvent::Request(req2),
            ReadEvent::Polled(1),
            ReadEvent::Polled(2),
            ReadEvent::Request(req3),
            ReadEvent::Polled(3),
        ];
        let event_stream = stream::iter(events);
        let metrics = VortexMetrics::default();
        let io_stream = IoRequestStream::new(
            event_stream,
            Some(CoalesceWindow {
                distance: 5,
                max_size: 1024,
            }),
            metrics.clone(),
        );
        let outputs: Vec<IoRequest> = io_stream.collect().await;
        assert_eq!(outputs.len(), 2);

        let snapshot = metrics.snapshot();
        let mut individual_count = 0i64;
        let mut coalesced_operations = 0i64;
        let mut coalesced_histogram_count = 0u64;
        for (metric_id, metric) in snapshot.iter() {
            match metric {
                vortex_metrics::Metric::Counter(counter) => {
                    if metric_id.name() == "io.requests.individual" {
                        individual_count = counter.count();
                    } else if metric_id.name() == "io.requests.coalesced" {
                        coalesced_operations = counter.count();
                    }
                }
                vortex_metrics::Metric::Histogram(histogram) => {
                    if metric_id.name() == "io.requests.coalesced.num_coalesced" {
                        coalesced_histogram_count = histogram.count();
                    }
                }
                _ => {}
            }
        }
        assert_eq!(individual_count, 1, "Expected 1 individual request");
        assert_eq!(coalesced_operations, 1, "Expected 1 coalesced operation");
        assert_eq!(
            coalesced_histogram_count, 1,
            "Expected 1 histogram entry for coalesced count"
        );
    }

    /// Without a coalesce window every request is counted as individual and
    /// the coalesced counter is never touched.
    #[tokio::test]
    async fn test_metrics_individual_only() {
        let (req1, _rx1) = create_request(1, 0, 10);
        let (req2, _rx2) = create_request(2, 100, 10);
        let events = vec![
            ReadEvent::Request(req1),
            ReadEvent::Request(req2),
            ReadEvent::Polled(1),
            ReadEvent::Polled(2),
        ];
        let event_stream = stream::iter(events);
        let metrics = VortexMetrics::default();
        let io_stream = IoRequestStream::new(event_stream, None, metrics.clone());
        let outputs: Vec<IoRequest> = io_stream.collect().await;
        assert_eq!(outputs.len(), 2);

        let snapshot = metrics.snapshot();
        let mut individual_count = 0i64;
        let mut coalesced_operations = 0i64;
        for (metric_id, metric) in snapshot.iter() {
            if let vortex_metrics::Metric::Counter(counter) = metric {
                if metric_id.name() == "io.requests.individual" {
                    individual_count = counter.count();
                } else if metric_id.name() == "io.requests.coalesced" {
                    // Must be the *counter* name. The histogram name
                    // ("io.requests.coalesced.num_coalesced") can never match
                    // a `Counter`, which would make the assertion below
                    // vacuously true.
                    coalesced_operations = counter.count();
                }
            }
        }
        assert_eq!(individual_count, 2, "Expected 2 individual requests");
        assert_eq!(coalesced_operations, 0, "Expected 0 coalesced operations");
    }
}