use super::connection::PooledConnection;
use super::lifecycle::MAX_HOT_STATEMENTS;
use crate::driver::{
PgConnection, PgError, PgResult, ResultFormat,
extended_flow::{ExtendedFlowConfig, ExtendedFlowTracker},
is_ignorable_session_message, unexpected_backend_message,
};
use std::sync::Arc;
#[inline]
fn rollback_cache_miss_statement_registration(
conn: &mut PgConnection,
is_cache_miss: bool,
sql_hash: u64,
stmt_name: &str,
) {
if is_cache_miss {
conn.stmt_cache.remove(&sql_hash);
conn.prepared_statements.remove(stmt_name);
conn.column_info_cache.remove(&sql_hash);
}
}
async fn drain_extended_responses_after_rls_setup_error(conn: &mut PgConnection) -> PgResult<()> {
loop {
let msg = conn.recv().await?;
match msg {
crate::protocol::BackendMessage::ReadyForQuery(_) => return Ok(()),
crate::protocol::BackendMessage::ErrorResponse(_) => {}
msg if is_ignorable_session_message(&msg) => {}
_ => {}
}
}
}
impl PooledConnection {
pub async fn fetch_all_uncached(
&mut self,
cmd: &qail_core::ast::Qail,
) -> PgResult<Vec<crate::driver::PgRow>> {
self.fetch_all_uncached_with_format(cmd, ResultFormat::Text)
.await
}
pub async fn query_raw_with_params(
&mut self,
sql: &str,
params: &[Option<Vec<u8>>],
) -> PgResult<Vec<Vec<Option<Vec<u8>>>>> {
let conn = self.conn_mut()?;
conn.query(sql, params).await
}
pub async fn query_rows_with_params(
&mut self,
sql: &str,
params: &[Option<Vec<u8>>],
) -> PgResult<Vec<crate::driver::PgRow>> {
self.query_rows_with_params_with_format(sql, params, ResultFormat::Text)
.await
}
pub async fn query_rows_with_params_with_format(
&mut self,
sql: &str,
params: &[Option<Vec<u8>>],
result_format: ResultFormat,
) -> PgResult<Vec<crate::driver::PgRow>> {
let conn = self.conn_mut()?;
conn.query_rows_with_result_format(sql, params, result_format.as_wire_code())
.await
}
pub async fn query_rows_with_param_types_with_format(
&mut self,
sql: &str,
param_types: &[u32],
params: &[Option<Vec<u8>>],
result_format: ResultFormat,
) -> PgResult<Vec<crate::driver::PgRow>> {
let conn = self.conn_mut()?;
conn.query_rows_with_param_types_and_result_format(
sql,
param_types,
params,
result_format.as_wire_code(),
)
.await
}
pub async fn probe_query_with_param_types(
&mut self,
sql: &str,
param_types: &[u32],
params: &[Option<Vec<u8>>],
) -> PgResult<()> {
let conn = self.conn_mut()?;
conn.probe_query_with_param_types(sql, param_types, params)
.await
}
pub async fn copy_export(&mut self, cmd: &qail_core::ast::Qail) -> PgResult<Vec<Vec<String>>> {
self.conn_mut()?.copy_export(cmd).await
}
pub async fn copy_export_stream_raw<F, Fut>(
&mut self,
cmd: &qail_core::ast::Qail,
on_chunk: F,
) -> PgResult<()>
where
F: FnMut(Vec<u8>) -> Fut,
Fut: std::future::Future<Output = PgResult<()>>,
{
self.conn_mut()?.copy_export_stream_raw(cmd, on_chunk).await
}
pub async fn copy_export_stream_rows<F>(
&mut self,
cmd: &qail_core::ast::Qail,
on_row: F,
) -> PgResult<()>
where
F: FnMut(Vec<String>) -> PgResult<()>,
{
self.conn_mut()?.copy_export_stream_rows(cmd, on_row).await
}
pub async fn copy_export_table(
&mut self,
table: &str,
columns: &[String],
) -> PgResult<Vec<u8>> {
let quote_ident = |ident: &str| -> String {
format!("\"{}\"", ident.replace('\0', "").replace('"', "\"\""))
};
let cols: Vec<String> = columns.iter().map(|c| quote_ident(c)).collect();
let sql = format!(
"COPY {} ({}) TO STDOUT",
quote_ident(table),
cols.join(", ")
);
self.conn_mut()?.copy_out_raw(&sql).await
}
pub async fn copy_export_table_stream<F, Fut>(
&mut self,
table: &str,
columns: &[String],
on_chunk: F,
) -> PgResult<()>
where
F: FnMut(Vec<u8>) -> Fut,
Fut: std::future::Future<Output = PgResult<()>>,
{
let quote_ident = |ident: &str| -> String {
format!("\"{}\"", ident.replace('\0', "").replace('"', "\"\""))
};
let cols: Vec<String> = columns.iter().map(|c| quote_ident(c)).collect();
let sql = format!(
"COPY {} ({}) TO STDOUT",
quote_ident(table),
cols.join(", ")
);
self.conn_mut()?.copy_out_raw_stream(&sql, on_chunk).await
}
pub async fn fetch_all_uncached_with_format(
&mut self,
cmd: &qail_core::ast::Qail,
result_format: ResultFormat,
) -> PgResult<Vec<crate::driver::PgRow>> {
use crate::driver::ColumnInfo;
use crate::protocol::AstEncoder;
let conn = self.conn_mut()?;
AstEncoder::encode_cmd_reuse_into_with_result_format(
cmd,
&mut conn.sql_buf,
&mut conn.params_buf,
&mut conn.write_buf,
result_format.as_wire_code(),
)
.map_err(|e| PgError::Encode(e.to_string()))?;
conn.flush_write_buf().await?;
let mut rows: Vec<crate::driver::PgRow> = Vec::new();
let mut column_info: Option<Arc<ColumnInfo>> = None;
let mut error: Option<PgError> = None;
let mut flow =
ExtendedFlowTracker::new(ExtendedFlowConfig::parse_bind_describe_portal_execute());
loop {
let msg = conn.recv().await?;
flow.validate(&msg, "pool fetch_all execute", error.is_some())?;
match msg {
crate::protocol::BackendMessage::ParseComplete
| crate::protocol::BackendMessage::BindComplete => {}
crate::protocol::BackendMessage::RowDescription(fields) => {
column_info = Some(Arc::new(ColumnInfo::from_fields(&fields)));
}
crate::protocol::BackendMessage::DataRow(data) => {
if error.is_none() {
rows.push(crate::driver::PgRow {
columns: data,
column_info: column_info.clone(),
});
}
}
crate::protocol::BackendMessage::NoData => {}
crate::protocol::BackendMessage::CommandComplete(_) => {}
crate::protocol::BackendMessage::ReadyForQuery(_) => {
if let Some(err) = error {
return Err(err);
}
return Ok(rows);
}
crate::protocol::BackendMessage::ErrorResponse(err) => {
if error.is_none() {
error = Some(PgError::QueryServer(err.into()));
}
}
msg if is_ignorable_session_message(&msg) => {}
other => {
return Err(unexpected_backend_message("pool fetch_all execute", &other));
}
}
}
}
pub async fn fetch_all_fast(
&mut self,
cmd: &qail_core::ast::Qail,
) -> PgResult<Vec<crate::driver::PgRow>> {
self.fetch_all_fast_with_format(cmd, ResultFormat::Text)
.await
}
pub async fn fetch_all_fast_with_format(
&mut self,
cmd: &qail_core::ast::Qail,
result_format: ResultFormat,
) -> PgResult<Vec<crate::driver::PgRow>> {
use crate::protocol::AstEncoder;
let conn = self.conn_mut()?;
AstEncoder::encode_cmd_reuse_into_with_result_format(
cmd,
&mut conn.sql_buf,
&mut conn.params_buf,
&mut conn.write_buf,
result_format.as_wire_code(),
)
.map_err(|e| PgError::Encode(e.to_string()))?;
conn.flush_write_buf().await?;
let mut rows: Vec<crate::driver::PgRow> = Vec::with_capacity(32);
let mut error: Option<PgError> = None;
let mut flow = ExtendedFlowTracker::new(ExtendedFlowConfig::parse_bind_execute(true));
loop {
let res = conn.recv_with_data_fast().await;
match res {
Ok((msg_type, data)) => {
flow.validate_msg_type(
msg_type,
"pool fetch_all_fast execute",
error.is_some(),
)?;
match msg_type {
b'D' => {
if error.is_none()
&& let Some(columns) = data
{
rows.push(crate::driver::PgRow {
columns,
column_info: None,
});
}
}
b'Z' => {
if let Some(err) = error {
return Err(err);
}
return Ok(rows);
}
_ => {}
}
}
Err(e) => {
if matches!(&e, PgError::QueryServer(_)) {
if error.is_none() {
error = Some(e);
}
continue;
}
return Err(e);
}
}
}
}
pub async fn fetch_all_cached(
&mut self,
cmd: &qail_core::ast::Qail,
) -> PgResult<Vec<crate::driver::PgRow>> {
self.fetch_all_cached_with_format(cmd, ResultFormat::Text)
.await
}
pub async fn fetch_all_cached_with_format(
&mut self,
cmd: &qail_core::ast::Qail,
result_format: ResultFormat,
) -> PgResult<Vec<crate::driver::PgRow>> {
let mut retried = false;
loop {
match self
.fetch_all_cached_with_format_once(cmd, result_format)
.await
{
Ok(rows) => return Ok(rows),
Err(err)
if !retried
&& (err.is_prepared_statement_retryable()
|| err.is_prepared_statement_already_exists()) =>
{
retried = true;
if err.is_prepared_statement_retryable()
&& let Some(conn) = self.conn.as_mut()
{
conn.clear_prepared_statement_state();
}
}
Err(err) => return Err(err),
}
}
}
pub async fn fetch_typed<T: crate::driver::row::QailRow>(
&mut self,
cmd: &qail_core::ast::Qail,
) -> PgResult<Vec<T>> {
self.fetch_typed_with_format(cmd, ResultFormat::Text).await
}
pub async fn fetch_typed_with_format<T: crate::driver::row::QailRow>(
&mut self,
cmd: &qail_core::ast::Qail,
result_format: ResultFormat,
) -> PgResult<Vec<T>> {
let rows = self
.fetch_all_cached_with_format(cmd, result_format)
.await?;
Ok(rows.iter().map(T::from_row).collect())
}
pub async fn fetch_one_typed<T: crate::driver::row::QailRow>(
&mut self,
cmd: &qail_core::ast::Qail,
) -> PgResult<Option<T>> {
self.fetch_one_typed_with_format(cmd, ResultFormat::Text)
.await
}
pub async fn fetch_one_typed_with_format<T: crate::driver::row::QailRow>(
&mut self,
cmd: &qail_core::ast::Qail,
result_format: ResultFormat,
) -> PgResult<Option<T>> {
let rows = self
.fetch_all_cached_with_format(cmd, result_format)
.await?;
Ok(rows.first().map(T::from_row))
}
async fn fetch_all_cached_with_format_once(
&mut self,
cmd: &qail_core::ast::Qail,
result_format: ResultFormat,
) -> PgResult<Vec<crate::driver::PgRow>> {
use crate::driver::ColumnInfo;
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
let conn = self.conn.as_mut().ok_or_else(|| {
PgError::Connection("Connection already released back to pool".into())
})?;
conn.sql_buf.clear();
conn.params_buf.clear();
match cmd.action {
qail_core::ast::Action::Get | qail_core::ast::Action::With => {
crate::protocol::ast_encoder::dml::encode_select(
cmd,
&mut conn.sql_buf,
&mut conn.params_buf,
)?;
}
qail_core::ast::Action::Add => {
crate::protocol::ast_encoder::dml::encode_insert(
cmd,
&mut conn.sql_buf,
&mut conn.params_buf,
)?;
}
qail_core::ast::Action::Set => {
crate::protocol::ast_encoder::dml::encode_update(
cmd,
&mut conn.sql_buf,
&mut conn.params_buf,
)?;
}
qail_core::ast::Action::Del => {
crate::protocol::ast_encoder::dml::encode_delete(
cmd,
&mut conn.sql_buf,
&mut conn.params_buf,
)?;
}
_ => {
return self
.fetch_all_uncached_with_format(cmd, result_format)
.await;
}
}
let mut hasher = DefaultHasher::new();
conn.sql_buf.hash(&mut hasher);
let sql_hash = hasher.finish();
let is_cache_miss = !conn.stmt_cache.contains(&sql_hash);
conn.write_buf.clear();
let stmt_name = if let Some(name) = conn.stmt_cache.get(&sql_hash) {
name
} else {
let name = format!("qail_{:x}", sql_hash);
conn.evict_prepared_if_full();
let sql_str = std::str::from_utf8(&conn.sql_buf).unwrap_or("");
use crate::protocol::PgEncoder;
let parse_msg = PgEncoder::try_encode_parse(&name, sql_str, &[])?;
let describe_msg = PgEncoder::try_encode_describe(false, &name)?;
conn.write_buf.extend_from_slice(&parse_msg);
conn.write_buf.extend_from_slice(&describe_msg);
conn.stmt_cache.put(sql_hash, name.clone());
conn.prepared_statements
.insert(name.clone(), sql_str.to_string());
if let Ok(mut hot) = self.pool.hot_statements.write()
&& hot.len() < MAX_HOT_STATEMENTS
{
hot.insert(sql_hash, (name.clone(), sql_str.to_string()));
}
name
};
use crate::protocol::PgEncoder;
if let Err(e) = PgEncoder::encode_bind_to_with_result_format(
&mut conn.write_buf,
&stmt_name,
&conn.params_buf,
result_format.as_wire_code(),
) {
if is_cache_miss {
conn.stmt_cache.remove(&sql_hash);
conn.prepared_statements.remove(&stmt_name);
conn.column_info_cache.remove(&sql_hash);
}
return Err(PgError::Encode(e.to_string()));
}
PgEncoder::encode_execute_to(&mut conn.write_buf);
PgEncoder::encode_sync_to(&mut conn.write_buf);
if let Err(err) = conn.flush_write_buf().await {
if is_cache_miss {
conn.stmt_cache.remove(&sql_hash);
conn.prepared_statements.remove(&stmt_name);
conn.column_info_cache.remove(&sql_hash);
}
return Err(err);
}
let cached_column_info = conn.column_info_cache.get(&sql_hash).cloned();
let mut rows: Vec<crate::driver::PgRow> = Vec::with_capacity(32);
let mut column_info: Option<Arc<ColumnInfo>> = cached_column_info;
let mut error: Option<PgError> = None;
let mut flow = ExtendedFlowTracker::new(
ExtendedFlowConfig::parse_describe_statement_bind_execute(is_cache_miss),
);
loop {
let msg = match conn.recv().await {
Ok(msg) => msg,
Err(err) => {
if is_cache_miss && !flow.saw_parse_complete() {
conn.stmt_cache.remove(&sql_hash);
conn.prepared_statements.remove(&stmt_name);
conn.column_info_cache.remove(&sql_hash);
}
return Err(err);
}
};
if let Err(err) = flow.validate(&msg, "pool fetch_all_cached execute", error.is_some())
{
if is_cache_miss && !flow.saw_parse_complete() {
conn.stmt_cache.remove(&sql_hash);
conn.prepared_statements.remove(&stmt_name);
conn.column_info_cache.remove(&sql_hash);
}
return Err(err);
}
match msg {
crate::protocol::BackendMessage::ParseComplete => {}
crate::protocol::BackendMessage::BindComplete => {}
crate::protocol::BackendMessage::ParameterDescription(_) => {}
crate::protocol::BackendMessage::RowDescription(fields) => {
let info = Arc::new(ColumnInfo::from_fields(&fields));
if is_cache_miss {
conn.column_info_cache.insert(sql_hash, info.clone());
}
column_info = Some(info);
}
crate::protocol::BackendMessage::DataRow(data) => {
if error.is_none() {
rows.push(crate::driver::PgRow {
columns: data,
column_info: column_info.clone(),
});
}
}
crate::protocol::BackendMessage::CommandComplete(_) => {}
crate::protocol::BackendMessage::ReadyForQuery(_) => {
if let Some(err) = error {
if is_cache_miss
&& !flow.saw_parse_complete()
&& !err.is_prepared_statement_already_exists()
{
conn.stmt_cache.remove(&sql_hash);
conn.prepared_statements.remove(&stmt_name);
conn.column_info_cache.remove(&sql_hash);
}
return Err(err);
}
if is_cache_miss && !flow.saw_parse_complete() {
conn.stmt_cache.remove(&sql_hash);
conn.prepared_statements.remove(&stmt_name);
conn.column_info_cache.remove(&sql_hash);
return Err(PgError::Protocol(
"Cache miss query reached ReadyForQuery without ParseComplete"
.to_string(),
));
}
return Ok(rows);
}
crate::protocol::BackendMessage::ErrorResponse(err) => {
if error.is_none() {
error = Some(PgError::QueryServer(err.into()));
}
}
msg if is_ignorable_session_message(&msg) => {}
other => {
if is_cache_miss && !flow.saw_parse_complete() {
conn.stmt_cache.remove(&sql_hash);
conn.prepared_statements.remove(&stmt_name);
conn.column_info_cache.remove(&sql_hash);
}
return Err(unexpected_backend_message(
"pool fetch_all_cached execute",
&other,
));
}
}
}
}
pub async fn fetch_all_with_rls(
&mut self,
cmd: &qail_core::ast::Qail,
rls_sql: &str,
) -> PgResult<Vec<crate::driver::PgRow>> {
self.fetch_all_with_rls_with_format(cmd, rls_sql, ResultFormat::Text)
.await
}
pub async fn fetch_all_with_rls_with_format(
&mut self,
cmd: &qail_core::ast::Qail,
rls_sql: &str,
result_format: ResultFormat,
) -> PgResult<Vec<crate::driver::PgRow>> {
let mut retried = false;
loop {
match self
.fetch_all_with_rls_with_format_once(cmd, rls_sql, result_format)
.await
{
Ok(rows) => return Ok(rows),
Err(err)
if !retried
&& (err.is_prepared_statement_retryable()
|| err.is_prepared_statement_already_exists()) =>
{
retried = true;
if let Some(conn) = self.conn.as_mut() {
if err.is_prepared_statement_retryable() {
conn.clear_prepared_statement_state();
}
let _ = conn.execute_simple("ROLLBACK").await;
}
self.rls_dirty = false;
}
Err(err) => return Err(err),
}
}
}
async fn fetch_all_with_rls_with_format_once(
&mut self,
cmd: &qail_core::ast::Qail,
rls_sql: &str,
result_format: ResultFormat,
) -> PgResult<Vec<crate::driver::PgRow>> {
use crate::driver::ColumnInfo;
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
let conn = self.conn.as_mut().ok_or_else(|| {
PgError::Connection("Connection already released back to pool".into())
})?;
conn.sql_buf.clear();
conn.params_buf.clear();
match cmd.action {
qail_core::ast::Action::Get | qail_core::ast::Action::With => {
crate::protocol::ast_encoder::dml::encode_select(
cmd,
&mut conn.sql_buf,
&mut conn.params_buf,
)?;
}
qail_core::ast::Action::Add => {
crate::protocol::ast_encoder::dml::encode_insert(
cmd,
&mut conn.sql_buf,
&mut conn.params_buf,
)?;
}
qail_core::ast::Action::Set => {
crate::protocol::ast_encoder::dml::encode_update(
cmd,
&mut conn.sql_buf,
&mut conn.params_buf,
)?;
}
qail_core::ast::Action::Del => {
crate::protocol::ast_encoder::dml::encode_delete(
cmd,
&mut conn.sql_buf,
&mut conn.params_buf,
)?;
}
_ => {
conn.execute_simple(rls_sql).await?;
self.rls_dirty = true;
return self
.fetch_all_uncached_with_format(cmd, result_format)
.await;
}
}
let mut hasher = DefaultHasher::new();
conn.sql_buf.hash(&mut hasher);
let sql_hash = hasher.finish();
let is_cache_miss = !conn.stmt_cache.contains(&sql_hash);
conn.write_buf.clear();
let rls_msg = crate::protocol::PgEncoder::try_encode_query_string(rls_sql)?;
conn.write_buf.extend_from_slice(&rls_msg);
let stmt_name = if let Some(name) = conn.stmt_cache.get(&sql_hash) {
name
} else {
let name = format!("qail_{:x}", sql_hash);
conn.evict_prepared_if_full();
let sql_str = std::str::from_utf8(&conn.sql_buf).unwrap_or("");
use crate::protocol::PgEncoder;
let parse_msg = PgEncoder::try_encode_parse(&name, sql_str, &[])?;
let describe_msg = PgEncoder::try_encode_describe(false, &name)?;
conn.write_buf.extend_from_slice(&parse_msg);
conn.write_buf.extend_from_slice(&describe_msg);
conn.stmt_cache.put(sql_hash, name.clone());
conn.prepared_statements
.insert(name.clone(), sql_str.to_string());
if let Ok(mut hot) = self.pool.hot_statements.write()
&& hot.len() < MAX_HOT_STATEMENTS
{
hot.insert(sql_hash, (name.clone(), sql_str.to_string()));
}
name
};
use crate::protocol::PgEncoder;
if let Err(e) = PgEncoder::encode_bind_to_with_result_format(
&mut conn.write_buf,
&stmt_name,
&conn.params_buf,
result_format.as_wire_code(),
) {
rollback_cache_miss_statement_registration(conn, is_cache_miss, sql_hash, &stmt_name);
return Err(PgError::Encode(e.to_string()));
}
PgEncoder::encode_execute_to(&mut conn.write_buf);
PgEncoder::encode_sync_to(&mut conn.write_buf);
if let Err(err) = conn.flush_write_buf().await {
rollback_cache_miss_statement_registration(conn, is_cache_miss, sql_hash, &stmt_name);
return Err(err);
}
self.rls_dirty = true;
let mut rls_error: Option<PgError> = None;
loop {
let msg = match conn.recv().await {
Ok(msg) => msg,
Err(err) => {
rollback_cache_miss_statement_registration(
conn,
is_cache_miss,
sql_hash,
&stmt_name,
);
return Err(err);
}
};
match msg {
crate::protocol::BackendMessage::ReadyForQuery(_) => {
if let Some(err) = rls_error {
rollback_cache_miss_statement_registration(
conn,
is_cache_miss,
sql_hash,
&stmt_name,
);
if let Err(drain_err) =
drain_extended_responses_after_rls_setup_error(conn).await
{
tracing::warn!(
error = %drain_err,
"failed to drain pipelined extended responses after RLS setup error"
);
}
return Err(err);
}
break;
}
crate::protocol::BackendMessage::ErrorResponse(err) => {
if rls_error.is_none() {
rls_error = Some(PgError::QueryServer(err.into()));
}
}
crate::protocol::BackendMessage::CommandComplete(_)
| crate::protocol::BackendMessage::DataRow(_)
| crate::protocol::BackendMessage::RowDescription(_)
| crate::protocol::BackendMessage::ParseComplete
| crate::protocol::BackendMessage::BindComplete => {}
msg if is_ignorable_session_message(&msg) => {}
other => return Err(unexpected_backend_message("pool rls setup", &other)),
}
}
let cached_column_info = conn.column_info_cache.get(&sql_hash).cloned();
let mut rows: Vec<crate::driver::PgRow> = Vec::with_capacity(32);
let mut column_info: Option<std::sync::Arc<ColumnInfo>> = cached_column_info;
let mut error: Option<PgError> = None;
let mut flow = ExtendedFlowTracker::new(
ExtendedFlowConfig::parse_describe_statement_bind_execute(is_cache_miss),
);
loop {
let msg = match conn.recv().await {
Ok(msg) => msg,
Err(err) => {
if is_cache_miss && !flow.saw_parse_complete() {
rollback_cache_miss_statement_registration(
conn,
is_cache_miss,
sql_hash,
&stmt_name,
);
}
return Err(err);
}
};
if let Err(err) =
flow.validate(&msg, "pool fetch_all_with_rls execute", error.is_some())
{
if is_cache_miss && !flow.saw_parse_complete() {
rollback_cache_miss_statement_registration(
conn,
is_cache_miss,
sql_hash,
&stmt_name,
);
}
return Err(err);
}
match msg {
crate::protocol::BackendMessage::ParseComplete => {}
crate::protocol::BackendMessage::BindComplete => {}
crate::protocol::BackendMessage::ParameterDescription(_) => {}
crate::protocol::BackendMessage::RowDescription(fields) => {
let info = std::sync::Arc::new(ColumnInfo::from_fields(&fields));
if is_cache_miss {
conn.column_info_cache.insert(sql_hash, info.clone());
}
column_info = Some(info);
}
crate::protocol::BackendMessage::DataRow(data) => {
if error.is_none() {
rows.push(crate::driver::PgRow {
columns: data,
column_info: column_info.clone(),
});
}
}
crate::protocol::BackendMessage::CommandComplete(_) => {}
crate::protocol::BackendMessage::ReadyForQuery(_) => {
if let Some(err) = error {
if is_cache_miss
&& !flow.saw_parse_complete()
&& !err.is_prepared_statement_already_exists()
{
rollback_cache_miss_statement_registration(
conn,
is_cache_miss,
sql_hash,
&stmt_name,
);
}
return Err(err);
}
if is_cache_miss && !flow.saw_parse_complete() {
rollback_cache_miss_statement_registration(
conn,
is_cache_miss,
sql_hash,
&stmt_name,
);
return Err(PgError::Protocol(
"Cache miss query reached ReadyForQuery without ParseComplete"
.to_string(),
));
}
return Ok(rows);
}
crate::protocol::BackendMessage::ErrorResponse(err) => {
if error.is_none() {
error = Some(PgError::QueryServer(err.into()));
}
}
msg if is_ignorable_session_message(&msg) => {}
other => {
if is_cache_miss && !flow.saw_parse_complete() {
rollback_cache_miss_statement_registration(
conn,
is_cache_miss,
sql_hash,
&stmt_name,
);
}
return Err(unexpected_backend_message(
"pool fetch_all_with_rls execute",
&other,
));
}
}
}
}
}