use bytes::{Bytes, BytesMut};
use tokio::sync::mpsc;
use tonic::Streaming;
use tracing::{debug, trace, warn};
use crate::client::WorkerClient;
use crate::error::Result;
use crate::proto::grpc::block::{ReadRequest, ReadResponse};
use crate::proto::proto::dataserver::OpenUfsBlockOptions;
/// Streaming reader for a single block, backed by a request channel and a
/// gRPC response stream obtained from a `WorkerClient`.
///
/// Chunks are pulled one at a time with `read_chunk`; after each non-empty
/// chunk a `ReadRequest` carrying the new `offset_received` is sent back on
/// the request channel.
pub struct GrpcBlockReader {
/// Identifier of the block being read (also attached to log events).
block_id: i64,
/// Starting offset passed when the stream was opened; added to
/// `bytes_received` to compute the `offset_received` value that is sent back.
offset: i64,
/// Number of bytes requested; reading stops once `bytes_received >= length`.
length: i64,
/// Running total of payload bytes returned by `read_chunk` so far.
bytes_received: i64,
/// Sending half of the request channel, used for the per-chunk
/// `offset_received` messages.
request_tx: mpsc::Sender<ReadRequest>,
/// Incoming stream of `ReadResponse` messages carrying the data chunks.
response_rx: Streaming<ReadResponse>,
}
impl GrpcBlockReader {
pub async fn open(
worker: &WorkerClient,
block_id: i64,
offset: i64,
length: i64,
chunk_size: i64,
open_ufs_block_options: Option<OpenUfsBlockOptions>,
) -> Result<Self> {
let (request_tx, response_rx) = worker
.read_block(block_id, offset, length, chunk_size, open_ufs_block_options)
.await?;
debug!(
block_id = block_id,
offset = offset,
length = length,
"opened GrpcBlockReader"
);
Ok(Self {
block_id,
offset,
length,
bytes_received: 0,
request_tx,
response_rx,
})
}
pub async fn read_chunk(&mut self) -> Result<Option<Bytes>> {
if self.bytes_received >= self.length {
return Ok(None);
}
match self.response_rx.message().await? {
Some(resp) => {
let data = resp.chunk.and_then(|c| c.data).unwrap_or_default();
if data.is_empty() {
return Ok(None);
}
self.bytes_received += data.len() as i64;
trace!(
block_id = self.block_id,
chunk_len = data.len(),
total_received = self.bytes_received,
"received chunk"
);
let ack = ReadRequest {
offset_received: Some(self.offset + self.bytes_received),
..Default::default()
};
if self.request_tx.send(ack).await.is_err() {
warn!(
block_id = self.block_id,
"ACK channel closed (read may be complete)"
);
}
Ok(Some(Bytes::from(data)))
}
None => {
Ok(None)
}
}
}
pub async fn read_all(&mut self) -> Result<Bytes> {
let mut buf = BytesMut::with_capacity(self.length as usize);
while let Some(chunk) = self.read_chunk().await? {
buf.extend_from_slice(&chunk);
}
Ok(buf.freeze())
}
pub fn block_id(&self) -> i64 {
self.block_id
}
pub fn bytes_received(&self) -> i64 {
self.bytes_received
}
pub fn is_complete(&self) -> bool {
self.bytes_received >= self.length
}
pub async fn positioned_read(
worker: &WorkerClient,
block_id: i64,
offset: i64,
length: i64,
chunk_size: i64,
open_ufs_block_options: Option<OpenUfsBlockOptions>,
) -> Result<Bytes> {
let (request_tx, response_rx) = worker
.read_block_positioned(block_id, offset, length, chunk_size, open_ufs_block_options)
.await?;
debug!(
block_id = block_id,
offset = offset,
length = length,
"positioned_read: opened position_short stream"
);
let mut reader = Self {
block_id,
offset,
length,
bytes_received: 0,
request_tx,
response_rx,
};
reader.read_all().await
}
}