Skip to main content

pgwire/api/
results.rs

1use std::fmt::Debug;
2use std::pin::Pin;
3use std::sync::{Arc, LazyLock};
4
5use bytes::{BufMut, Bytes, BytesMut};
6use futures::{Stream, StreamExt, future, stream};
7use postgres_types::{IsNull, Oid, ToSql, Type};
8
9use crate::error::{ErrorInfo, PgWireError, PgWireResult};
10use crate::messages::copy::CopyData;
11use crate::messages::data::{
12    DataRow, FORMAT_CODE_BINARY, FORMAT_CODE_TEXT, FieldDescription, RowDescription,
13};
14use crate::messages::response::CommandComplete;
15use crate::types::ToSqlText;
16use crate::types::format::FormatOptions;
17use smol_str::SmolStr;
18
/// Command tag sent to the client when a statement completes.
///
/// Holds the command name plus an optional oid and an optional row count.
/// The `From<Tag> for CommandComplete` conversion decides how these parts are
/// rendered into the final tag string.
#[derive(Debug, Eq, PartialEq, Clone)]
pub struct Tag {
    /// Command name, always rendered.
    command: String,
    /// Optional oid; only rendered when `rows` is set as well.
    oid: Option<Oid>,
    /// Optional row count, rendered after the command (and the oid, if any).
    rows: Option<usize>,
}
25
26impl Tag {
27    pub fn new(command: &str) -> Tag {
28        Tag {
29            command: command.to_owned(),
30            oid: None,
31            rows: None,
32        }
33    }
34
35    pub fn with_rows(mut self, rows: usize) -> Tag {
36        self.rows = Some(rows);
37        self
38    }
39
40    pub fn with_oid(mut self, oid: Oid) -> Tag {
41        self.oid = Some(oid);
42        self
43    }
44}
45
46impl From<Tag> for CommandComplete {
47    fn from(tag: Tag) -> CommandComplete {
48        let tag_string = if let (Some(oid), Some(rows)) = (tag.oid, tag.rows) {
49            format!("{} {oid} {rows}", tag.command)
50        } else if let Some(rows) = tag.rows {
51            format!("{} {rows}", tag.command)
52        } else {
53            tag.command
54        };
55        CommandComplete::new(tag_string)
56    }
57}
58
/// Describe encoding of a data field.
#[derive(Debug, Eq, PartialEq, Clone, Copy)]
pub enum FieldFormat {
    /// Text encoding; `value()` yields `FORMAT_CODE_TEXT`.
    Text,
    /// Binary encoding; `value()` yields `FORMAT_CODE_BINARY`.
    Binary,
}
65
66impl FieldFormat {
67    /// Get format code for the encoding.
68    pub fn value(&self) -> i16 {
69        match self {
70            Self::Text => FORMAT_CODE_TEXT,
71            Self::Binary => FORMAT_CODE_BINARY,
72        }
73    }
74
75    /// Parse FieldFormat from format code.
76    ///
77    /// 0 for text format, 1 for binary format. If the input is neither 0 nor 1,
78    /// here we return text as default value.
79    pub fn from(code: i16) -> Self {
80        if code == FORMAT_CODE_BINARY {
81            FieldFormat::Binary
82        } else {
83            FieldFormat::Text
84        }
85    }
86}
87
/// Options for COPY text format.
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct CopyTextOptions {
    /// Separator written between fields of a row. The encoder also
    /// backslash-escapes the first byte of this string inside field data.
    pub delimiter: SmolStr,
    /// Text written in place of a NULL value.
    pub null_string: SmolStr,
}
94
95impl Default for CopyTextOptions {
96    fn default() -> Self {
97        Self {
98            delimiter: "\t".into(),
99            null_string: "\\N".into(),
100        }
101    }
102}
103
/// Options for COPY CSV format.
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct CopyCsvOptions {
    /// Separator written between fields of a row.
    pub delimiter: SmolStr,
    /// Quote character; its first byte wraps values that need quoting.
    pub quote: SmolStr,
    /// Escape character setting; defaults to the same character as `quote`.
    pub escape: SmolStr,
    /// Text written, unquoted, in place of a NULL value.
    pub null_string: SmolStr,
    /// Zero-based column indexes whose non-NULL values are always quoted.
    pub force_quote: Vec<usize>,
}
113
114impl Default for CopyCsvOptions {
115    fn default() -> Self {
116        Self {
117            delimiter: ",".into(),
118            quote: "\"".into(),
119            escape: "\"".into(),
120            null_string: "".into(),
121            force_quote: vec![],
122        }
123    }
124}
125
// Default format options that are cloned in `FieldInfo::new` to avoid `Arc` allocation.
//
// Using thread-local storage avoids contention when multiple threads concurrently
// clone the same `Arc<FormatOptions>` in `DataRowEncoder::encode_field`. Each thread
// now clones its own thread-local instance rather than contending for a shared
// global instance.
//
// This can be made a regular static if we remove format options cloning from
// `DataRowEncoder::encode_field`.
//
// The issue with contention was observed in `examples/bench` benchmark:
// https://github.com/sunng87/pgwire/pull/366#discussion_r2621917771
//
// The value is built with `Default::default` the first time a thread touches it.
thread_local! {
    static DEFAULT_FORMAT_OPTIONS: LazyLock<Arc<FormatOptions>> = LazyLock::new(Default::default);
}
141
/// Description of one result column: name, origin, data type and encoding.
///
/// The derived `new` constructor takes every field except `format_options`,
/// which is filled with a clone of the thread-local default options.
#[derive(Debug, new, Eq, PartialEq, Clone)]
pub struct FieldInfo {
    /// Column name.
    name: String,
    /// Table id, if known; converted to 0 in `FieldDescription` when absent.
    table_id: Option<i32>,
    /// Column id, if known; converted to 0 in `FieldDescription` when absent.
    column_id: Option<i16>,
    /// Data type of the column.
    datatype: Type,
    /// Text or binary encoding used for the column's values.
    format: FieldFormat,
    /// Formatting options passed to the text encoder for this column.
    #[new(value = "DEFAULT_FORMAT_OPTIONS.with(|opts| Arc::clone(&*opts))")]
    format_options: Arc<FormatOptions>,
}
152
153impl FieldInfo {
154    pub fn name(&self) -> &str {
155        &self.name
156    }
157
158    pub fn table_id(&self) -> Option<i32> {
159        self.table_id
160    }
161
162    pub fn column_id(&self) -> Option<i16> {
163        self.column_id
164    }
165
166    pub fn datatype(&self) -> &Type {
167        &self.datatype
168    }
169
170    pub fn format(&self) -> FieldFormat {
171        self.format
172    }
173
174    pub fn format_options(&self) -> &Arc<FormatOptions> {
175        &self.format_options
176    }
177
178    pub fn with_format_options(mut self, format_options: Arc<FormatOptions>) -> Self {
179        self.format_options = format_options;
180        self
181    }
182}
183
184impl From<&FieldInfo> for FieldDescription {
185    fn from(fi: &FieldInfo) -> Self {
186        FieldDescription::new(
187            fi.name.clone(),           // name
188            fi.table_id.unwrap_or(0),  // table_id
189            fi.column_id.unwrap_or(0), // column_id
190            fi.datatype.oid(),         // type_id
191            // TODO: type size and modifier
192            0,
193            0,
194            fi.format.value(),
195        )
196    }
197}
198
199impl From<FieldDescription> for FieldInfo {
200    fn from(value: FieldDescription) -> Self {
201        FieldInfo::new(
202            value.name,
203            Some(value.table_id),
204            Some(value.column_id),
205            Type::from_oid(value.type_id).unwrap_or(Type::UNKNOWN),
206            FieldFormat::from(value.format_code),
207        )
208    }
209}
210
211pub(crate) fn into_row_description(fields: &[FieldInfo]) -> RowDescription {
212    RowDescription::new(fields.iter().map(Into::into).collect())
213}
214
/// Boxed, pinned, `Send` stream of encoded data rows (or errors).
pub type SendableRowStream = Pin<Box<dyn Stream<Item = PgWireResult<DataRow>> + Send>>;

