pub mod binary;
pub mod text;
use crate::connection::stream::PgConnection;
use crate::error::{Error, Result};
use crate::protocol::backend::{BackendMessage, CopyFormat};
use crate::protocol::frontend;
use crate::row::parse_command_tag;
/// Handle for a COPY-in operation running on a borrowed connection.
///
/// The handle holds the connection's mutable borrow for its whole lifetime.
/// It is consumed by `finish` or `abort`; if it is dropped without either,
/// the `Drop` impl queues a CopyFail message in the connection's write buffer.
pub struct CopyIn<'a> {
/// Connection the COPY data is written to and replies are read from.
conn: &'a mut PgConnection,
/// Copy format supplied when the handle was created.
format: CopyFormat,
/// Column count supplied when the handle was created.
column_count: usize,
/// Set to `true` by `finish` and `abort`; checked by `Drop` to decide
/// whether a CopyFail still has to be queued.
finished: bool,
}
impl<'a> CopyIn<'a> {
    /// Creates a handle over `conn` with the given format and column count.
    ///
    /// The handle starts out unfinished, so dropping it without calling
    /// [`finish`](Self::finish) or [`abort`](Self::abort) queues a CopyFail.
    pub(crate) fn new(conn: &'a mut PgConnection, format: CopyFormat, column_count: usize) -> Self {
        Self {
            conn,
            format,
            column_count,
            finished: false,
        }
    }

    /// Returns the copy format this handle was created with.
    pub fn format(&self) -> CopyFormat {
        self.format
    }

    /// Returns the column count this handle was created with.
    pub fn column_count(&self) -> usize {
        self.column_count
    }

    /// Encodes `data` with `frontend::copy_data` into the connection's write
    /// buffer and sends the buffer.
    ///
    /// # Errors
    ///
    /// Returns whatever error the connection's `send` reports.
    pub async fn write_raw(&mut self, data: &[u8]) -> Result<()> {
        frontend::copy_data(self.conn.write_buf(), data);
        self.conn.send().await
    }

    /// Ends the COPY successfully and returns the `rows_affected` value
    /// parsed from the server's command tag.
    ///
    /// Sends the message produced by `frontend::copy_done`, then reads
    /// replies until either `CommandComplete` or `ErrorResponse` arrives;
    /// every other message is ignored. In both cases the connection is then
    /// drained up to `ReadyForQuery`.
    ///
    /// # Errors
    ///
    /// * Any `send`/`recv` error from the connection.
    /// * `Error::server(..)` built from the fields of an `ErrorResponse`.
    ///   A failure while draining after that response is discarded so the
    ///   server error is the one reported.
    pub async fn finish(mut self) -> Result<u64> {
        // Mark as finished first so `Drop` does not queue a CopyFail, even
        // if one of the awaits below returns early with an error.
        self.finished = true;
        frontend::copy_done(self.conn.write_buf());
        self.conn.send().await?;

        let rows = loop {
            match self.conn.recv().await? {
                BackendMessage::CommandComplete { tag } => {
                    break parse_command_tag(&tag).rows_affected;
                }
                BackendMessage::ErrorResponse { fields } => {
                    // Best effort: the server error takes priority over any
                    // failure while resynchronising.
                    drain_until_ready(self.conn).await.ok();
                    return Err(Error::server(
                        fields.severity,
                        fields.code,
                        fields.message,
                        fields.detail,
                        fields.hint,
                        fields.position,
                    ));
                }
                _ => {}
            }
        };

        drain_until_ready(self.conn).await?;
        Ok(rows)
    }

