use bytes::Bytes;
use tracing::{debug, trace};
use crate::client::worker::{WriteBlockHandle, WriteBlockOptions};
use crate::client::WorkerClient;
use crate::error::{Error, Result};
use crate::proto::grpc::block::{write_request, Chunk, WriteRequest, WriteRequestCommand};
/// Writer for a single block that pushes `WriteRequest` messages through a
/// [`WriteBlockHandle`] and keeps a running count of the bytes submitted.
pub struct GrpcBlockWriter {
/// Identifier of the block being written; included in log and error messages.
block_id: i64,
/// Total length of all chunks successfully handed to the request channel.
bytes_written: i64,
/// Handle obtained from `WorkerClient::write_block`; provides the request
/// sender plus the response, close and cancel operations used by the writer.
handle: WriteBlockHandle,
}
impl GrpcBlockWriter {
pub async fn open(
worker: &WorkerClient,
block_id: i64,
space_to_reserve: i64,
options: WriteBlockOptions,
) -> Result<Self> {
let handle = worker
.write_block(block_id, space_to_reserve, options)
.await?;
debug!(
block_id = block_id,
space_to_reserve = space_to_reserve,
"opened GrpcBlockWriter"
);
Ok(Self {
block_id,
bytes_written: 0,
handle,
})
}
pub async fn write_chunk(&mut self, data: Bytes) -> Result<()> {
let chunk_len = data.len() as i64;
let req = WriteRequest {
value: Some(write_request::Value::Chunk(Chunk {
data: Some(data.to_vec()),
})),
};
self.handle
.request_tx
.send(req)
.await
.map_err(|_| Error::BlockIoError {
message: format!("write channel closed for block_id={}", self.block_id),
})?;
self.bytes_written += chunk_len;
trace!(
block_id = self.block_id,
chunk_len = chunk_len,
total_written = self.bytes_written,
"wrote chunk"
);
Ok(())
}
pub async fn write_all(&mut self, data: &[u8], chunk_size: usize) -> Result<()> {
let mut offset = 0;
while offset < data.len() {
let end = std::cmp::min(offset + chunk_size, data.len());
let chunk = Bytes::copy_from_slice(&data[offset..end]);
self.write_chunk(chunk).await?;
offset = end;
}
Ok(())
}
pub async fn flush(&mut self) -> Result<i64> {
let flush_req = WriteRequest {
value: Some(write_request::Value::Command(WriteRequestCommand {
flush: Some(true),
..Default::default()
})),
};
self.handle
.request_tx
.send(flush_req)
.await
.map_err(|_| Error::BlockIoError {
message: format!("flush channel closed for block_id={}", self.block_id),
})?;
match self.handle.recv_response().await? {
Some(resp) => {
let offset = resp.offset.unwrap_or(self.bytes_written);
debug!(
block_id = self.block_id,
ack_offset = offset,
"flush acknowledged"
);
Ok(offset)
}
None => Err(Error::BlockIoError {
message: format!(
"stream ended before flush ack for block_id={}",
self.block_id
),
}),
}
}
pub async fn close(self) -> Result<()> {
let block_id = self.block_id;
let bytes_written = self.bytes_written;
self.handle.close().await?;
debug!(
block_id = block_id,
bytes_written = bytes_written,
"closed GrpcBlockWriter"
);
Ok(())
}
pub async fn cancel(self) {
let block_id = self.block_id;
self.handle.cancel().await;
debug!(block_id = block_id, "cancelled GrpcBlockWriter");
}
pub fn block_id(&self) -> i64 {
self.block_id
}
pub fn bytes_written(&self) -> i64 {
self.bytes_written
}
}