/// Boxed, pinned, `Send` stream of `CopyData` messages (or errors).
pub type SendableCopyDataStream = Pin<Box<dyn Stream<Item = PgWireResult<CopyData>> + Send>>;
218
/// Response of a query that returns rows: a command tag, the column schema
/// and a stream of encoded rows.
#[non_exhaustive]
pub struct QueryResponse {
    /// Command tag; `QueryResponse::new` sets it to `"SELECT"`.
    pub command_tag: String,
    /// Schema of the columns carried by `data_rows`.
    pub row_schema: Arc<Vec<FieldInfo>>,
    /// Stream of encoded data rows.
    pub data_rows: SendableRowStream,
}
225
226impl Debug for QueryResponse {
227    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
228        f.debug_struct("QueryResponse")
229            .field("command_tag", &self.command_tag)
230            .field("row_schema", &self.row_schema)
231            .finish()
232    }
233}
234
235impl QueryResponse {
236    /// Create `QueryResponse` from column schemas and stream of data row.
237    /// Sets "SELECT" as the command tag.
238    pub fn new<S>(field_defs: Arc<Vec<FieldInfo>>, row_stream: S) -> QueryResponse
239    where
240        S: Stream<Item = PgWireResult<DataRow>> + Send + 'static,
241    {
242        QueryResponse {
243            command_tag: "SELECT".to_owned(),
244            row_schema: field_defs,
245            data_rows: Box::pin(row_stream),
246        }
247    }
248
249    /// Get the command tag
250    pub fn command_tag(&self) -> &str {
251        &self.command_tag
252    }
253
254    /// Set the command tag
255    pub fn set_command_tag(&mut self, command_tag: &str) {
256        command_tag.clone_into(&mut self.command_tag);
257    }
258
259    /// Get schema of columns
260    pub fn row_schema(&self) -> Arc<Vec<FieldInfo>> {
261        self.row_schema.clone()
262    }
263
264    /// Get access to data rows stream
265    pub fn data_rows(&mut self) -> &mut SendableRowStream {
266        &mut self.data_rows
267    }
268}
269
/// Reusable encoder that turns field values into `DataRow` messages.
pub struct DataRowEncoder {
    /// Column schema that `encode_field` reads type and format from.
    schema: Arc<Vec<FieldInfo>>,
    /// Bytes of the row currently being built.
    row_buffer: BytesMut,
    /// Number of fields encoded into the current row so far.
    col_index: usize,
}
275
276impl DataRowEncoder {
277    /// New DataRowEncoder from schema of column
278    pub fn new(fields: Arc<Vec<FieldInfo>>) -> DataRowEncoder {
279        Self {
280            schema: fields,
281            row_buffer: BytesMut::with_capacity(128),
282            col_index: 0,
283        }
284    }
285
286    /// Encode value with custom type and format
287    ///
288    /// This encode function ignores data type and format information from
289    /// schema of this encoder.
290    pub fn encode_field_with_type_and_format<T>(
291        &mut self,
292        value: &T,
293        data_type: &Type,
294        format: FieldFormat,
295        format_options: &FormatOptions,
296    ) -> PgWireResult<()>
297    where
298        T: ToSql + ToSqlText + Sized,
299    {
300        // remember the position of the 4-byte length field
301        let prev_index = self.row_buffer.len();
302        // write value length as -1 ahead of time
303        self.row_buffer.put_i32(-1);
304
305        let is_null = if format == FieldFormat::Text {
306            value.to_sql_text(data_type, &mut self.row_buffer, format_options)?
307        } else {
308            value.to_sql(data_type, &mut self.row_buffer)?
309        };
310
311        if let IsNull::No = is_null {
312            let value_length = self.row_buffer.len() - prev_index - 4;
313            let mut length_bytes = &mut self.row_buffer[prev_index..(prev_index + 4)];
314            length_bytes.put_i32(value_length as i32);
315        }
316
317        self.col_index += 1;
318
319        Ok(())
320    }
321
322    /// Encode value using type and format, defined by schema
323    ///
324    /// Panic when encoding more columns than provided as schema.
325    pub fn encode_field<T>(&mut self, value: &T) -> PgWireResult<()>
326    where
327        T: ToSql + ToSqlText + Sized,
328    {
329        let field = &self.schema[self.col_index];
330
331        let data_type = field.datatype().clone();
332        let format = field.format();
333        let format_options = field.format_options().clone();
334
335        self.encode_field_with_type_and_format(value, &data_type, format, format_options.as_ref())
336    }
337
338    #[deprecated(
339        since = "0.37.0",
340        note = "DataRowEncoder is reusable since 0.37, use `take_row() instead`"
341    )]
342    pub fn finish(self) -> PgWireResult<DataRow> {
343        Ok(DataRow::new(self.row_buffer, self.col_index as i16))
344    }
345
346    /// Takes the current row from the encoder, resetting the encoder for reuse.
347    ///
348    /// This method splits the inner buffer, taking the current row data and leaving the
349    /// encoder with an empty buffer (but retaining the capacity) enabling buffer reuse.
350    pub fn take_row(&mut self) -> DataRow {
351        let row = DataRow::new(self.row_buffer.split(), self.col_index as i16);
352        self.col_index = 0;
353        row
354    }
355}
356
/// Internal COPY format representation.
///
/// The `Text` and `Csv` variants carry the option values that the
/// `CopyEncoder` constructors copy out of `CopyTextOptions` and
/// `CopyCsvOptions`.
#[derive(Debug, Clone, Eq, PartialEq)]
enum CopyFormat {
    /// PGCOPY binary format.
    Binary,
    /// Text format with backslash escaping.
    Text {
        /// Separator written between fields.
        delimiter: SmolStr,
        /// Text written for NULL values.
        null_string: SmolStr,
    },
    /// CSV format.
    Csv {
        /// Separator written between fields.
        delimiter: SmolStr,
        /// Quote character.
        quote: SmolStr,
        /// Escape character setting.
        escape: SmolStr,
        /// Text written, unquoted, for NULL values.
        null_string: SmolStr,
        /// Zero-based column indexes that are always quoted when not NULL.
        force_quote: Vec<usize>,
    },
}
373
/// Encoder for COPY operations.
///
/// This encoder produces CopyData messages for PGCOPY binary, text, and CSV formats.
pub struct CopyEncoder {
    /// Column schema; supplies the data type of each encoded field.
    schema: Arc<Vec<FieldInfo>>,
    /// Bytes of the row currently being built.
    buffer: BytesMut,
    /// Output format and its options.
    format: CopyFormat,
    /// Index of the next field to encode; reset by `take_copy`.
    col_index: usize,
    /// Whether the PGCOPY header has already been emitted (binary format only).
    header_written: bool,
}
384
385impl CopyEncoder {
386    /// Create a new binary format COPY encoder.
387    pub fn new_binary(schema: Arc<Vec<FieldInfo>>) -> Self {
388        Self {
389            schema,
390            buffer: BytesMut::with_capacity(128),
391            format: CopyFormat::Binary,
392            col_index: 0,
393            header_written: false,
394        }
395    }
396
397    /// Create a new text format COPY encoder.
398    pub fn new_text(schema: Arc<Vec<FieldInfo>>, options: CopyTextOptions) -> Self {
399        Self {
400            schema,
401            buffer: BytesMut::with_capacity(128),
402            format: CopyFormat::Text {
403                delimiter: options.delimiter,
404                null_string: options.null_string,
405            },
406            col_index: 0,
407            header_written: false,
408        }
409    }
410
411    /// Create a new CSV format COPY encoder.
412    pub fn new_csv(schema: Arc<Vec<FieldInfo>>, options: CopyCsvOptions) -> Self {
413        Self {
414            schema,
415            buffer: BytesMut::with_capacity(128),
416            format: CopyFormat::Csv {
417                delimiter: options.delimiter,
418                quote: options.quote,
419                escape: options.escape,
420                null_string: options.null_string,
421                force_quote: options.force_quote,
422            },
423            col_index: 0,
424            header_written: false,
425        }
426    }
427
428    /// Encode a field value.
429    ///
430    /// This method uses the type and format information from the schema.
431    pub fn encode_field<T>(&mut self, value: &T) -> PgWireResult<()>
432    where
433        T: ToSql + ToSqlText + Sized,
434    {
435        let datatype = self.schema[self.col_index].datatype().clone();
436        let col_index = self.col_index;
437        let num_fields = self.schema.len();
438
439        match &self.format {
440            CopyFormat::Binary => self.encode_field_binary(value, &datatype)?,
441            CopyFormat::Text { .. } => {
442                let is_last = col_index == num_fields - 1;
443                self.encode_field_text(value, &datatype, is_last)?;
444            }
445            CopyFormat::Csv { .. } => {
446                let is_last = col_index == num_fields - 1;
447                self.encode_field_csv(value, &datatype, is_last)?;
448            }
449        }
450
451        self.col_index += 1;
452        Ok(())
453    }
454
455    /// Encode a field in binary format (same as DataRow encoding).
456    fn encode_field_binary<T>(&mut self, value: &T, datatype: &Type) -> PgWireResult<()>
457    where
458        T: ToSql + ToSqlText,
459    {
460        let prev_index = self.buffer.len();
461        self.buffer.put_i32(-1);
462
463        let is_null = value.to_sql(datatype, &mut self.buffer)?;
464
465        if let IsNull::No = is_null {
466            let value_length = self.buffer.len() - prev_index - 4;
467            let mut length_bytes = &mut self.buffer[prev_index..(prev_index + 4)];
468            length_bytes.put_i32(value_length as i32);
469        }
470
471        Ok(())
472    }
473
474    /// Encode a field in text format.
475    fn encode_field_text<T>(
476        &mut self,
477        value: &T,
478        datatype: &Type,
479        is_last: bool,
480    ) -> PgWireResult<()>
481    where
482        T: ToSqlText,
483    {
484        if let CopyFormat::Text {
485            delimiter,
486            null_string,
487        } = &self.format
488        {
489            let mut temp_buffer = BytesMut::new();
490            let is_null =
491                value.to_sql_text(datatype, &mut temp_buffer, &FormatOptions::default())?;
492
493            if let IsNull::Yes = is_null {
494                self.buffer.put_slice(null_string.as_bytes());
495            } else {
496                // Backslash escape special characters
497                for &byte in temp_buffer.as_ref() {
498                    match byte {
499                        b'\n' => {
500                            self.buffer.put_slice(b"\\n");
501                        }
502                        b'\r' => {
503                            self.buffer.put_slice(b"\\r");
504                        }
505                        b'\t' => {
506                            self.buffer.put_slice(b"\\t");
507                        }
508                        b'\\' => {
509                            self.buffer.put_slice(b"\\\\");
510                        }
511                        b if byte == delimiter.as_bytes()[0] => {
512                            self.buffer.put_u8(b'\\');
513                            self.buffer.put_u8(byte);
514                        }
515                        _ => {
516                            self.buffer.put_u8(byte);
517                        }
518                    }
519                }
520            }
521
522            // Add delimiter between fields
523            if !is_last {
524                self.buffer.put_slice(delimiter.as_bytes());
525            }
526
527            Ok(())
528        } else {
529            Err(PgWireError::IoError(std::io::Error::new(
530                std::io::ErrorKind::InvalidInput,
531                "Text format expected",
532            )))
533        }
534    }
535
536    /// Encode a field in CSV format.
537    fn encode_field_csv<T>(&mut self, value: &T, datatype: &Type, is_last: bool) -> PgWireResult<()>
538    where
539        T: ToSqlText,
540    {
541        if let CopyFormat::Csv {
542            delimiter,
543            quote,
544            null_string,
545            force_quote,
546            escape: _,
547        } = &self.format
548        {
549            let col_index = self.col_index;
550            let mut temp_buffer = BytesMut::new();
551            let is_null =
552                value.to_sql_text(datatype, &mut temp_buffer, &FormatOptions::default())?;
553
554            let delimiter_byte = delimiter.as_bytes()[0];
555            let quote_byte = quote.as_bytes()[0];
556            let null_string_bytes = null_string.as_bytes();
557
558            let should_quote = force_quote.contains(&col_index)
559                || match is_null {
560                    IsNull::Yes => false, // NULL values are never quoted in CSV (handled by null_string)
561                    IsNull::No => {
562                        let data = temp_buffer.as_ref();
563                        data.contains(&delimiter_byte)
564                            || data.contains(&quote_byte)
565                            || data.contains(&b'\n')
566                            || data.contains(&b'\r')
567                            || (!null_string_bytes.is_empty()
568                                && data
569                                    .windows(null_string_bytes.len())
570                                    .any(|w| w == null_string_bytes))
571                    }
572                };
573
574            if let IsNull::Yes = is_null {
575                self.buffer.put_slice(null_string_bytes);
576            } else if should_quote {
577                self.buffer.put_u8(quote_byte);
578
579                for &byte in temp_buffer.as_ref() {
580                    if byte == quote_byte {
581                        // Double the quote character
582                        self.buffer.put_u8(byte);
583                    }
584                    self.buffer.put_u8(byte);
585                }
586
587                self.buffer.put_u8(quote_byte);
588            } else {
589                self.buffer.put_slice(temp_buffer.as_ref());
590            }
591
592            // Add delimiter between fields
593            if !is_last {
594                self.buffer.put_slice(delimiter.as_bytes());
595            }
596
597            Ok(())
598        } else {
599            Err(PgWireError::IoError(std::io::Error::new(
600                std::io::ErrorKind::InvalidInput,
601                "CSV format expected",
602            )))
603        }
604    }
605
606    /// Take the current row as a CopyData message.
607    ///
608    /// For binary format: first call includes PGCOPY header.
609    /// For text/CSV format: each call returns one row with a trailing newline.
610    pub fn take_copy(&mut self) -> CopyData {
611        match &self.format {
612            CopyFormat::Binary => {
613                if !self.header_written {
614                    // Prepend header to field data
615                    let field_data = self.buffer.split();
616                    self.write_pgcop_header();
617                    self.buffer.put_i16(self.schema.len() as i16);
618                    self.buffer.extend_from_slice(&field_data);
619                    self.header_written = true;
620                } else {
621                    // Prepend field count before field data
622                    let field_data = self.buffer.split();
623                    self.buffer.put_i16(self.schema.len() as i16);
624                    self.buffer.extend_from_slice(&field_data);
625                }
626            }
627            CopyFormat::Text { .. } | CopyFormat::Csv { .. } => {
628                // Add newline at end of row
629                self.buffer.put_u8(b'\n');
630            }
631        }
632
633        self.col_index = 0;
634        CopyData::new(self.buffer.split().freeze())
635    }
636
637    /// Finish the COPY operation of binary format.
638    ///
639    /// For binary format: returns trailer (-1).
640    /// Note that this trailer is automatically appended to stream if you use
641    /// `CopyResponse` API.
642    pub fn finish_copy_binary() -> CopyData {
643        CopyData::new(Bytes::from_static(&[0xFF, 0xFF]))
644    }
645
646    /// Write PGCOPY binary header.
647    fn write_pgcop_header(&mut self) {
648        self.buffer.put_slice(b"PGCOPY\n\xFF\r\n\x00");
649        self.buffer.put_i32(0); // Flags (no OIDs)
650        self.buffer.put_i32(0); // Header extension length
651    }
652}
653
/// Get response data for a `Describe` command
pub trait DescribeResponse {
    /// Parameter types, or `None` when the response kind carries no
    /// parameter information.
    fn parameters(&self) -> Option<&[Type]>;

