use crate::conn::{AmConnCore, ConnectionCore};
use crate::hdb_response::InternalReturnValue;
use crate::protocol::parts::{
ExecutionResult, ParameterDescriptors, Parts, ResultSetMetadata, RsState, ServerError, Severity,
};
use crate::protocol::{util, Part, PartKind, ReplyType, ServerUsage};
use crate::{HdbError, HdbResult};
use byteorder::{LittleEndian, ReadBytesExt};
use std::sync::Arc;
#[derive(Debug)]
pub(crate) struct Reply {
session_id: i64,
pub replytype: ReplyType,
pub parts: Parts<'static>,
}
impl Reply {
fn new(session_id: i64, replytype: ReplyType) -> Self {
Self {
session_id,
replytype,
parts: Parts::default(),
}
}
pub fn session_id(&self) -> i64 {
self.session_id
}
pub fn parse(
o_a_rsmd: Option<&Arc<ResultSetMetadata>>,
o_a_descriptors: Option<&Arc<ParameterDescriptors>>,
o_rs: &mut Option<&mut RsState>,
o_am_conn_core: Option<&AmConnCore>,
rdr: &mut dyn std::io::Read,
) -> std::io::Result<Self> {
trace!("Reply::parse()");
let (no_of_parts, mut reply) = parse_message_and_sequence_header(rdr)?;
for i in 0..no_of_parts {
let part = Part::parse(
&mut (reply.parts),
o_am_conn_core,
o_a_rsmd,
o_a_descriptors,
o_rs,
i == no_of_parts - 1,
rdr,
)?;
reply.push(part);
}
Ok(reply)
}
pub fn assert_expected_reply_type(&self, expected_reply_type: ReplyType) -> HdbResult<()> {
if self.replytype == expected_reply_type {
Ok(())
} else {
Err(HdbError::ImplDetailed(format!(
"Expected reply type {:?}, got {:?}",
expected_reply_type, self.replytype,
)))
}
}
pub fn push(&mut self, part: Part<'static>) {
self.parts.push(part);
}
pub fn into_internal_return_values(
self,
am_conn_core: &mut AmConnCore,
o_additional_server_usage: Option<&mut ServerUsage>,
) -> HdbResult<(Vec<InternalReturnValue>, ReplyType)> {
Ok((
self.parts
.into_internal_return_values(am_conn_core, o_additional_server_usage)?,
self.replytype,
))
}
pub(crate) fn handle_db_error(&mut self, conn_core: &mut ConnectionCore) -> HdbResult<()> {
conn_core.warnings.clear();
let mut server_errors = {
match self.parts.remove_first_of_kind(PartKind::Error) {
None => {
return Ok(());
}
Some(Part::Error(server_warnings_and_errors)) => {
let (mut warnings, server_errors): (Vec<ServerError>, Vec<ServerError>) =
server_warnings_and_errors
.into_iter()
.partition(|se| &Severity::Warning == se.severity());
std::mem::swap(&mut conn_core.warnings, &mut warnings);
if server_errors.is_empty() {
return Ok(());
}
server_errors
}
Some(_non_error_part) => unreachable!("129837938423"),
}
};
let mut o_execution_results = None;
self.parts.reverse(); while let Some(part) = self.parts.pop() {
match part {
Part::StatementContext(ref stmt_ctx) => {
conn_core.evaluate_statement_context(stmt_ctx);
}
Part::TransactionFlags(ta_flags) => {
conn_core.evaluate_ta_flags(ta_flags)?;
}
Part::ExecutionResult(vec) => {
o_execution_results = Some(vec);
}
part => warn!(
"Reply::handle_db_error(): ignoring unexpected part of kind {:?}",
part.kind()
),
}
}
match o_execution_results {
Some(execution_results) => {
let mut err_iter = server_errors.into_iter();
let mut execution_results = execution_results
.into_iter()
.map(|er| match er {
ExecutionResult::Failure(_) => ExecutionResult::Failure(err_iter.next()),
_ => er,
})
.collect::<Vec<ExecutionResult>>();
for e in err_iter {
warn!(
"Reply::handle_db_error(): \
found more server_errors than instances of ExecutionResult::Failure"
);
execution_results.push(ExecutionResult::Failure(Some(e)));
}
Err(HdbError::ExecutionResults(execution_results))
}
None => {
if server_errors.len() == 1 {
Err(HdbError::from(server_errors.remove(0)))
} else {
unreachable!("hopefully...")
}
}
}
}
}
fn parse_message_and_sequence_header(rdr: &mut dyn std::io::Read) -> std::io::Result<(i16, Reply)> {
let session_id: i64 = rdr.read_i64::<LittleEndian>()?; let packet_seq_number: i32 = rdr.read_i32::<LittleEndian>()?; let varpart_size: u32 = rdr.read_u32::<LittleEndian>()?; let remaining_bufsize: u32 = rdr.read_u32::<LittleEndian>()?; let no_of_segs = rdr.read_i16::<LittleEndian>()?; if no_of_segs == 0 {
return Err(util::io_error("empty response (is ok for drop connection)"));
}
if no_of_segs > 1 {
return Err(util::io_error(format!("no_of_segs = {no_of_segs} > 1")));
}
util::skip_bytes(10, rdr)?;
rdr.read_i32::<LittleEndian>()?; rdr.read_i32::<LittleEndian>()?; let no_of_parts: i16 = rdr.read_i16::<LittleEndian>()?; rdr.read_i16::<LittleEndian>()?; let seg_kind = Kind::from_i8(rdr.read_i8()?)?;
trace!(
"message and segment header: {{ packet_seq_number = {}, varpart_size = {}, \
remaining_bufsize = {}, no_of_parts = {} }}",
packet_seq_number,
varpart_size,
remaining_bufsize,
no_of_parts
);
match seg_kind {
Kind::Request => Err(util::io_error("Cannot _parse_ a request".to_string())),
Kind::Reply | Kind::Error => {
util::skip_bytes(1, rdr)?; let reply_type = ReplyType::from_i16(rdr.read_i16::<LittleEndian>()?)?; util::skip_bytes(8, rdr)?; debug!(
"Reply::parse(): got reply of type {:?} and seg_kind {:?} for session_id {}",
reply_type, seg_kind, session_id
);
Ok((no_of_parts, Reply::new(session_id, reply_type)))
}
}
}
#[derive(Debug)]
enum Kind {
Request,
Reply,
Error,
}
impl Kind {
fn from_i8(val: i8) -> std::io::Result<Self> {
match val {
1 => Ok(Self::Request),
2 => Ok(Self::Reply),
5 => Ok(Self::Error),
_ => Err(util::io_error(format!(
"reply::Kind {} not implemented",
val
))),
}
}
}