use super::fetch::fetch_a_lob_chunk;
use super::CharLobSlice;
use crate::conn::AmConnCore;
use crate::protocol::parts::AmRsCore;
use crate::protocol::util;
use crate::{HdbError, HdbResult, ServerUsage};
use std::boxed::Box;
use std::collections::VecDeque;
use std::io::Write;
#[derive(Clone, Debug)]
pub struct NCLob(Box<NCLobHandle>);
impl NCLob {
pub(crate) fn new(
am_conn_core: &AmConnCore,
o_am_rscore: &Option<AmRsCore>,
is_data_complete: bool,
total_char_length: u64,
total_byte_length: u64,
locator_id: u64,
data: Vec<u8>,
) -> HdbResult<Self> {
Ok(Self(Box::new(NCLobHandle::new(
am_conn_core,
o_am_rscore,
is_data_complete,
total_char_length,
total_byte_length,
locator_id,
data,
)?)))
}
pub fn into_string(self) -> HdbResult<String> {
self.0.into_string()
}
pub fn read_slice(&mut self, offset: u64, length: u32) -> HdbResult<CharLobSlice> {
self.0.read_slice(offset, length)
}
pub fn total_byte_length(&self) -> u64 {
self.0.total_byte_length()
}
pub fn total_char_length(&self) -> u64 {
self.0.total_char_length()
}
pub fn is_empty(&self) -> bool {
self.total_byte_length() == 0
}
pub fn max_buf_len(&self) -> usize {
self.0.max_buf_len()
}
pub fn cur_buf_len(&self) -> usize {
self.0.cur_buf_len() as usize
}
pub fn server_usage(&self) -> ServerUsage {
self.0.server_usage
}
}
impl std::io::Read for NCLob {
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
self.0.read(buf)
}
}
#[derive(Clone, Debug)]
struct NCLobHandle {
am_conn_core: AmConnCore,
o_am_rscore: Option<AmRsCore>,
is_data_complete: bool,
total_char_length: u64,
total_byte_length: u64,
locator_id: u64,
cesu8: VecDeque<u8>,
cesu8_tail_len: usize,
max_buf_len: usize,
acc_byte_length: usize,
acc_char_length: usize,
server_usage: ServerUsage,
}
impl NCLobHandle {
fn new(
am_conn_core: &AmConnCore,
o_am_rscore: &Option<AmRsCore>,
is_data_complete: bool,
total_char_length: u64,
total_byte_length: u64,
locator_id: u64,
cesu8: Vec<u8>,
) -> HdbResult<Self> {
let acc_char_length = count_1_2_3_sequence_starts(&cesu8);
let cesu8 = VecDeque::from(cesu8);
let acc_byte_length = cesu8.len();
let cesu8_tail_len = util::get_cesu8_tail_len(&cesu8, cesu8.len())?;
let nclob_handle = Self {
am_conn_core: am_conn_core.clone(),
o_am_rscore: match o_am_rscore {
Some(ref am_rscore) => Some(am_rscore.clone()),
None => None,
},
total_char_length,
total_byte_length,
is_data_complete,
locator_id,
max_buf_len: cesu8.len(),
cesu8,
cesu8_tail_len,
acc_byte_length,
acc_char_length,
server_usage: ServerUsage::default(),
};
trace!(
"new() with: is_data_complete = {}, total_char_length = {}, total_byte_length = {}, \
locator_id = {}, cesu8_tail_len = {:?}, cesu8.len() = {}",
nclob_handle.is_data_complete,
nclob_handle.total_char_length,
nclob_handle.total_byte_length,
nclob_handle.locator_id,
nclob_handle.cesu8_tail_len,
nclob_handle.cesu8.len()
);
Ok(nclob_handle)
}
fn read_slice(&mut self, offset: u64, length: u32) -> HdbResult<CharLobSlice> {
let (reply_data, _reply_is_last_data) = fetch_a_lob_chunk(
&mut self.am_conn_core,
self.locator_id,
offset,
length,
&mut self.server_usage,
)?;
debug!("read_slice(): got {} bytes", reply_data.len());
Ok(util::split_off_orphaned_surrogates(reply_data)?)
}
fn total_byte_length(&self) -> u64 {
self.total_byte_length
}
fn total_char_length(&self) -> u64 {
self.total_char_length
}
fn cur_buf_len(&self) -> usize {
self.cesu8.len()
}
#[allow(clippy::cast_possible_truncation)]
fn fetch_next_chunk(&mut self) -> HdbResult<()> {
if self.is_data_complete {
return Err(HdbError::Impl("already complete"));
}
let read_length = std::cmp::min(
self.am_conn_core.lock()?.get_lob_read_length() as u32,
(self.total_char_length - self.acc_char_length as u64) as u32,
);
let (reply_data, reply_is_last_data) = fetch_a_lob_chunk(
&mut self.am_conn_core,
self.locator_id,
self.acc_char_length as u64,
read_length,
&mut self.server_usage,
)?;
self.acc_byte_length += reply_data.len();
self.acc_char_length += count_1_2_3_sequence_starts(&reply_data);
self.cesu8.append(&mut VecDeque::from(reply_data));
self.cesu8_tail_len = util::get_cesu8_tail_len(&self.cesu8, self.cesu8.len())?;
self.is_data_complete = reply_is_last_data;
self.max_buf_len = std::cmp::max(self.cesu8.len(), self.max_buf_len);
assert_eq!(
self.is_data_complete,
self.total_byte_length == self.acc_byte_length as u64
);
trace!(
"fetch_next_chunk: is_data_complete = {}, cesu8.len() = {}",
self.is_data_complete,
self.cesu8.len()
);
Ok(())
}
fn load_complete(&mut self) -> HdbResult<()> {
trace!("NCLobHandle::load_complete()");
while !self.is_data_complete {
self.fetch_next_chunk()?;
}
Ok(())
}
fn max_buf_len(&self) -> usize {
self.max_buf_len
}
fn into_string(mut self) -> HdbResult<String> {
trace!("NCLobHandle::into_string()");
self.load_complete()?;
Ok(util::string_from_cesu8(Vec::from(self.cesu8))?)
}
}
impl std::io::Read for NCLobHandle {
fn read(&mut self, mut buf: &mut [u8]) -> std::io::Result<usize> {
trace!("read() with buf of len {}", buf.len());
while !self.is_data_complete && (buf.len() > self.cesu8.len() - self.cesu8_tail_len) {
self.fetch_next_chunk()
.map_err(|e| util::io_error(e.to_string()))?;
}
let drain_len = std::cmp::min(buf.len(), self.cesu8.len());
let cesu8_buf: Vec<u8> = self.cesu8.drain(0..drain_len).collect();
let cut_off_position =
cesu8_buf.len() - util::get_cesu8_tail_len(&cesu8_buf, cesu8_buf.len())?;
let utf8 = cesu8::from_cesu8(&cesu8_buf[0..cut_off_position]).map_err(util::io_error)?;
for i in (cut_off_position..cesu8_buf.len()).rev() {
self.cesu8.push_front(cesu8_buf[i]);
}
buf.write_all(utf8.as_bytes())?;
Ok(utf8.len())
}
}
fn count_1_2_3_sequence_starts(cesu8: &[u8]) -> usize {
cesu8.iter().filter(|b| is_utf8_char_start(**b)).count()
}
fn is_utf8_char_start(b: u8) -> bool {
match b {
0x00..=0x7F | 0xC0..=0xDF | 0xE0..=0xEF | 0xF0..=0xF7 => true,
_ => false,
}
}