use super::{
PgConnection, PgError, PgResult,
extended_flow::{ExtendedFlowConfig, ExtendedFlowTracker},
is_ignorable_session_message, is_ignorable_session_msg_type, unexpected_backend_message,
unexpected_backend_msg_type,
};
use crate::protocol::{BackendMessage, PgEncoder};
use bytes::{Bytes, BytesMut};
/// Stores the first server-reported error of a request in `slot`.
///
/// If `slot` is already occupied the new error is dropped, so the caller ends up
/// reporting the earliest failure. Before the error is stored, an error that
/// `is_prepared_statement_retryable` flags causes the connection's
/// prepared-statement state to be cleared.
#[inline]
fn capture_query_server_error(conn: &mut PgConnection, slot: &mut Option<PgError>, err: PgError) {
    if slot.is_none() {
        if err.is_prepared_statement_retryable() {
            conn.clear_prepared_statement_state();
        }
        *slot = Some(err);
    }
}
/// Empties the connection's write buffer and reserves capacity for one
/// Bind + Execute + Sync batch that targets `stmt`.
///
/// The required size comes from
/// `PgEncoder::bind_execute_sync_wire_len_with_formats`, called with
/// `PgEncoder::FORMAT_TEXT` and the requested `result_format`.
///
/// # Errors
/// Returns `PgError::Encode` when the encoder cannot compute the wire length.
#[inline]
fn reserve_prepared_single_write_buf(
    conn: &mut PgConnection,
    stmt: &super::PreparedStatement,
    params: &[Option<Vec<u8>>],
    result_format: i16,
) -> PgResult<()> {
    conn.write_buf.clear();
    match PgEncoder::bind_execute_sync_wire_len_with_formats(
        &stmt.name,
        params,
        PgEncoder::FORMAT_TEXT,
        result_format,
    ) {
        Ok(wire_len) => {
            conn.write_buf.reserve(wire_len);
            Ok(())
        }
        Err(e) => Err(PgError::Encode(e.to_string())),
    }
}
/// Where the current statement stands inside a simple-query response stream.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum SimpleStatementState {
    /// No RowDescription has been seen for the statement currently in flight.
    AwaitingResult,
    /// A RowDescription was seen and the statement has not completed yet.
    InRowStream,
}

/// Checks that backend messages of a simple-query exchange arrive in a legal order.
#[derive(Debug, Clone, Copy)]
struct SimpleFlowTracker {
    /// Row-stream position of the statement currently being answered.
    state: SimpleStatementState,
    /// Whether the most recent statement finished (CommandComplete or EmptyQueryResponse).
    saw_completion: bool,
}

impl SimpleFlowTracker {
    /// Creates a tracker that has seen neither rows nor a completion.
    fn new() -> Self {
        SimpleFlowTracker {
            state: SimpleStatementState::AwaitingResult,
            saw_completion: false,
        }
    }

    /// Enters the row stream; a second RowDescription before completion is an error.
    fn on_row_description(&mut self, context: &'static str) -> PgResult<()> {
        match self.state {
            SimpleStatementState::InRowStream => Err(PgError::Protocol(format!(
                "{}: duplicate RowDescription before statement completion",
                context
            ))),
            SimpleStatementState::AwaitingResult => {
                self.state = SimpleStatementState::InRowStream;
                self.saw_completion = false;
                Ok(())
            }
        }
    }

    /// Accepts a DataRow only while a row stream is open.
    fn on_data_row(&self, context: &'static str) -> PgResult<()> {
        match self.state {
            SimpleStatementState::InRowStream => Ok(()),
            SimpleStatementState::AwaitingResult => Err(PgError::Protocol(format!(
                "{}: DataRow before RowDescription",
                context
            ))),
        }
    }

    /// Closes any open row stream and records that a statement completed.
    fn on_command_complete(&mut self) {
        *self = SimpleFlowTracker {
            state: SimpleStatementState::AwaitingResult,
            saw_completion: true,
        };
    }

    /// Records completion for an empty query; illegal while a row stream is open.
    fn on_empty_query_response(&mut self, context: &'static str) -> PgResult<()> {
        match self.state {
            SimpleStatementState::InRowStream => Err(PgError::Protocol(format!(
                "{}: EmptyQueryResponse during active row stream",
                context
            ))),
            SimpleStatementState::AwaitingResult => {
                self.saw_completion = true;
                Ok(())
            }
        }
    }