    /// Described result fields.
    fn fields(&self) -> &[FieldInfo];

    /// Create an no_data instance of `DescribeResponse`. This is typically used
    /// when client tries to describe an empty query.
    fn no_data() -> Self;

    /// Return true if the `DescribeResponse` is empty/nodata
    fn is_no_data(&self) -> bool;
}
667
/// Response for frontend describe statement requests.
#[non_exhaustive]
#[derive(Debug, new)]
pub struct DescribeStatementResponse {
    /// Types of the statement's parameters.
    pub parameters: Vec<Type>,
    /// Fields of the rows the statement returns.
    pub fields: Vec<FieldInfo>,
}
675
676impl DescribeResponse for DescribeStatementResponse {
677    fn parameters(&self) -> Option<&[Type]> {
678        Some(self.parameters.as_ref())
679    }
680
681    fn fields(&self) -> &[FieldInfo] {
682        &self.fields
683    }
684
685    /// Create an no_data instance of `DescribeStatementResponse`. This is typically used
686    /// when client tries to describe an empty query.
687    fn no_data() -> Self {
688        DescribeStatementResponse {
689            parameters: vec![],
690            fields: vec![],
691        }
692    }
693
694    /// Return true if the `DescribeStatementResponse` is empty/nodata
695    fn is_no_data(&self) -> bool {
696        self.parameters.is_empty() && self.fields.is_empty()
697    }
698}
699
/// Response for frontend describe portal requests.
#[non_exhaustive]
#[derive(Debug, new)]
pub struct DescribePortalResponse {
    /// Fields of the rows the portal returns.
    pub fields: Vec<FieldInfo>,
}
706
707impl DescribeResponse for DescribePortalResponse {
708    fn parameters(&self) -> Option<&[Type]> {
709        None
710    }
711
712    fn fields(&self) -> &[FieldInfo] {
713        &self.fields
714    }
715
716    /// Create an no_data instance of `DescribePortalResponse`. This is typically used
717    /// when client tries to describe an empty query.
718    fn no_data() -> Self {
719        DescribePortalResponse { fields: vec![] }
720    }
721
722    /// Return true if the `DescribePortalResponse` is empty/nodata
723    fn is_no_data(&self) -> bool {
724        self.fields.is_empty()
725    }
726}
727
/// Response for copy operations
#[non_exhaustive]
pub struct CopyResponse {
    /// Overall format code; `CopyResponse::new` appends the binary trailer to
    /// the stream when this is 1.
    pub format: i8,
    /// Number of columns; `column_formats` repeats `format` this many times.
    pub columns: usize,
    /// Stream of `CopyData` messages.
    pub data_stream: SendableCopyDataStream,
}
735
736impl std::fmt::Debug for CopyResponse {
737    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
738        f.debug_struct("CopyResponse")
739            .field("format", &self.format)
740            .field("columns", &self.columns)
741            .finish()
742    }
743}
744
745impl CopyResponse {
746    pub fn new<S>(format: i8, columns: usize, data_stream: S) -> CopyResponse
747    where
748        S: Stream<Item = PgWireResult<CopyData>> + Send + 'static,
749    {
750        if format == 1 {
751            let data_stream = data_stream.chain(stream::once(future::ready(Ok(
752                CopyEncoder::finish_copy_binary(),
753            ))));
754            CopyResponse {
755                format,
756                columns,
757                data_stream: Box::pin(data_stream),
758            }
759        } else {
760            CopyResponse {
761                format,
762                columns,
763                data_stream: Box::pin(data_stream),
764            }
765        }
766    }
767
768    pub fn data_stream(&mut self) -> &mut SendableCopyDataStream {
769        &mut self.data_stream
770    }
771
772    pub fn column_formats(&self) -> Vec<i16> {
773        (0..self.columns).map(|_| self.format as i16).collect()
774    }
775}
776
/// Query response types:
///
/// * Query: the response contains data rows
/// * Execution: response for ddl/dml execution
/// * Error: error response
/// * EmptyQuery: when client sends an empty query
/// * TransactionStart: indicate previous statement just started a transaction
/// * TransactionEnd: indicate previous statement just ended a transaction
/// * CopyIn: response for a copy-in request
/// * CopyOut: response for a copy-out request
/// * CopyBoth: response for a copy-both request
#[derive(Debug)]
pub enum Response {
    EmptyQuery,
    Query(QueryResponse),
    Execution(Tag),
    TransactionStart(Tag),
    TransactionEnd(Tag),
    Error(Box<ErrorInfo>),
    CopyIn(CopyResponse),
    CopyOut(CopyResponse),
    CopyBoth(CopyResponse),
}
800
801#[cfg(test)]
802mod test {
803
804    use super::*;
805
806    #[test]
807    fn test_command_complete() {
808        let tag = Tag::new("INSERT").with_rows(100);
809        let cc = CommandComplete::from(tag);
810
811        assert_eq!(cc.tag, "INSERT 100");
812
813        let tag = Tag::new("INSERT").with_oid(0).with_rows(100);
814        let cc = CommandComplete::from(tag);
815
816        assert_eq!(cc.tag, "INSERT 0 100");
817    }
818
819    #[test]
820    #[cfg(feature = "pg-type-chrono")]
821    fn test_data_row_encoder() {
822        use std::time::SystemTime;
823
824        let schema = Arc::new(vec![
825            FieldInfo::new("id".into(), None, None, Type::INT4, FieldFormat::Text),
826            FieldInfo::new("name".into(), None, None, Type::VARCHAR, FieldFormat::Text),
827            FieldInfo::new("ts".into(), None, None, Type::TIMESTAMP, FieldFormat::Text),
828        ]);
829        let now = SystemTime::now();
830        let mut encoder = DataRowEncoder::new(schema);
831        encoder.encode_field(&2001).unwrap();
832        encoder.encode_field(&"udev").unwrap();
833        encoder.encode_field(&now).unwrap();
834
835        let row = encoder.take_row();
836
837        assert_eq!(row.field_count, 3);
838
839        let mut expected = BytesMut::new();
840        expected.put_i32(4);
841        expected.put_slice("2001".as_bytes());
842        expected.put_i32(4);
843        expected.put_slice("udev".as_bytes());
844        expected.put_i32(26);
845        let _ = now.to_sql_text(&Type::TIMESTAMP, &mut expected, &FormatOptions::default());
846        assert_eq!(row.data, expected);
847    }
848
849    #[test]
850    fn test_copy_text_options_default() {
851        let opts = CopyTextOptions::default();
852        assert_eq!(opts.delimiter, "\t");
853        assert_eq!(opts.null_string, "\\N");
854    }
855
856    #[test]
857    fn test_copy_csv_options_default() {
858        let opts = CopyCsvOptions::default();
859        assert_eq!(opts.delimiter, ",");
860        assert_eq!(opts.quote, "\"");
861        assert_eq!(opts.escape, "\"");
862        assert_eq!(opts.null_string, "");
863        assert!(opts.force_quote.is_empty());
864    }
865
866    #[test]
867    fn test_copy_binary_header() {
868        let schema = Arc::new(vec![FieldInfo::new(
869            "id".into(),
870            None,
871            None,
872            Type::INT4,
873            FieldFormat::Binary,
874        )]);
875        let mut encoder = CopyEncoder::new_binary(schema.clone());
876
877        // First take_copy should include header
878        encoder.encode_field(&42).unwrap();
879        let copy_data = encoder.take_copy();
880
881        let data = copy_data.data.as_ref();
882        assert_eq!(&data[0..11], b"PGCOPY\n\xFF\r\n\0");
883
884        // Check flags (4 bytes, no OIDs = 0)
885        assert_eq!(&data[11..15], &[0x00, 0x00, 0x00, 0x00]);
886
887        // Check extension length (4 bytes, no extensions = 0)
888        assert_eq!(&data[15..19], &[0x00, 0x00, 0x00, 0x00]);
889
890        // Check field count (2 bytes)
891        assert_eq!(&data[19..21], &[0x00, 0x01]); // 1 field
892
893        // Check field length (4 bytes)
894        assert_eq!(&data[21..25], &[0x00, 0x00, 0x00, 0x04]); // 4 bytes
895
896        // Check field value (42 in network byte order)
897        assert_eq!(&data[25..29], &[0x00, 0x00, 0x00, 0x2A]);
898    }
899
900    #[test]
901    fn test_copy_binary_trailer() {
902        let copy_data = CopyEncoder::finish_copy_binary();
903        let data = copy_data.data.as_ref();
904
905        // Trailer is -1 as i16 (0xFFFF in network byte order)
906        assert_eq!(data, &[0xFF, 0xFF]);
907    }
908
909    #[test]
910    fn test_copy_text_default_delimiter() {
911        let schema = Arc::new(vec![
912            FieldInfo::new("id".into(), None, None, Type::INT4, FieldFormat::Text),
913            FieldInfo::new("name".into(), None, None, Type::VARCHAR, FieldFormat::Text),
914        ]);
915        let mut encoder = CopyEncoder::new_text(schema, CopyTextOptions::default());
916
917        encoder.encode_field(&1).unwrap();
918        encoder.encode_field(&"Alice").unwrap();
919        let copy_data = encoder.take_copy();
920
921        // Expected: "1\tAlice\n"
922        assert_eq!(copy_data.data.as_ref(), b"1\tAlice\n");
923    }
924
925    #[test]
926    fn test_copy_text_custom_delimiter() {
927        let schema = Arc::new(vec![
928            FieldInfo::new("id".into(), None, None, Type::INT4, FieldFormat::Text),
929            FieldInfo::new("name".into(), None, None, Type::VARCHAR, FieldFormat::Text),
930        ]);
931        let mut encoder = CopyEncoder::new_text(
932            schema,
933            CopyTextOptions {
934                delimiter: "|".into(),
935                null_string: "\\N".into(),
936            },
937        );
938
939        encoder.encode_field(&1).unwrap();
940        encoder.encode_field(&"Alice").unwrap();
941        let copy_data = encoder.take_copy();
942
943        // Expected: "1|Alice\n"
944        assert_eq!(copy_data.data.as_ref(), b"1|Alice\n");
945    }
946
947    #[test]
948    fn test_copy_text_null_handling() {
949        let schema = Arc::new(vec![
950            FieldInfo::new("id".into(), None, None, Type::INT4, FieldFormat::Text),
951            FieldInfo::new("name".into(), None, None, Type::VARCHAR, FieldFormat::Text),
952        ]);
953        let mut encoder = CopyEncoder::new_text(schema, CopyTextOptions::default());
954
955        encoder.encode_field(&1).unwrap();
956        encoder.encode_field(&None::<String>).unwrap();
957        let copy_data = encoder.take_copy();
958
959        // Expected: "1\t\\N\n"
960        assert_eq!(copy_data.data.as_ref(), b"1\t\\N\n");
961    }
962
963    #[test]
964    fn test_copy_text_backslash_escaping() {
965        let schema = Arc::new(vec![FieldInfo::new(
966            "value".into(),
967            None,
968            None,
969            Type::VARCHAR,
970            FieldFormat::Text,
971        )]);
972        let mut encoder = CopyEncoder::new_text(schema, CopyTextOptions::default());
973
974        encoder.encode_field(&"a\nb\tc\rd\\e").unwrap();
975        let copy_data = encoder.take_copy();
976
977        // Expected: "a\\nb\\tc\\rd\\\\e\n"
978        assert_eq!(copy_data.data.as_ref(), b"a\\nb\\tc\\rd\\\\e\n");
979    }
980
981    #[test]
982    fn test_copy_csv_default() {
983        let schema = Arc::new(vec![
984            FieldInfo::new("id".into(), None, None, Type::INT4, FieldFormat::Text),
985            FieldInfo::new("name".into(), None, None, Type::VARCHAR, FieldFormat::Text),
986        ]);
987        let mut encoder = CopyEncoder::new_csv(schema, CopyCsvOptions::default());
988
989        encoder.encode_field(&1).unwrap();
990        encoder.encode_field(&"Alice").unwrap();
991        let copy_data = encoder.take_copy();
992
993        // Expected: "1,Alice\n"
994        assert_eq!(copy_data.data.as_ref(), b"1,Alice\n");
995    }
996
997    #[test]
998    fn test_copy_csv_quoting() {
999        let schema = Arc::new(vec![FieldInfo::new(
1000            "value".into(),
1001            None,
1002            None,
1003            Type::VARCHAR,
1004            FieldFormat::Text,
1005        )]);
1006        let mut encoder = CopyEncoder::new_csv(schema, CopyCsvOptions::default());
1007
1008        encoder.encode_field(&"a,b\"c\nd").unwrap();
1009        let copy_data = encoder.take_copy();
1010
1011        // Should be quoted because it contains comma and newline
1012        assert_eq!(copy_data.data.as_ref(), b"\"a,b\"\"c\nd\"\n");
1013    }
1014
1015    #[test]
1016    fn test_copy_csv_force_quote() {
1017        let schema = Arc::new(vec![
1018            FieldInfo::new("id".into(), None, None, Type::INT4, FieldFormat::Text),
1019            FieldInfo::new("name".into(), None, None, Type::VARCHAR, FieldFormat::Text),
1020        ]);
1021        let mut encoder = CopyEncoder::new_csv(
1022            schema,
1023            CopyCsvOptions {
1024                force_quote: vec![1],
1025                ..Default::default()
1026            },
1027        );
1028
1029        encoder.encode_field(&1).unwrap();
1030        encoder.encode_field(&"Alice").unwrap();
1031        let copy_data = encoder.take_copy();
1032
1033        // Expected: "1,\"Alice\"\n" - second column force quoted
1034        assert_eq!(copy_data.data.as_ref(), b"1,\"Alice\"\n");
1035    }
1036
1037    #[test]
1038    fn test_copy_binary_multiple_rows() {
1039        let schema = Arc::new(vec![
1040            FieldInfo::new("id".into(), None, None, Type::INT4, FieldFormat::Binary),
1041            FieldInfo::new(
1042                "name".into(),
1043                None,
1044                None,
1045                Type::VARCHAR,
1046                FieldFormat::Binary,
1047            ),
1048        ]);
1049        let mut encoder = CopyEncoder::new_binary(schema);
1050
1051        // First row
1052        encoder.encode_field(&1i32).unwrap();
1053        encoder.encode_field(&"Alice".to_string()).unwrap();
1054        let copy_data1 = encoder.take_copy();
1055
1056        // Second row
1057        encoder.encode_field(&2i32).unwrap();
1058        encoder.encode_field(&"Bob".to_string()).unwrap();
1059        let copy_data2 = encoder.take_copy();
1060
1061        // Verify first row format
1062        let data1 = copy_data1.data.as_ref();
1063
1064        // Header is 19 bytes, then field count (2 bytes)
1065        assert_eq!(&data1[19..21], &[0x00, 0x02]); // 2 fields
1066
1067        // Verify second row format
1068        let data2 = copy_data2.data.as_ref();
1069
1070        // Field count should be at the beginning (no header on second row)
1071        assert_eq!(&data2[0..2], &[0x00, 0x02]); // 2 fields
1072    }
1073}