#[cfg(feature = "async")]
use crate::usage_err;
use super::LobBuf;
#[cfg(feature = "async")]
use super::fetch::fetch_a_lob_chunk_async;
#[cfg(feature = "sync")]
use super::fetch::fetch_a_lob_chunk_sync;
use crate::{
HdbResult,
base::{OAM, RsCore, XMutexed},
conn::AmConnCore,
impl_err,
protocol::ServerUsage,
};
use debug_ignore::DebugIgnore;
use std::{
io::{Cursor, Write},
sync::Arc,
};
#[derive(Clone, Debug)]
pub(crate) struct BLobHandle {
pub(crate) am_conn_core: AmConnCore,
o_am_rscore: Option<Arc<XMutexed<RsCore>>>,
is_data_complete: bool,
total_byte_length: u64,
locator_id: u64,
data: DebugIgnore<LobBuf>,
acc_byte_length: usize,
pub(crate) server_usage: ServerUsage,
}
impl BLobHandle {
#[allow(clippy::ref_option)]
pub(crate) fn new(
am_conn_core: &AmConnCore,
o_am_rscore: &OAM<RsCore>,
is_data_complete: bool,
total_byte_length: u64,
locator_id: u64,
data: Vec<u8>,
) -> Self {
let data = DebugIgnore::from(LobBuf::with_initial_content(data));
Self {
am_conn_core: am_conn_core.clone(),
o_am_rscore: o_am_rscore.clone(),
total_byte_length,
is_data_complete,
locator_id,
acc_byte_length: data.len(),
data,
server_usage: ServerUsage::default(),
}
}
#[cfg(feature = "sync")]
pub(crate) fn read_slice_sync(&mut self, offset: u64, length: u32) -> HdbResult<Vec<u8>> {
let (reply_data, _reply_is_last_data) = fetch_a_lob_chunk_sync(
&self.am_conn_core,
self.locator_id,
offset,
length,
&mut self.server_usage,
)?;
debug!("read_slice(): got {} bytes", reply_data.len());
Ok(reply_data)
}
#[cfg(feature = "async")]
pub(crate) async fn read_slice_async(
&mut self,
offset: u64,
length: u32,
) -> HdbResult<Vec<u8>> {
let (reply_data, _reply_is_last_data) = fetch_a_lob_chunk_async(
&self.am_conn_core,
self.locator_id,
offset,
length,
&mut self.server_usage,
)
.await?;
debug!("read_slice(): got {} bytes", reply_data.len());
Ok(reply_data)
}
pub(crate) fn total_byte_length(&self) -> u64 {
self.total_byte_length
}
pub(crate) fn cur_buf_len(&self) -> usize {
self.data.len()
}
#[allow(clippy::cast_possible_truncation)]
#[cfg(feature = "sync")]
fn fetch_next_chunk_sync(&mut self) -> HdbResult<usize> {
if self.is_data_complete {
return Err(impl_err!("fetch_next_chunk(): already complete"));
}
let read_length = std::cmp::min(
self.am_conn_core
.lock_sync()?
.configuration()
.lob_read_length(),
(self.total_byte_length - self.acc_byte_length as u64) as u32,
);
let (reply_data, reply_is_last_data) = fetch_a_lob_chunk_sync(
&self.am_conn_core,
self.locator_id,
self.acc_byte_length as u64,
read_length,
&mut self.server_usage,
)?;
let reply_len = reply_data.len();
self.acc_byte_length += reply_len;
self.data.append(&reply_data);
if reply_is_last_data {
self.is_data_complete = true;
self.o_am_rscore = None;
}
assert_eq!(
self.is_data_complete,
self.total_byte_length == self.acc_byte_length as u64
);
trace!(
"fetch_next_chunk: is_data_complete = {}, data.len() = {}",
self.is_data_complete,
self.data.len()
);
Ok(reply_len)
}
#[allow(clippy::cast_possible_truncation)]
#[cfg(feature = "async")]
async fn fetch_next_chunk_async(&mut self) -> HdbResult<()> {
if self.is_data_complete {
return Err(impl_err!("fetch_next_chunk(): already complete"));
}
let read_length = std::cmp::min(
self.am_conn_core
.lock_async()
.await
.configuration()
.lob_read_length(),
(self.total_byte_length - self.acc_byte_length as u64) as u32,
);
let (reply_data, reply_is_last_data) = fetch_a_lob_chunk_async(
&self.am_conn_core,
self.locator_id,
self.acc_byte_length as u64,
read_length,
&mut self.server_usage,
)
.await?;
self.acc_byte_length += reply_data.len();
self.data.append(&reply_data);
if reply_is_last_data {
self.is_data_complete = true;
self.o_am_rscore = None;
}
assert_eq!(
self.is_data_complete,
self.total_byte_length == self.acc_byte_length as u64
);
trace!(
"fetch_next_chunk: is_data_complete = {}, data.len() = {}",
self.is_data_complete,
self.data.len()
);
Ok(())
}
#[cfg(feature = "sync")]
pub(crate) fn load_complete_sync(&mut self) -> HdbResult<()> {
trace!("load_complete()");
while !self.is_data_complete {
self.fetch_next_chunk_sync()?;
}
Ok(())
}
#[cfg(feature = "async")]
pub(crate) async fn load_complete_async(&mut self) -> HdbResult<()> {
trace!("load_complete()");
while !self.is_data_complete {
self.fetch_next_chunk_async().await?;
}
Ok(())
}
#[cfg(feature = "sync")]
pub(crate) fn into_bytes_sync(mut self) -> HdbResult<Vec<u8>> {
trace!("into_bytes()");
self.load_complete_sync()?;
Ok(self.data.0.into_inner())
}
#[cfg(feature = "async")]
pub(crate) fn into_bytes_if_complete_async(self) -> HdbResult<Vec<u8>> {
trace!("into_bytes_if_complete()");
if self.is_data_complete {
Ok(self.data.0.into_inner())
} else {
Err(usage_err!(
"Can't convert BLob that is not not completely loaded"
))
}
}
#[cfg(feature = "async")]
pub(crate) async fn read_async(&mut self, buf: &mut [u8]) -> HdbResult<usize> {
let buf_len = buf.len();
trace!("BLobHandle::read() with buffer of size {buf_len}");
let mut cursor = Cursor::new(buf);
let mut written = 0;
while written < buf_len {
if self.data.is_empty() {
if !self.is_data_complete {
self.fetch_next_chunk_async()
.await
.map_err(std::io::Error::other)?;
}
if self.data.is_empty() {
break;
}
}
let chunk_size = std::cmp::min(buf_len - written, self.data.len());
cursor.write_all(self.data.drain(chunk_size)?)?;
written += chunk_size;
}
Ok(written)
}
}
#[cfg(feature = "sync")]
impl std::io::Read for BLobHandle {
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
let buf_len = buf.len();
trace!("BLobHandle::read() with buffer of size {buf_len}");
let mut cursor = Cursor::new(buf);
let mut written = 0;
while written < buf_len {
if self.data.is_empty() {
if !self.is_data_complete {
self.fetch_next_chunk_sync()
.map_err(std::io::Error::other)?;
}
if self.data.is_empty() {
break;
}
}
let chunk_size = std::cmp::min(buf_len - written, self.data.len());
cursor.write_all(self.data.drain(chunk_size)?)?;
written += chunk_size;
}
Ok(written)
}
}