use futures::future::RemoteHandle;
use mountpoint_s3_client::ObjectClient;
use super::PrefetchReadError;
use super::backpressure_controller::BackpressureController;
use super::backpressure_controller::BackpressureFeedbackEvent::{DataRead, PartQueueStall};
use super::part::Part;
use super::part_queue::PartQueue;
use super::part_stream::RequestRange;
#[derive(Debug)]
pub struct RequestTask<Client: ObjectClient> {
_task_handle: RemoteHandle<()>,
remaining: usize,
range: RequestRange,
part_queue: PartQueue<Client>,
backpressure_controller: BackpressureController,
}
impl<Client: ObjectClient> RequestTask<Client> {
pub fn from_handle(
task_handle: RemoteHandle<()>,
range: RequestRange,
part_queue: PartQueue<Client>,
backpressure_controller: BackpressureController,
) -> Self {
Self {
_task_handle: task_handle,
remaining: range.len(),
range,
part_queue,
backpressure_controller,
}
}
pub async fn push_front(&mut self, parts: Vec<Part>) -> Result<(), PrefetchReadError<Client::ClientError>> {
for part in parts.into_iter().rev() {
self.remaining += part.len();
self.part_queue.push_front(part).await?;
}
Ok(())
}
pub async fn read(&mut self, length: usize) -> Result<Part, PrefetchReadError<Client::ClientError>> {
let part = self.part_queue.read(length).await?;
debug_assert!(part.len() <= self.remaining);
self.remaining -= part.len();
self.backpressure_controller
.send_feedback(DataRead {
offset: part.offset(),
length: part.len(),
})
.await?;
let next_offset = part.offset() + part.len() as u64;
let remaining_in_queue = self.available_offset().saturating_sub(next_offset) as usize;
if remaining_in_queue == 0 {
self.backpressure_controller.send_feedback(PartQueueStall).await?;
}
Ok(part)
}
pub fn start_offset(&self) -> u64 {
self.range.start()
}
pub fn end_offset(&self) -> u64 {
self.range.end()
}
pub fn total_size(&self) -> usize {
self.range.len()
}
pub fn remaining(&self) -> usize {
self.remaining
}
pub fn available_offset(&self) -> u64 {
self.start_offset() + self.part_queue.bytes_received() as u64
}
pub fn read_window_end_offset(&self) -> u64 {
self.backpressure_controller.read_window_end_offset()
}
}