    /// Cancels the COPY by sending the message produced by
    /// `frontend::copy_fail` with `message`, then drains the connection up to
    /// `ReadyForQuery`.
    ///
    /// Every message received before `ReadyForQuery` is discarded, which
    /// presumably includes the server's own error reply to the CopyFail
    /// (NOTE(review): confirm against the protocol documentation).
    ///
    /// # Errors
    ///
    /// Returns any `send` error, and any `recv` error hit while draining.
    /// The drain result is propagated rather than discarded: there is no
    /// other error to prioritise here, and returning `Ok(())` after a failed
    /// drain would report a clean abort on a connection that never reached
    /// `ReadyForQuery`.
    pub async fn abort(mut self, message: &str) -> Result<()> {
        // Mark as finished first so `Drop` does not queue a second CopyFail.
        self.finished = true;
        frontend::copy_fail(self.conn.write_buf(), message);
        self.conn.send().await?;
        drain_until_ready(self.conn).await
    }
}
impl Drop for CopyIn<'_> {
    /// Queues a CopyFail message if the handle was neither finished nor
    /// aborted.
    ///
    /// `drop` cannot await, so the message is only written into the
    /// connection's write buffer here; nothing in this function sends it.
    /// NOTE(review): presumably it goes out with the next `send` on the
    /// connection, and the server's reply to it would then precede the reply
    /// to whatever is sent next. Confirm how `PgConnection` handles this.
    fn drop(&mut self) {
        if self.finished {
            return;
        }

        let buf = self.conn.write_buf();
        frontend::copy_fail(buf, "COPY IN aborted: dropped without finish");
    }
}
/// Handle for a COPY-out operation running on a borrowed connection.
///
/// The handle holds the connection's mutable borrow for its whole lifetime;
/// data is pulled from it with `read_raw`.
pub struct CopyOut<'a> {
/// Connection the COPY data messages are read from.
conn: &'a mut PgConnection,
/// Copy format supplied when the handle was created.
format: CopyFormat,
/// Set to `true` by `read_raw` once `CopyDone` or `ErrorResponse` has been
/// received; later `read_raw` calls then return `Ok(None)` without reading.
done: bool,
}
impl<'a> CopyOut<'a> {
pub(crate) fn new(conn: &'a mut PgConnection, format: CopyFormat) -> Self {
Self {
conn,
format,
done: false,
}
}
pub fn format(&self) -> CopyFormat {
self.format
}
pub async fn read_raw(&mut self) -> Result<Option<bytes::Bytes>> {
if self.done {
return Ok(None);
}
loop {
match self.conn.recv().await? {
BackendMessage::CopyData { data } => {
return Ok(Some(data));
}
BackendMessage::CopyDone => {
self.done = true;
drain_until_ready(self.conn).await?;
return Ok(None);
}
BackendMessage::ErrorResponse { fields } => {
self.done = true;
drain_until_ready(self.conn).await.ok();
return Err(Error::server(
fields.severity,
fields.code,
fields.message,
fields.detail,
fields.hint,
fields.position,
));
}
_ => {}
}
}
}
}
pub(crate) async fn start_copy_in(
conn: &mut PgConnection,
sql: &str,
) -> Result<(CopyFormat, usize)> {
frontend::query(conn.write_buf(), sql);
conn.send().await?;
loop {
match conn.recv().await? {
BackendMessage::CopyInResponse {
format,
column_formats,
} => {
return Ok((format, column_formats.len()));
}
BackendMessage::ErrorResponse { fields } => {
drain_until_ready(conn).await.ok();
return Err(Error::server(
fields.severity,
fields.code,
fields.message,
fields.detail,
fields.hint,
fields.position,
));
}
_ => {}
}
}
}
pub(crate) async fn start_copy_out(conn: &mut PgConnection, sql: &str) -> Result<CopyFormat> {
frontend::query(conn.write_buf(), sql);
conn.send().await?;
loop {
match conn.recv().await? {
BackendMessage::CopyOutResponse { format, .. } => {
return Ok(format);
}
BackendMessage::ErrorResponse { fields } => {
drain_until_ready(conn).await.ok();
return Err(Error::server(
fields.severity,
fields.code,
fields.message,
fields.detail,
fields.hint,
fields.position,
));
}
_ => {}
}
}
}
/// Reads and discards messages from `conn` until `ReadyForQuery` arrives.
///
/// # Errors
///
/// Returns the first error reported by `recv`; in that case `ReadyForQuery`
/// was not observed.
async fn drain_until_ready(conn: &mut PgConnection) -> Result<()> {
    while !matches!(conn.recv().await?, BackendMessage::ReadyForQuery { .. }) {
        // Anything that is not ReadyForQuery is intentionally dropped.
    }
    Ok(())
}