use crate::conn_core::AmConnCore;
use crate::protocol::argument::Argument;
use crate::protocol::part::{Part, Parts};
use crate::protocol::part_attributes::PartAttributes;
use crate::protocol::partkind::PartKind;
use crate::protocol::parts::resultset_metadata::ResultSetMetadata;
use crate::protocol::parts::row::Row;
use crate::protocol::parts::statement_context::StatementContext;
use crate::protocol::reply_type::ReplyType;
use crate::protocol::request::Request;
use crate::protocol::request_type::RequestType;
use crate::protocol::server_resource_consumption_info::ServerResourceConsumptionInfo;
use crate::{HdbError, HdbResult};
use serde;
use serde_db::de::DeserializableResultset;
use std::fmt;
use std::sync::{Arc, Mutex};
pub(crate) type AmRsCore = Arc<Mutex<ResultSetCore>>;
#[derive(Debug)]
pub struct ResultSet {
o_am_rscore: Option<AmRsCore>,
metadata: Arc<ResultSetMetadata>,
next_rows: Vec<Row>,
row_iter: <Vec<Row> as IntoIterator>::IntoIter,
server_resource_consumption_info: ServerResourceConsumptionInfo,
}
#[derive(Debug)]
pub struct ResultSetCore {
am_conn_core: AmConnCore,
attributes: PartAttributes,
resultset_id: u64,
}
impl ResultSetCore {
fn new_am_rscore(
am_conn_core: &AmConnCore,
attributes: PartAttributes,
resultset_id: u64,
) -> Arc<Mutex<ResultSetCore>> {
Arc::new(Mutex::new(ResultSetCore {
am_conn_core: am_conn_core.clone(),
attributes,
resultset_id,
}))
}
}
impl Drop for ResultSetCore {
fn drop(&mut self) {
let rs_id = self.resultset_id;
trace!("ResultSetCore::drop(), resultset_id {}", rs_id);
if !self.attributes.resultset_is_closed() {
if let Ok(mut conn_guard) = self.am_conn_core.lock() {
let mut request = Request::new(RequestType::CloseResultSet, 0);
request.push(Part::new(
PartKind::ResultSetId,
Argument::ResultSetId(rs_id),
));
if let Ok(mut reply) =
conn_guard.roundtrip(request, &self.am_conn_core, None, None, &mut None)
{
let _ = reply.parts.pop_arg_if_kind(PartKind::StatementContext);
while let Some(part) = reply.parts.pop() {
warn!(
"CloseResultSet got a reply with a part of kind {:?}",
part.kind()
);
}
}
}
}
}
}
impl ResultSet {
pub fn try_into<'de, T>(self) -> HdbResult<T>
where
T: serde::de::Deserialize<'de>,
{
trace!("Resultset::try_into()");
Ok(DeserializableResultset::into_typed(self)?)
}
pub fn into_single_row(mut self) -> HdbResult<Row> {
if self.has_multiple_rows() {
Err(HdbError::Usage(
"Resultset has more than one row".to_owned(),
))
} else {
self.next_row()?
.ok_or_else(|| HdbError::Usage("Resultset is empty".to_owned()))
}
}
pub fn metadata(&self) -> &ResultSetMetadata {
&self.metadata
}
pub fn total_number_of_rows(&mut self) -> HdbResult<usize> {
self.fetch_all()?;
Ok(self.next_rows.len() + self.row_iter.len())
}
pub fn next_row(&mut self) -> HdbResult<Option<Row>> {
match self.row_iter.next() {
Some(r) => Ok(Some(r)),
None => {
if self.next_rows.is_empty() {
if self.is_complete()? {
return Ok(None);
}
self.fetch_next()?;
}
let mut tmp_vec = Vec::<Row>::new();
std::mem::swap(&mut tmp_vec, &mut self.next_rows);
self.row_iter = tmp_vec.into_iter();
Ok(self.row_iter.next())
}
}
}
pub(crate) fn has_multiple_rows(&mut self) -> bool {
let is_complete = match self.is_complete() {
Ok(b) => b,
Err(_) => false,
};
!is_complete || (self.next_rows.len() + self.row_iter.len() > 1)
}
pub fn fetch_all(&mut self) -> HdbResult<()> {
while !self.is_complete()? {
self.fetch_next()?;
}
Ok(())
}
fn fetch_next(&mut self) -> HdbResult<()> {
trace!("ResultSet::fetch_next()");
let (mut conn_core, resultset_id, fetch_size) = {
match self.o_am_rscore {
Some(ref am_rscore) => {
let rs_core = am_rscore.lock()?;
let am_conn_core = rs_core.am_conn_core.clone();
let fetch_size = { am_conn_core.lock()?.get_fetch_size() };
(am_conn_core, rs_core.resultset_id, fetch_size)
}
None => {
return Err(HdbError::impl_("Fetch no more possible"));
}
}
};
debug!("ResultSet::fetch_next() with fetch_size = {}", fetch_size);
let mut request = Request::new(RequestType::FetchNext, 0);
request.push(Part::new(
PartKind::ResultSetId,
Argument::ResultSetId(resultset_id),
));
request.push(Part::new(
PartKind::FetchSize,
Argument::FetchSize(fetch_size),
));
let mut reply = conn_core.full_send(request, None, None, &mut Some(self))?;
reply.assert_expected_reply_type(&ReplyType::Fetch)?;
reply.parts.pop_arg_if_kind(PartKind::ResultSet);
let mut drop_rs_core = false;
if let Some(ref am_rscore) = self.o_am_rscore {
drop_rs_core = am_rscore.lock()?.attributes.is_last_packet();
};
if drop_rs_core {
self.o_am_rscore = None;
}
Ok(())
}
fn is_complete(&self) -> HdbResult<bool> {
if let Some(ref am_rscore) = self.o_am_rscore {
let rs_core = am_rscore.lock()?;
if (!rs_core.attributes.is_last_packet())
&& (rs_core.attributes.row_not_found() || rs_core.attributes.resultset_is_closed())
{
Err(HdbError::impl_(
"ResultSet attributes inconsistent: incomplete, but already closed on server",
))
} else {
Ok(rs_core.attributes.is_last_packet())
}
} else {
Ok(true)
}
}
pub(crate) fn new(
am_conn_core: &AmConnCore,
attrs: PartAttributes,
rs_id: u64,
rsm: ResultSetMetadata,
o_stmt_ctx: Option<StatementContext>,
) -> ResultSet {
let mut server_resource_consumption_info: ServerResourceConsumptionInfo =
Default::default();
if let Some(stmt_ctx) = o_stmt_ctx {
server_resource_consumption_info.update(
stmt_ctx.get_server_processing_time(),
stmt_ctx.get_server_cpu_time(),
stmt_ctx.get_server_memory_usage(),
);
}
ResultSet {
o_am_rscore: Some(ResultSetCore::new_am_rscore(am_conn_core, attrs, rs_id)),
metadata: Arc::new(rsm),
next_rows: Vec::<Row>::new(),
row_iter: Vec::<Row>::new().into_iter(),
server_resource_consumption_info,
}
}
pub(crate) fn parse<T: std::io::BufRead>(
no_of_rows: usize,
attributes: PartAttributes,
parts: &mut Parts,
am_conn_core: &AmConnCore,
rs_md: Option<&ResultSetMetadata>,
o_rs: &mut Option<&mut ResultSet>,
rdr: &mut T,
) -> HdbResult<Option<ResultSet>> {
match *o_rs {
None => {
let o_stmt_ctx = match parts.pop_arg_if_kind(PartKind::StatementContext) {
Some(Argument::StatementContext(stmt_ctx)) => Some(stmt_ctx),
None => None,
_ => {
return Err(HdbError::impl_(
"Inconsistent StatementContext part found for ResultSet",
));
}
};
let rs_id = match parts.pop_arg() {
Some(Argument::ResultSetId(rs_id)) => rs_id,
_ => return Err(HdbError::impl_("No ResultSetId part found for ResultSet")),
};
let rs_metadata = match parts.pop_arg_if_kind(PartKind::ResultSetMetadata) {
Some(Argument::ResultSetMetadata(rsmd)) => rsmd,
None => match rs_md {
Some(rs_md) => rs_md.clone(),
_ => return Err(HdbError::impl_("No metadata provided for ResultSet")),
},
_ => {
return Err(HdbError::impl_(
"Inconsistent metadata part found for ResultSet",
));
}
};
let mut result =
ResultSet::new(am_conn_core, attributes, rs_id, rs_metadata, o_stmt_ctx);
ResultSet::parse_rows(&mut result, no_of_rows, rdr)?;
Ok(Some(result))
}
Some(ref mut fetching_resultset) => {
match parts.pop_arg_if_kind(PartKind::StatementContext) {
Some(Argument::StatementContext(stmt_ctx)) => {
fetching_resultset.server_resource_consumption_info.update(
stmt_ctx.get_server_processing_time(),
stmt_ctx.get_server_cpu_time(),
stmt_ctx.get_server_memory_usage(),
);
}
None => {}
_ => {
return Err(HdbError::impl_(
"Inconsistent StatementContext part found for ResultSet",
));
}
};
if let Some(ref mut am_rscore) = fetching_resultset.o_am_rscore {
let mut rscore = am_rscore.lock()?;
rscore.attributes = attributes;
}
ResultSet::parse_rows(fetching_resultset, no_of_rows, rdr)?;
Ok(None)
}
}
}
fn parse_rows(&mut self, no_of_rows: usize, rdr: &mut std::io::BufRead) -> HdbResult<()> {
self.next_rows.reserve(no_of_rows);
let no_of_cols = self.metadata.number_of_fields();
debug!("parse_rows(): {} lines, {} columns", no_of_rows, no_of_cols);
if let Some(ref mut am_rscore) = self.o_am_rscore {
let rscore = am_rscore.lock()?;
let am_conn_core: &AmConnCore = &rscore.am_conn_core;
let o_am_rscore = Some(am_rscore.clone());
for i in 0..no_of_rows {
let row = Row::parse(Arc::clone(&self.metadata), &o_am_rscore, am_conn_core, rdr)?;
trace!("parse_rows(): Found row #{}: {}", i, row);
self.next_rows.push(row);
}
}
Ok(())
}
}
impl fmt::Display for ResultSet {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
writeln!(fmt, "{}\n", &self.metadata)?;
for row in self.row_iter.as_slice() {
writeln!(fmt, "{}\n", &row)?;
}
for row in &self.next_rows {
writeln!(fmt, "{}\n", &row)?;
}
Ok(())
}
}
impl Iterator for ResultSet {
type Item = HdbResult<Row>;
fn next(&mut self) -> Option<HdbResult<Row>> {
match self.next_row() {
Ok(Some(row)) => Some(Ok(row)),
Ok(None) => None,
Err(e) => Some(Err(e)),
}
}
}