    /// Validates ReadyForQuery. With `error_pending` set, ordering is not enforced.
    fn on_ready_for_query(&self, context: &'static str, error_pending: bool) -> PgResult<()> {
        if error_pending {
            return Ok(());
        }
        match (self.state, self.saw_completion) {
            (SimpleStatementState::InRowStream, _) => Err(PgError::Protocol(format!(
                "{}: ReadyForQuery before CommandComplete",
                context
            ))),
            (SimpleStatementState::AwaitingResult, false) => Err(PgError::Protocol(format!(
                "{}: ReadyForQuery before completion",
                context
            ))),
            (SimpleStatementState::AwaitingResult, true) => Ok(()),
        }
    }
}
impl PgConnection {
/// Ensures an explicit parameter-type list, when supplied, has exactly one
/// entry per parameter. An empty `param_types` slice is always accepted.
///
/// # Errors
/// Returns `PgError::Encode` describing both counts when they differ.
fn validate_param_type_arity(params: &[Option<Vec<u8>>], param_types: &[u32]) -> PgResult<()> {
    let (type_count, param_count) = (param_types.len(), params.len());
    if type_count == 0 || type_count == param_count {
        return Ok(());
    }
    Err(PgError::Encode(format!(
        "parameter type count {} does not match parameter count {}",
        type_count, param_count
    )))
}
/// Runs `sql` with `params` and returns the raw rows, requesting results in
/// `PgEncoder::FORMAT_TEXT`. Delegates to `query_with_result_format`.
pub(crate) async fn query(
    &mut self,
    sql: &str,
    params: &[Option<Vec<u8>>],
) -> PgResult<Vec<Vec<Option<Vec<u8>>>>> {
    let text_format = PgEncoder::FORMAT_TEXT;
    self.query_with_result_format(sql, params, text_format).await
}
/// Sends one extended-query request for `sql` and collects every DataRow.
///
/// The request is built by `PgEncoder::encode_extended_query_with_result_format`
/// and written in one piece. Messages are then read until ReadyForQuery. The
/// first ErrorResponse is remembered and returned at ReadyForQuery, and rows
/// that arrive after an error are discarded.
///
/// # Errors
/// `PgError::Encode` on encoding failure, the first server error, a flow
/// validation error, or an unexpected-message error.
pub(crate) async fn query_with_result_format(
    &mut self,
    sql: &str,
    params: &[Option<Vec<u8>>],
    result_format: i16,
) -> PgResult<Vec<Vec<Option<Vec<u8>>>>> {
    const CONTEXT: &str = "extended-query execute";

    let request = PgEncoder::encode_extended_query_with_result_format(sql, params, result_format)
        .map_err(|e| PgError::Encode(e.to_string()))?;
    self.write_all_with_timeout(&request, "stream write").await?;

    let mut collected = Vec::new();
    let mut server_error: Option<PgError> = None;
    let mut tracker = ExtendedFlowTracker::new(ExtendedFlowConfig::parse_bind_execute(true));
    loop {
        let message = self.recv().await?;
        tracker.validate(&message, CONTEXT, server_error.is_some())?;
        match message {
            // Acknowledgements and metadata carry nothing this caller needs.
            BackendMessage::ParseComplete
            | BackendMessage::BindComplete
            | BackendMessage::RowDescription(_)
            | BackendMessage::CommandComplete(_)
            | BackendMessage::NoData => {}
            BackendMessage::DataRow(values) => {
                if server_error.is_none() {
                    collected.push(values);
                }
            }
            BackendMessage::ReadyForQuery(_) => {
                return match server_error {
                    Some(err) => Err(err),
                    None => Ok(collected),
                };
            }
            BackendMessage::ErrorResponse(fields) => {
                // Keep only the first error; keep reading until ReadyForQuery.
                if server_error.is_none() {
                    server_error = Some(PgError::QueryServer(fields.into()));
                }
            }
            ignorable if is_ignorable_session_message(&ignorable) => {}
            unexpected => {
                return Err(unexpected_backend_message(CONTEXT, &unexpected));
            }
        }
    }
}
/// Executes `sql` with `params`, discarding any rows. Delegates to
/// `query_count_with_param_types` with no explicit parameter types.
pub async fn query_count(&mut self, sql: &str, params: &[Option<Vec<u8>>]) -> PgResult<()> {
    let untyped: &[u32] = &[];
    self.query_count_with_param_types(sql, untyped, params).await
}
/// Executes `sql` through Parse/Bind/Execute/Sync with an empty statement
/// name, reading only message type bytes and discarding all row data.
///
/// A server error surfaced by `recv_msg_type_fast` is recorded through
/// `capture_query_server_error` and returned once ReadyForQuery (`b'Z'`) arrives.
///
/// # Errors
/// `PgError::Encode` for an arity mismatch or encoding failure, the first
/// server error, or a protocol/transport error.
pub async fn query_count_with_param_types(
    &mut self,
    sql: &str,
    param_types: &[u32],
    params: &[Option<Vec<u8>>],
) -> PgResult<()> {
    const CONTEXT: &str = "extended-query count execute";

    Self::validate_param_type_arity(params, param_types)?;
    self.write_buf.clear();
    PgEncoder::try_encode_parse_to(&mut self.write_buf, "", sql, param_types)
        .map_err(|e| PgError::Encode(e.to_string()))?;
    PgEncoder::encode_bind_to(&mut self.write_buf, "", params)
        .map_err(|e| PgError::Encode(e.to_string()))?;
    PgEncoder::encode_execute_to(&mut self.write_buf);
    PgEncoder::encode_sync_to(&mut self.write_buf);
    self.flush_write_buf().await?;

    let mut server_error: Option<PgError> = None;
    let mut tracker = ExtendedFlowTracker::new(ExtendedFlowConfig::parse_bind_execute(true));
    loop {
        let tag = match self.recv_msg_type_fast().await {
            Ok(tag) => tag,
            Err(e @ PgError::QueryServer(_)) => {
                capture_query_server_error(self, &mut server_error, e);
                continue;
            }
            Err(e) => return Err(e),
        };
        tracker.validate_msg_type(tag, CONTEXT, server_error.is_some())?;
        match tag {
            // ParseComplete, BindComplete, RowDescription, DataRow,
            // CommandComplete, NoData: nothing to keep.
            b'1' | b'2' | b'T' | b'D' | b'C' | b'n' => {}
            b'Z' => {
                return match server_error {
                    Some(err) => Err(err),
                    None => Ok(()),
                };
            }
            ignorable if is_ignorable_session_msg_type(ignorable) => {}
            unexpected => {
                return Err(unexpected_backend_msg_type(CONTEXT, unexpected));
            }
        }
    }
}
/// Runs `sql` and returns `PgRow`s, requesting results in
/// `PgEncoder::FORMAT_TEXT`. Delegates to `query_rows_with_result_format`.
pub async fn query_rows(
    &mut self,
    sql: &str,
    params: &[Option<Vec<u8>>],
) -> PgResult<Vec<super::PgRow>> {
    let text_format = PgEncoder::FORMAT_TEXT;
    self.query_rows_with_result_format(sql, params, text_format)
        .await
}
/// Runs `sql` and returns `PgRow`s in the requested `result_format`, without
/// explicit parameter types. Delegates to
/// `query_rows_with_param_types_and_result_format`.
pub async fn query_rows_with_result_format(
    &mut self,
    sql: &str,
    params: &[Option<Vec<u8>>],
    result_format: i16,
) -> PgResult<Vec<super::PgRow>> {
    let untyped: &[u32] = &[];
    self.query_rows_with_param_types_and_result_format(sql, untyped, params, result_format)
        .await
}
/// Streams each result row of `sql` to `on_row` as a `PgBytesRow`, without
/// explicit parameter types, and returns the number of rows visited.
/// Delegates to `query_visit_bytes_rows_with_param_types_and_result_format`.
pub async fn query_visit_bytes_rows_with_result_format<F>(
    &mut self,
    sql: &str,
    params: &[Option<Vec<u8>>],
    result_format: i16,
    on_row: F,
) -> PgResult<usize>
where
    F: FnMut(&super::PgBytesRow) -> PgResult<()>,
{
    let untyped: &[u32] = &[];
    self.query_visit_bytes_rows_with_param_types_and_result_format(
        sql,
        untyped,
        params,
        result_format,
        on_row,
    )
    .await
}
/// Streams the first column of each result row of `sql` to `on_value`,
/// without explicit parameter types, and returns the number of rows visited.
/// Delegates to
/// `query_visit_first_column_bytes_with_param_types_and_result_format`.
pub async fn query_visit_first_column_bytes_with_result_format<F>(
    &mut self,
    sql: &str,
    params: &[Option<Vec<u8>>],
    result_format: i16,
    on_value: F,
) -> PgResult<usize>
where
    F: FnMut(Option<&[u8]>) -> PgResult<()>,
{
    let untyped: &[u32] = &[];
    self.query_visit_first_column_bytes_with_param_types_and_result_format(
        sql,
        untyped,
        params,
        result_format,
        on_value,
    )
    .await
}
/// Executes `sql` through Parse/Bind/Execute/Sync with an empty statement
/// name and returns every row as a `PgRow`.
///
/// The `ColumnInfo` built from the RowDescription is shared between all
/// returned rows through an `Arc`. The first ErrorResponse is remembered and
/// returned at ReadyForQuery; rows received after it are discarded.
///
/// # Errors
/// `PgError::Encode` for an arity mismatch or encoding failure, the first
/// server error, or a protocol/transport error.
pub async fn query_rows_with_param_types_and_result_format(
    &mut self,
    sql: &str,
    param_types: &[u32],
    params: &[Option<Vec<u8>>],
    result_format: i16,
) -> PgResult<Vec<super::PgRow>> {
    use std::sync::Arc;
    const CONTEXT: &str = "extended-query rows execute";

    Self::validate_param_type_arity(params, param_types)?;
    self.write_buf.clear();
    PgEncoder::try_encode_parse_to(&mut self.write_buf, "", sql, param_types)
        .map_err(|e| PgError::Encode(e.to_string()))?;
    PgEncoder::encode_bind_to_with_result_format(
        &mut self.write_buf,
        "",
        params,
        result_format,
    )
    .map_err(|e| PgError::Encode(e.to_string()))?;
    PgEncoder::encode_execute_to(&mut self.write_buf);
    PgEncoder::encode_sync_to(&mut self.write_buf);
    self.flush_write_buf().await?;

    let mut collected: Vec<super::PgRow> = Vec::new();
    let mut shared_columns: Option<Arc<super::ColumnInfo>> = None;
    let mut server_error: Option<PgError> = None;
    let mut tracker = ExtendedFlowTracker::new(ExtendedFlowConfig::parse_bind_execute(true));
    loop {
        let message = self.recv().await?;
        tracker.validate(&message, CONTEXT, server_error.is_some())?;
        match message {
            BackendMessage::ParseComplete
            | BackendMessage::BindComplete
            | BackendMessage::CommandComplete(_)
            | BackendMessage::NoData => {}
            BackendMessage::RowDescription(fields) => {
                shared_columns = Some(Arc::new(super::ColumnInfo::from_fields(&fields)));
            }
            BackendMessage::DataRow(values) => {
                if server_error.is_none() {
                    collected.push(super::PgRow {
                        columns: values,
                        column_info: shared_columns.clone(),
                    });
                }
            }
            BackendMessage::ReadyForQuery(_) => {
                return match server_error {
                    Some(err) => Err(err),
                    None => Ok(collected),
                };
            }
            BackendMessage::ErrorResponse(fields) => {
                if server_error.is_none() {
                    server_error = Some(PgError::QueryServer(fields.into()));
                }
            }
            ignorable if is_ignorable_session_message(&ignorable) => {}
            unexpected => {
                return Err(unexpected_backend_message(CONTEXT, &unexpected));
            }
        }
    }
}
/// Executes `sql` through Parse/Bind/Execute/Sync with an empty statement
/// name and hands each DataRow to `on_row` as a reused `PgBytesRow`.
///
/// Returns the number of rows passed to `on_row`. After each callback the
/// row's payload is released. Rows that arrive after a server error are not
/// delivered.
///
/// # Errors
/// An error returned by `on_row` is propagated immediately, before the
/// remaining backend messages are read. Otherwise: `PgError::Encode`, the
/// first server error (returned at ReadyForQuery), or a protocol/transport
/// error.
pub async fn query_visit_bytes_rows_with_param_types_and_result_format<F>(
    &mut self,
    sql: &str,
    param_types: &[u32],
    params: &[Option<Vec<u8>>],
    result_format: i16,
    mut on_row: F,
) -> PgResult<usize>
where
    F: FnMut(&super::PgBytesRow) -> PgResult<()>,
{
    const CONTEXT: &str = "extended-query visit bytes execute";

    Self::validate_param_type_arity(params, param_types)?;
    self.write_buf.clear();
    PgEncoder::try_encode_parse_to(&mut self.write_buf, "", sql, param_types)
        .map_err(|e| PgError::Encode(e.to_string()))?;
    PgEncoder::encode_bind_to_with_result_format(
        &mut self.write_buf,
        "",
        params,
        result_format,
    )
    .map_err(|e| PgError::Encode(e.to_string()))?;
    PgEncoder::encode_execute_to(&mut self.write_buf);
    PgEncoder::encode_sync_to(&mut self.write_buf);
    self.flush_write_buf().await?;

    let mut visited = 0usize;
    let mut row = super::PgBytesRow::default();
    let mut server_error: Option<PgError> = None;
    let mut tracker = ExtendedFlowTracker::new(ExtendedFlowConfig::parse_bind_execute(true));
    loop {
        let tag = match self.recv_fill_zerocopy_row_fast(&mut row).await {
            Ok(tag) => tag,
            Err(e @ PgError::QueryServer(_)) => {
                capture_query_server_error(self, &mut server_error, e);
                continue;
            }
            Err(e) => return Err(e),
        };
        tracker.validate_msg_type(tag, CONTEXT, server_error.is_some())?;
        match tag {
            // ParseComplete, BindComplete, RowDescription, NoData, CommandComplete.
            b'1' | b'2' | b'T' | b'n' | b'C' => {}
            b'D' => {
                if server_error.is_none() {
                    on_row(&row)?;
                    visited += 1;
                    row.release_payload();
                }
            }
            b'Z' => {
                return match server_error {
                    Some(err) => Err(err),
                    None => Ok(visited),
                };
            }
            ignorable if is_ignorable_session_msg_type(ignorable) => {}
            unexpected => {
                return Err(unexpected_backend_msg_type(CONTEXT, unexpected));
            }
        }
    }
}
/// Executes `sql` through Parse/Bind/Execute/Sync with an empty statement
/// name and passes the first column of each DataRow to `on_value`.
///
/// Returns the number of rows passed to `on_value`. The column slot is reset
/// to `None` after each callback. Rows that arrive after a server error are
/// not delivered.
///
/// # Errors
/// An error returned by `on_value` is propagated immediately, before the
/// remaining backend messages are read. Otherwise: `PgError::Encode`, the
/// first server error (returned at ReadyForQuery), or a protocol/transport
/// error.
pub async fn query_visit_first_column_bytes_with_param_types_and_result_format<F>(
    &mut self,
    sql: &str,
    param_types: &[u32],
    params: &[Option<Vec<u8>>],
    result_format: i16,
    mut on_value: F,
) -> PgResult<usize>
where
    F: FnMut(Option<&[u8]>) -> PgResult<()>,
{
    const CONTEXT: &str = "extended-query visit first-column execute";

    Self::validate_param_type_arity(params, param_types)?;
    self.write_buf.clear();
    PgEncoder::try_encode_parse_to(&mut self.write_buf, "", sql, param_types)
        .map_err(|e| PgError::Encode(e.to_string()))?;
    PgEncoder::encode_bind_to_with_result_format(
        &mut self.write_buf,
        "",
        params,
        result_format,
    )
    .map_err(|e| PgError::Encode(e.to_string()))?;
    PgEncoder::encode_execute_to(&mut self.write_buf);
    PgEncoder::encode_sync_to(&mut self.write_buf);
    self.flush_write_buf().await?;

    let mut visited = 0usize;
    let mut first_column: Option<Bytes> = None;
    let mut server_error: Option<PgError> = None;
    let mut tracker = ExtendedFlowTracker::new(ExtendedFlowConfig::parse_bind_execute(true));
    loop {
        let received = self
            .recv_fill_first_column_zerocopy_fast(&mut first_column)
            .await;
        let tag = match received {
            Ok(tag) => tag,
            Err(e @ PgError::QueryServer(_)) => {
                capture_query_server_error(self, &mut server_error, e);
                continue;
            }
            Err(e) => return Err(e),
        };
        tracker.validate_msg_type(tag, CONTEXT, server_error.is_some())?;
        match tag {
            // ParseComplete, BindComplete, RowDescription, NoData, CommandComplete.
            b'1' | b'2' | b'T' | b'n' | b'C' => {}
            b'D' => {
                if server_error.is_none() {
                    on_value(first_column.as_deref())?;
                    visited += 1;
                    first_column = None;
                }
            }
            b'Z' => {
                return match server_error {
                    Some(err) => Err(err),
                    None => Ok(visited),
                };
            }
            ignorable if is_ignorable_session_msg_type(ignorable) => {}
            unexpected => {
                return Err(unexpected_backend_msg_type(CONTEXT, unexpected));
            }
        }
    }
}
/// Validates `sql` against the server without executing it.
///
/// Sends Parse, Bind, Describe and Sync (no Execute) with empty statement and
/// portal names, then reads until ReadyForQuery. Success requires that a
/// RowDescription or NoData message was seen.
///
/// # Errors
/// `PgError::Encode` for an arity mismatch or encoding failure, the first
/// server error, `PgError::Protocol` when no describe response arrived, or a
/// flow/transport error.
pub async fn probe_query_with_param_types(
    &mut self,
    sql: &str,
    param_types: &[u32],
    params: &[Option<Vec<u8>>],
) -> PgResult<()> {
    const CONTEXT: &str = "extended-query probe";

    Self::validate_param_type_arity(params, param_types)?;
    let parse_msg = PgEncoder::try_encode_parse("", sql, param_types)
        .map_err(|e| PgError::Encode(e.to_string()))?;
    let bind_msg =
        PgEncoder::encode_bind("", "", params).map_err(|e| PgError::Encode(e.to_string()))?;
    let describe_msg =
        PgEncoder::try_encode_describe(true, "").map_err(|e| PgError::Encode(e.to_string()))?;
    let sync_msg = PgEncoder::encode_sync();

    // Concatenate the four messages so they go out in a single write.
    let total_len = parse_msg.len() + bind_msg.len() + describe_msg.len() + sync_msg.len();
    let mut request = BytesMut::with_capacity(total_len);
    request.extend_from_slice(&parse_msg);
    request.extend_from_slice(&bind_msg);
    request.extend_from_slice(&describe_msg);
    request.extend_from_slice(&sync_msg);
    self.write_all_with_timeout(&request, "stream write").await?;

    let mut described = false;
    let mut server_error: Option<PgError> = None;
    let mut tracker = ExtendedFlowTracker::new(ExtendedFlowConfig::parse_bind_describe_portal());
    loop {
        let message = self.recv().await?;
        tracker.validate(&message, CONTEXT, server_error.is_some())?;
        match message {
            BackendMessage::ParseComplete | BackendMessage::BindComplete => {}
            BackendMessage::RowDescription(_) | BackendMessage::NoData => {
                described = true;
            }
            BackendMessage::ReadyForQuery(_) => {
                if let Some(err) = server_error {
                    return Err(err);
                }
                return if described {
                    Ok(())
                } else {
                    Err(PgError::Protocol(
                        "extended-query probe finished without RowDescription/NoData"
                            .to_string(),
                    ))
                };
            }
            BackendMessage::ErrorResponse(fields) => {
                if server_error.is_none() {
                    server_error = Some(PgError::QueryServer(fields.into()));
                }
            }
            ignorable if is_ignorable_session_message(&ignorable) => {}
            unexpected => {
                return Err(unexpected_backend_message(CONTEXT, &unexpected));
            }
        }
    }
}
/// Runs `sql` through the prepared-statement cache, requesting results in
/// `PgEncoder::FORMAT_TEXT`. Delegates to `query_cached_with_result_format`.
pub async fn query_cached(
    &mut self,
    sql: &str,
    params: &[Option<Vec<u8>>],
) -> PgResult<Vec<Vec<Option<Vec<u8>>>>> {
    let text_format = PgEncoder::FORMAT_TEXT;
    self.query_cached_with_result_format(sql, params, text_format)
        .await
}
/// Runs `sql` through the prepared-statement cache, retrying at most once.
///
/// A second attempt is made only when the first fails with an error flagged
/// by `is_prepared_statement_retryable` or
/// `is_prepared_statement_already_exists`. In the retryable case the
/// connection's prepared-statement state is cleared before the retry.
/// Whatever the second attempt produces is returned as-is.
pub async fn query_cached_with_result_format(
    &mut self,
    sql: &str,
    params: &[Option<Vec<u8>>],
    result_format: i16,
) -> PgResult<Vec<Vec<Option<Vec<u8>>>>> {
    let first_attempt = self
        .query_cached_with_result_format_once(sql, params, result_format)
        .await;
    let first_err = match first_attempt {
        Ok(rows) => return Ok(rows),
        Err(err) => err,
    };

    let stale_state = first_err.is_prepared_statement_retryable();
    if !stale_state && !first_err.is_prepared_statement_already_exists() {
        return Err(first_err);
    }
    if stale_state {
        self.clear_prepared_statement_state();
    }

    // Single retry; its outcome is final.
    self.query_cached_with_result_format_once(sql, params, result_format)
        .await
}
/// Performs one attempt of a cached query using a named prepared statement.
///
/// The statement name is derived from `sql` via `sql_to_stmt_name`. On a
/// cache miss a Parse message is sent ahead of Bind/Execute/Sync and the name
/// is inserted into `prepared_statements` optimistically. The insertion is
/// rolled back when the request cannot be encoded or written, or when the
/// exchange fails before ParseComplete is seen.
///
/// A server error removes the cache entry only when the server-side statement
/// cannot be assumed to exist: Parse was never confirmed for a new statement,
/// or the error is flagged by `is_prepared_statement_retryable`. A Bind or
/// Execute failure after a successful Parse, or on an already cached
/// statement, leaves the entry in place because the named statement still
/// exists on the server. Dropping it would make the next call re-Parse the
/// same name and fail with "already exists", which also aborts an open
/// transaction.
///
/// # Errors
/// Encoding errors, transport errors, the first server error (returned at
/// ReadyForQuery), or a protocol error when the message flow is invalid.
async fn query_cached_with_result_format_once(
    &mut self,
    sql: &str,
    params: &[Option<Vec<u8>>],
    result_format: i16,
) -> PgResult<Vec<Vec<Option<Vec<u8>>>>> {
    let stmt_name = Self::sql_to_stmt_name(sql);
    let is_new = !self.prepared_statements.contains_key(&stmt_name);

    // Rough capacity estimate: 4 length bytes per parameter plus its payload.
    let params_size: usize = params
        .iter()
        .map(|p| 4 + p.as_ref().map_or(0, |v| v.len()))
        .sum();
    let estimated_size = if is_new {
        50 + sql.len() + stmt_name.len() * 2 + params_size
    } else {
        30 + stmt_name.len() + params_size
    };
    let mut buf = BytesMut::with_capacity(estimated_size);
    if is_new {
        self.evict_prepared_if_full();
        buf.extend(PgEncoder::try_encode_parse(&stmt_name, sql, &[])?);
        // Optimistic insert; undone below if Parse is never confirmed.
        self.prepared_statements
            .insert(stmt_name.clone(), sql.to_string());
    }
    if let Err(e) = PgEncoder::encode_bind_to_with_result_format(
        &mut buf,
        &stmt_name,
        params,
        result_format,
    ) {
        if is_new {
            self.prepared_statements.remove(&stmt_name);
        }
        return Err(PgError::Encode(e.to_string()));
    }
    PgEncoder::encode_execute_to(&mut buf);
    PgEncoder::encode_sync_to(&mut buf);
    if let Err(err) = self.write_all_with_timeout(&buf, "stream write").await {
        if is_new {
            self.prepared_statements.remove(&stmt_name);
        }
        return Err(err);
    }

    let mut rows = Vec::new();
    let mut error: Option<PgError> = None;
    let mut flow = ExtendedFlowTracker::new(ExtendedFlowConfig::parse_bind_execute(is_new));
    loop {
        let msg = match self.recv().await {
            Ok(msg) => msg,
            Err(err) => {
                if is_new && !flow.saw_parse_complete() {
                    self.prepared_statements.remove(&stmt_name);
                }
                return Err(err);
            }
        };
        if let Err(err) = flow.validate(&msg, "extended-query cached execute", error.is_some())
        {
            if is_new && !flow.saw_parse_complete() {
                self.prepared_statements.remove(&stmt_name);
            }
            return Err(err);
        }
        match msg {
            BackendMessage::ParseComplete => {}
            BackendMessage::BindComplete => {}
            BackendMessage::RowDescription(_) => {}
            BackendMessage::DataRow(data) => {
                if error.is_none() {
                    rows.push(data);
                }
            }
            BackendMessage::CommandComplete(_) => {}
            BackendMessage::NoData => {}
            BackendMessage::ReadyForQuery(_) => {
                if let Some(err) = error {
                    if is_new
                        && !flow.saw_parse_complete()
                        && !err.is_prepared_statement_already_exists()
                    {
                        self.prepared_statements.remove(&stmt_name);
                    }
                    return Err(err);
                }
                if is_new && !flow.saw_parse_complete() {
                    self.prepared_statements.remove(&stmt_name);
                    return Err(PgError::Protocol(
                        "Cache miss query reached ReadyForQuery without ParseComplete"
                            .to_string(),
                    ));
                }
                return Ok(rows);
            }
            BackendMessage::ErrorResponse(err) => {
                if error.is_none() {
                    let query_err = PgError::QueryServer(err.into());
                    // Forget the statement only when the server-side handle is
                    // absent or unusable. An "already exists" error means the
                    // server does have it, so the entry is kept.
                    let parse_unconfirmed = is_new && !flow.saw_parse_complete();
                    let handle_invalid = query_err.is_prepared_statement_retryable();
                    if (parse_unconfirmed || handle_invalid)
                        && !query_err.is_prepared_statement_already_exists()
                    {
                        self.prepared_statements.remove(&stmt_name);
                    }
                    error = Some(query_err);
                }
            }
            msg if is_ignorable_session_message(&msg) => {}
            other => {
                if is_new && !flow.saw_parse_complete() {
                    self.prepared_statements.remove(&stmt_name);
                }
                return Err(unexpected_backend_message(
                    "extended-query cached execute",
                    &other,
                ));
            }
        }
    }
}
/// Derives a prepared-statement name from the SQL text: the letter `s`
/// followed by the 64-bit `DefaultHasher` digest of `sql` as 16 lowercase,
/// zero-padded hex digits.
pub(crate) fn sql_to_stmt_name(sql: &str) -> String {
    use std::hash::{Hash, Hasher};

    let mut state = std::collections::hash_map::DefaultHasher::new();
    Hash::hash(sql, &mut state);
    let digest: u64 = state.finish();
    format!("s{:016x}", digest)
}
/// Runs `sql` with the simple-query protocol and discards any rows.
///
/// Messages are read until ReadyForQuery while `SimpleFlowTracker` checks
/// their ordering. The first ErrorResponse is remembered and returned at
/// ReadyForQuery.
///
/// # Errors
/// An encoding error, the first server error, `PgError::Protocol` on an
/// illegal message order, or a transport/unexpected-message error.
pub async fn execute_simple(&mut self, sql: &str) -> PgResult<()> {
    const CONTEXT: &str = "simple-query execute";

    let request = PgEncoder::try_encode_query_string(sql)?;
    self.write_all_with_timeout(&request, "stream write").await?;

    let mut server_error: Option<PgError> = None;
    let mut tracker = SimpleFlowTracker::new();
    loop {
        let message = self.recv().await?;
        match message {
            BackendMessage::RowDescription(_) => tracker.on_row_description(CONTEXT)?,
            BackendMessage::DataRow(_) => tracker.on_data_row(CONTEXT)?,
            BackendMessage::CommandComplete(_) => tracker.on_command_complete(),
            BackendMessage::EmptyQueryResponse => tracker.on_empty_query_response(CONTEXT)?,
            BackendMessage::ReadyForQuery(_) => {
                return match server_error {
                    Some(err) => Err(err),
                    // No error is pending on this path, so ordering is enforced.
                    None => tracker.on_ready_for_query(CONTEXT, false),
                };
            }
            BackendMessage::ErrorResponse(fields) => {
                if server_error.is_none() {
                    server_error = Some(PgError::QueryServer(fields.into()));
                }
            }
            ignorable if is_ignorable_session_message(&ignorable) => {}
            unexpected => {
                return Err(unexpected_backend_message(CONTEXT, &unexpected));
            }
        }
    }
}
/// Runs `sql` with the simple-query protocol and returns the rows of all
/// statements as `PgRow`s.
///
/// At most `MAX_SIMPLE_QUERY_ROWS` (10 000) rows are collected. When another
/// row arrives beyond that, a `PgError::Query` is recorded, the rest of the
/// response is still read, and the error is returned at ReadyForQuery.
/// Column metadata is shared per result set and reset when a statement completes.
///
/// # Errors
/// An encoding error, the first recorded error (server error or row cap),
/// `PgError::Protocol` on an illegal message order, or a
/// transport/unexpected-message error.
pub async fn simple_query(&mut self, sql: &str) -> PgResult<Vec<super::PgRow>> {
    use std::sync::Arc;
    const MAX_SIMPLE_QUERY_ROWS: usize = 10_000;
    const CONTEXT: &str = "simple-query read";

    let request = PgEncoder::try_encode_query_string(sql)?;
    self.write_all_with_timeout(&request, "stream write").await?;

    let mut collected: Vec<super::PgRow> = Vec::new();
    let mut shared_columns: Option<Arc<super::ColumnInfo>> = None;
    let mut pending_error: Option<PgError> = None;
    let mut tracker = SimpleFlowTracker::new();
    loop {
        let message = self.recv().await?;
        match message {
            BackendMessage::RowDescription(fields) => {
                tracker.on_row_description(CONTEXT)?;
                shared_columns = Some(Arc::new(super::ColumnInfo::from_fields(&fields)));
            }
            BackendMessage::DataRow(values) => {
                tracker.on_data_row(CONTEXT)?;
                if pending_error.is_some() {
                    // An error is already recorded; read and drop the row.
                } else if collected.len() < MAX_SIMPLE_QUERY_ROWS {
                    collected.push(super::PgRow {
                        columns: values,
                        column_info: shared_columns.clone(),
                    });
                } else {
                    pending_error = Some(PgError::Query(format!(
                        "simple_query exceeded {} row safety cap",
                        MAX_SIMPLE_QUERY_ROWS,
                    )));
                }
            }
            BackendMessage::CommandComplete(_) => {
                tracker.on_command_complete();
                shared_columns = None;
            }
            BackendMessage::EmptyQueryResponse => {
                tracker.on_empty_query_response(CONTEXT)?;
                shared_columns = None;
            }
            BackendMessage::ReadyForQuery(_) => {
                return match pending_error {
                    Some(err) => Err(err),
                    // No error is pending on this path, so ordering is enforced.
                    None => tracker
                        .on_ready_for_query(CONTEXT, false)
                        .map(|()| collected),
                };
            }
            BackendMessage::ErrorResponse(fields) => {
                if pending_error.is_none() {
                    pending_error = Some(PgError::QueryServer(fields.into()));
                }
            }
            ignorable if is_ignorable_session_message(&ignorable) => {}
            unexpected => {
                return Err(unexpected_backend_message(CONTEXT, &unexpected));
            }
        }
    }
}
/// Executes an already prepared statement and returns the raw rows,
/// requesting results in `PgEncoder::FORMAT_TEXT`. Delegates to
/// `query_prepared_single_with_result_format`.
#[inline]
pub async fn query_prepared_single(
    &mut self,
    stmt: &super::PreparedStatement,
    params: &[Option<Vec<u8>>],
) -> PgResult<Vec<Vec<Option<Vec<u8>>>>> {
    let text_format = PgEncoder::FORMAT_TEXT;
    self.query_prepared_single_with_result_format(stmt, params, text_format)
        .await
}
/// Executes an already prepared statement via Bind/Execute/Sync, reading only
/// message type bytes and discarding all row data.
///
/// A server error surfaced by `recv_msg_type_fast` is recorded through
/// `capture_query_server_error` and returned once ReadyForQuery (`b'Z'`) arrives.
///
/// # Errors
/// `PgError::Encode` on encoding failure, the first server error, or a
/// protocol/transport error.
#[inline]
pub async fn query_prepared_single_count(
    &mut self,
    stmt: &super::PreparedStatement,
    params: &[Option<Vec<u8>>],
) -> PgResult<()> {
    const CONTEXT: &str = "prepared single count execute";

    self.write_buf.clear();
    PgEncoder::encode_bind_to(&mut self.write_buf, &stmt.name, params)
        .map_err(|e| PgError::Encode(e.to_string()))?;
    PgEncoder::encode_execute_to(&mut self.write_buf);
    PgEncoder::encode_sync_to(&mut self.write_buf);
    self.flush_write_buf().await?;

    let mut server_error: Option<PgError> = None;
    let mut tracker = ExtendedFlowTracker::new(ExtendedFlowConfig::parse_bind_execute(false));
    loop {
        let tag = match self.recv_msg_type_fast().await {
            Ok(tag) => tag,
            Err(e @ PgError::QueryServer(_)) => {
                capture_query_server_error(self, &mut server_error, e);
                continue;
            }
            Err(e) => return Err(e),
        };
        tracker.validate_msg_type(tag, CONTEXT, server_error.is_some())?;
        match tag {
            // BindComplete, RowDescription, DataRow, CommandComplete, NoData.
            b'2' | b'T' | b'D' | b'C' | b'n' => {}
            b'Z' => {
                return match server_error {
                    Some(err) => Err(err),
                    None => Ok(()),
                };
            }
            ignorable if is_ignorable_session_msg_type(ignorable) => {}
            unexpected => {
                return Err(unexpected_backend_msg_type(CONTEXT, unexpected));
            }
        }
    }
}
#[inline]
pub async fn query_prepared_single_with_result_format(
&mut self,
stmt: &super::PreparedStatement,
params: &[Option<Vec<u8>>],
result_format: i16,
) -> PgResult<Vec<Vec<Option<Vec<u8>>>>> {
let params_size: usize = params
.iter()
.map(|p| 4 + p.as_ref().map_or(0, |v| v.len()))
.sum();
let mut buf = BytesMut::with_capacity(30 + stmt.name.len() + params_size);
PgEncoder::encode_bind_to_with_result_format(&mut buf, &stmt.name, params, result_format)
.map_err(|e| PgError::Encode(e.to_string()))?;
PgEncoder::encode_execute_to(&mut buf);
PgEncoder::encode_sync_to(&mut buf);
self.write_all_with_timeout(&buf, "stream write").await?;
let mut rows = Vec::new();
let mut error: Option<PgError> = None;
let mut flow = ExtendedFlowTracker::new(ExtendedFlowConfig::parse_bind_execute(false));
loop {
let msg = self.recv().await?;
flow.validate(&msg, "prepared single execute", error.is_some())?;
match msg {
BackendMessage::BindComplete => {}
BackendMessage::RowDescription(_) => {}
BackendMessage::DataRow(data) => {
if error.is_none() {
rows.push(data);
}
}
BackendMessage::CommandComplete(_) => {}
BackendMessage::NoData => {}
BackendMessage::ReadyForQuery(_) => {
if let Some(err) = error {
return Err(err);
}
return Ok(rows);
}
BackendMessage::ErrorResponse(err) => {
if error.is_none() {
error = Some(PgError::QueryServer(err.into()));
}
}
msg if is_ignorable_session_message(&msg) => {}
other => {
return Err(unexpected_backend_message(
"prepared single execute",
&other,
));
}
}
}
}
#[inline]
pub async fn query_prepared_single_reuse_with_result_format(
&mut self,
stmt: &super::PreparedStatement,
params: &[Option<Vec<u8>>],
result_format: i16,
) -> PgResult<Vec<Vec<Option<Vec<u8>>>>> {
reserve_prepared_single_write_buf(self, stmt, params, result_format)?;
PgEncoder::encode_bind_to_with_result_format(
&mut self.write_buf,
&stmt.name,
params,
result_format,
)
.map_err(|e| PgError::Encode(e.to_string()))?;
PgEncoder::encode_execute_to(&mut self.write_buf);
PgEncoder::encode_sync_to(&mut self.write_buf);
self.flush_write_buf().await?;
let mut rows = Vec::with_capacity(32);
let mut error: Option<PgError> = None;
let mut flow = ExtendedFlowTracker::new(ExtendedFlowConfig::parse_bind_execute(false));
loop {
let msg = self.recv().await?;
flow.validate(&msg, "prepared single reuse execute", error.is_some())?;
match msg {
BackendMessage::BindComplete => {}
BackendMessage::RowDescription(_) => {}
BackendMessage::DataRow(data) => {
if error.is_none() {
rows.push(data);
}
}
BackendMessage::CommandComplete(_) => {}
BackendMessage::NoData => {}
BackendMessage::ReadyForQuery(_) => {
if let Some(err) = error {
return Err(err);
}
return Ok(rows);
}
BackendMessage::ErrorResponse(err) => {
if error.is_none() {
error = Some(PgError::QueryServer(err.into()));
}
}
msg if is_ignorable_session_message(&msg) => {}
other => {
return Err(unexpected_backend_message(
"prepared single reuse execute",
&other,
));
}
}
}
}
/// Executes an already prepared statement and passes each DataRow to `on_row`
/// as a slice of optional column buffers, reusing one row buffer throughout.
///
/// Returns the number of rows passed to `on_row`. Rows that arrive after a
/// server error are not delivered.
///
/// # Errors
/// An error returned by `on_row` is propagated immediately, before the
/// remaining backend messages are read. Otherwise: `PgError::Encode`, the
/// first server error (returned at ReadyForQuery), or a protocol/transport
/// error.
#[inline]
pub async fn query_prepared_single_reuse_visit_rows_with_result_format<F>(
    &mut self,
    stmt: &super::PreparedStatement,
    params: &[Option<Vec<u8>>],
    result_format: i16,
    mut on_row: F,
) -> PgResult<usize>
where
    F: FnMut(&[Option<Vec<u8>>]) -> PgResult<()>,
{
    const CONTEXT: &str = "prepared single reuse visit execute";

    reserve_prepared_single_write_buf(self, stmt, params, result_format)?;
    PgEncoder::encode_bind_to_with_result_format(
        &mut self.write_buf,
        &stmt.name,
        params,
        result_format,
    )
    .map_err(|e| PgError::Encode(e.to_string()))?;
    PgEncoder::encode_execute_to(&mut self.write_buf);
    PgEncoder::encode_sync_to(&mut self.write_buf);
    self.flush_write_buf().await?;

    let mut visited = 0usize;
    let mut scratch_row: Vec<Option<Vec<u8>>> = Vec::new();
    let mut server_error: Option<PgError> = None;
    let mut tracker = ExtendedFlowTracker::new(ExtendedFlowConfig::parse_bind_execute(false));
    loop {
        let tag = match self.recv_fill_data_row_fast(&mut scratch_row).await {
            Ok(tag) => tag,
            Err(e @ PgError::QueryServer(_)) => {
                capture_query_server_error(self, &mut server_error, e);
                continue;
            }
            Err(e) => return Err(e),
        };
        tracker.validate_msg_type(tag, CONTEXT, server_error.is_some())?;
        match tag {
            // BindComplete, RowDescription, NoData, CommandComplete.
            b'2' | b'T' | b'n' | b'C' => {}
            b'D' => {
                if server_error.is_none() {
                    on_row(&scratch_row[..])?;
                    visited += 1;
                }
            }
            b'Z' => {
                return match server_error {
                    Some(err) => Err(err),
                    None => Ok(visited),
                };
            }
            ignorable if is_ignorable_session_msg_type(ignorable) => {}
            unexpected => {
                return Err(unexpected_backend_msg_type(CONTEXT, unexpected));
            }
        }
    }
}
/// Executes an already prepared statement and hands each DataRow to `on_row`
/// as a reused `PgBytesRow`.
///
/// Returns the number of rows passed to `on_row`. After each callback the
/// row's payload is released. Rows that arrive after a server error are not
/// delivered.
///
/// # Errors
/// An error returned by `on_row` is propagated immediately, before the
/// remaining backend messages are read. Otherwise: `PgError::Encode`, the
/// first server error (returned at ReadyForQuery), or a protocol/transport
/// error.
#[inline]
pub async fn query_prepared_single_reuse_visit_bytes_rows_with_result_format<F>(
    &mut self,
    stmt: &super::PreparedStatement,
    params: &[Option<Vec<u8>>],
    result_format: i16,
    mut on_row: F,
) -> PgResult<usize>
where
    F: FnMut(&super::PgBytesRow) -> PgResult<()>,
{
    const CONTEXT: &str = "prepared single reuse visit bytes execute";

    reserve_prepared_single_write_buf(self, stmt, params, result_format)?;
    PgEncoder::encode_bind_to_with_result_format(
        &mut self.write_buf,
        &stmt.name,
        params,
        result_format,
    )
    .map_err(|e| PgError::Encode(e.to_string()))?;
    PgEncoder::encode_execute_to(&mut self.write_buf);
    PgEncoder::encode_sync_to(&mut self.write_buf);
    self.flush_write_buf().await?;

    let mut visited = 0usize;
    let mut row = super::PgBytesRow::default();
    let mut server_error: Option<PgError> = None;
    let mut tracker = ExtendedFlowTracker::new(ExtendedFlowConfig::parse_bind_execute(false));
    loop {
        let tag = match self.recv_fill_zerocopy_row_fast(&mut row).await {
            Ok(tag) => tag,
            Err(e @ PgError::QueryServer(_)) => {
                capture_query_server_error(self, &mut server_error, e);
                continue;
            }
            Err(e) => return Err(e),
        };
        tracker.validate_msg_type(tag, CONTEXT, server_error.is_some())?;
        match tag {
            // BindComplete, RowDescription, NoData, CommandComplete.
            b'2' | b'T' | b'n' | b'C' => {}
            b'D' => {
                if server_error.is_none() {
                    on_row(&row)?;
                    visited += 1;
                    row.release_payload();
                }
            }
            b'Z' => {
                return match server_error {
                    Some(err) => Err(err),
                    None => Ok(visited),
                };
            }
            ignorable if is_ignorable_session_msg_type(ignorable) => {}
            unexpected => {
                return Err(unexpected_backend_msg_type(CONTEXT, unexpected));
            }
        }
    }
}
/// Executes an already prepared statement and passes the first column of each
/// DataRow to `on_value`.
///
/// Returns the number of rows passed to `on_value`. The column slot is reset
/// to `None` after each callback. Rows that arrive after a server error are
/// not delivered.
///
/// # Errors
/// An error returned by `on_value` is propagated immediately, before the
/// remaining backend messages are read. Otherwise: `PgError::Encode`, the
/// first server error (returned at ReadyForQuery), or a protocol/transport
/// error.
#[inline]
pub async fn query_prepared_single_reuse_visit_first_column_bytes_with_result_format<F>(
    &mut self,
    stmt: &super::PreparedStatement,
    params: &[Option<Vec<u8>>],
    result_format: i16,
    mut on_value: F,
) -> PgResult<usize>
where
    F: FnMut(Option<&[u8]>) -> PgResult<()>,
{
    const CONTEXT: &str = "prepared single reuse visit first-column execute";

    reserve_prepared_single_write_buf(self, stmt, params, result_format)?;
    PgEncoder::encode_bind_to_with_result_format(
        &mut self.write_buf,
        &stmt.name,
        params,
        result_format,
    )
    .map_err(|e| PgError::Encode(e.to_string()))?;
    PgEncoder::encode_execute_to(&mut self.write_buf);
    PgEncoder::encode_sync_to(&mut self.write_buf);
    self.flush_write_buf().await?;

    let mut visited = 0usize;
    let mut first_column: Option<Bytes> = None;
    let mut server_error: Option<PgError> = None;
    let mut tracker = ExtendedFlowTracker::new(ExtendedFlowConfig::parse_bind_execute(false));
    loop {
        let received = self
            .recv_fill_first_column_zerocopy_fast(&mut first_column)
            .await;
        let tag = match received {
            Ok(tag) => tag,
            Err(e @ PgError::QueryServer(_)) => {
                capture_query_server_error(self, &mut server_error, e);
                continue;
            }
            Err(e) => return Err(e),
        };
        tracker.validate_msg_type(tag, CONTEXT, server_error.is_some())?;
        match tag {
            // BindComplete, RowDescription, NoData, CommandComplete.
            b'2' | b'T' | b'n' | b'C' => {}
            b'D' => {
                if server_error.is_none() {
                    on_value(first_column.as_deref())?;
                    visited += 1;
                    first_column = None;
                }
            }
            b'Z' => {
                return match server_error {
                    Some(err) => Err(err),
                    None => Ok(visited),
                };
            }
            ignorable if is_ignorable_session_msg_type(ignorable) => {}
            unexpected => {
                return Err(unexpected_backend_msg_type(CONTEXT, unexpected));
            }
        }
    }
}
/// Executes an already prepared statement and passes the first four columns
/// of each DataRow to `on_row` as borrowed byte slices.
///
/// Returns the number of rows passed to `on_row`. All four slots are reset to
/// `None` after each callback. Rows that arrive after a server error are not
/// delivered.
///
/// # Errors
/// An error returned by `on_row` is propagated immediately, before the
/// remaining backend messages are read. Otherwise: `PgError::Encode`, the
/// first server error (returned at ReadyForQuery), or a protocol/transport
/// error.
#[inline]
pub async fn query_prepared_single_reuse_visit_first_four_columns_bytes_with_result_format<F>(
    &mut self,
    stmt: &super::PreparedStatement,
    params: &[Option<Vec<u8>>],
    result_format: i16,
    mut on_row: F,
) -> PgResult<usize>
where
    F: FnMut([Option<&[u8]>; 4]) -> PgResult<()>,
{
    const CONTEXT: &str = "prepared single reuse visit first-four execute";

    reserve_prepared_single_write_buf(self, stmt, params, result_format)?;
    PgEncoder::encode_bind_to_with_result_format(
        &mut self.write_buf,
        &stmt.name,
        params,
        result_format,
    )
    .map_err(|e| PgError::Encode(e.to_string()))?;
    PgEncoder::encode_execute_to(&mut self.write_buf);
    PgEncoder::encode_sync_to(&mut self.write_buf);
    self.flush_write_buf().await?;

    let mut visited = 0usize;
    let mut columns = [None, None, None, None];
    let mut server_error: Option<PgError> = None;
    let mut tracker = ExtendedFlowTracker::new(ExtendedFlowConfig::parse_bind_execute(false));
    loop {
        let received = self
            .recv_fill_first_four_columns_zerocopy_fast(&mut columns)
            .await;
        let tag = match received {
            Ok(tag) => tag,
            Err(e @ PgError::QueryServer(_)) => {
                capture_query_server_error(self, &mut server_error, e);
                continue;
            }
            Err(e) => return Err(e),
        };
        tracker.validate_msg_type(tag, CONTEXT, server_error.is_some())?;
        match tag {
            // BindComplete, RowDescription, NoData, CommandComplete.
            b'2' | b'T' | b'n' | b'C' => {}
            b'D' => {
                if server_error.is_none() {
                    let borrowed = [
                        columns[0].as_deref(),
                        columns[1].as_deref(),
                        columns[2].as_deref(),
                        columns[3].as_deref(),
                    ];
                    on_row(borrowed)?;
                    columns.fill(None);
                    visited += 1;
                }
            }
            b'Z' => {
                return match server_error {
                    Some(err) => Err(err),
                    None => Ok(visited),
                };
            }
            ignorable if is_ignorable_session_msg_type(ignorable) => {}
            unexpected => {
                return Err(unexpected_backend_msg_type(CONTEXT, unexpected));
            }
        }
    }
}
}