1use std::fmt::Debug;
2use std::pin::Pin;
3use std::sync::{Arc, LazyLock};
4
5use bytes::{BufMut, Bytes, BytesMut};
6use futures::{Stream, StreamExt, future, stream};
7use postgres_types::{IsNull, Oid, ToSql, Type};
8
9use crate::error::{ErrorInfo, PgWireError, PgWireResult};
10use crate::messages::copy::CopyData;
11use crate::messages::data::{
12 DataRow, FORMAT_CODE_BINARY, FORMAT_CODE_TEXT, FieldDescription, RowDescription,
13};
14use crate::messages::response::CommandComplete;
15use crate::types::ToSqlText;
16use crate::types::format::FormatOptions;
17use smol_str::SmolStr;
18
19#[derive(Debug, Eq, PartialEq, Clone)]
20pub struct Tag {
21 command: String,
22 oid: Option<Oid>,
23 rows: Option<usize>,
24}
25
26impl Tag {
27 pub fn new(command: &str) -> Tag {
28 Tag {
29 command: command.to_owned(),
30 oid: None,
31 rows: None,
32 }
33 }
34
35 pub fn with_rows(mut self, rows: usize) -> Tag {
36 self.rows = Some(rows);
37 self
38 }
39
40 pub fn with_oid(mut self, oid: Oid) -> Tag {
41 self.oid = Some(oid);
42 self
43 }
44}
45
46impl From<Tag> for CommandComplete {
47 fn from(tag: Tag) -> CommandComplete {
48 let tag_string = if let (Some(oid), Some(rows)) = (tag.oid, tag.rows) {
49 format!("{} {oid} {rows}", tag.command)
50 } else if let Some(rows) = tag.rows {
51 format!("{} {rows}", tag.command)
52 } else {
53 tag.command
54 };
55 CommandComplete::new(tag_string)
56 }
57}
58
/// Wire format of a single field: textual or binary.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum FieldFormat {
    /// Text representation.
    Text,
    /// Binary representation.
    Binary,
}
65
66impl FieldFormat {
67 pub fn value(&self) -> i16 {
69 match self {
70 Self::Text => FORMAT_CODE_TEXT,
71 Self::Binary => FORMAT_CODE_BINARY,
72 }
73 }
74
75 pub fn from(code: i16) -> Self {
80 if code == FORMAT_CODE_BINARY {
81 FieldFormat::Binary
82 } else {
83 FieldFormat::Text
84 }
85 }
86}
87
88#[derive(Debug, Clone, Eq, PartialEq)]
90pub struct CopyTextOptions {
91 pub delimiter: SmolStr,
92 pub null_string: SmolStr,
93}
94
95impl Default for CopyTextOptions {
96 fn default() -> Self {
97 Self {
98 delimiter: "\t".into(),
99 null_string: "\\N".into(),
100 }
101 }
102}
103
104#[derive(Debug, Clone, Eq, PartialEq)]
106pub struct CopyCsvOptions {
107 pub delimiter: SmolStr,
108 pub quote: SmolStr,
109 pub escape: SmolStr,
110 pub null_string: SmolStr,
111 pub force_quote: Vec<usize>,
112}
113
114impl Default for CopyCsvOptions {
115 fn default() -> Self {
116 Self {
117 delimiter: ",".into(),
118 quote: "\"".into(),
119 escape: "\"".into(),
120 null_string: "".into(),
121 force_quote: vec![],
122 }
123 }
124}
125
126thread_local! {
139 static DEFAULT_FORMAT_OPTIONS: LazyLock<Arc<FormatOptions>> = LazyLock::new(Default::default);
140}
141
142#[derive(Debug, new, Eq, PartialEq, Clone)]
143pub struct FieldInfo {
144 name: String,
145 table_id: Option<i32>,
146 column_id: Option<i16>,
147 datatype: Type,
148 format: FieldFormat,
149 #[new(value = "DEFAULT_FORMAT_OPTIONS.with(|opts| Arc::clone(&*opts))")]
150 format_options: Arc<FormatOptions>,
151}
152
153impl FieldInfo {
154 pub fn name(&self) -> &str {
155 &self.name
156 }
157
158 pub fn table_id(&self) -> Option<i32> {
159 self.table_id
160 }
161
162 pub fn column_id(&self) -> Option<i16> {
163 self.column_id
164 }
165
166 pub fn datatype(&self) -> &Type {
167 &self.datatype
168 }
169
170 pub fn format(&self) -> FieldFormat {
171 self.format
172 }
173
174 pub fn format_options(&self) -> &Arc<FormatOptions> {
175 &self.format_options
176 }
177
178 pub fn with_format_options(mut self, format_options: Arc<FormatOptions>) -> Self {
179 self.format_options = format_options;
180 self
181 }
182}
183
184impl From<&FieldInfo> for FieldDescription {
185 fn from(fi: &FieldInfo) -> Self {
186 FieldDescription::new(
187 fi.name.clone(), fi.table_id.unwrap_or(0), fi.column_id.unwrap_or(0), fi.datatype.oid(), 0,
193 0,
194 fi.format.value(),
195 )
196 }
197}
198
199impl From<FieldDescription> for FieldInfo {
200 fn from(value: FieldDescription) -> Self {
201 FieldInfo::new(
202 value.name,
203 Some(value.table_id),
204 Some(value.column_id),
205 Type::from_oid(value.type_id).unwrap_or(Type::UNKNOWN),
206 FieldFormat::from(value.format_code),
207 )
208 }
209}
210
211pub(crate) fn into_row_description(fields: &[FieldInfo]) -> RowDescription {
212 RowDescription::new(fields.iter().map(Into::into).collect())
213}
214
215pub type SendableRowStream = Pin<Box<dyn Stream<Item = PgWireResult<DataRow>> + Send>>;
216
217pub type SendableCopyDataStream = Pin<Box<dyn Stream<Item = PgWireResult<CopyData>> + Send>>;
218
219#[non_exhaustive]
220pub struct QueryResponse {
221 pub command_tag: String,
222 pub row_schema: Arc<Vec<FieldInfo>>,
223 pub data_rows: SendableRowStream,
224}
225
226impl Debug for QueryResponse {
227 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
228 f.debug_struct("QueryResponse")
229 .field("command_tag", &self.command_tag)
230 .field("row_schema", &self.row_schema)
231 .finish()
232 }
233}
234
235impl QueryResponse {
236 pub fn new<S>(field_defs: Arc<Vec<FieldInfo>>, row_stream: S) -> QueryResponse
239 where
240 S: Stream<Item = PgWireResult<DataRow>> + Send + 'static,
241 {
242 QueryResponse {
243 command_tag: "SELECT".to_owned(),
244 row_schema: field_defs,
245 data_rows: Box::pin(row_stream),
246 }
247 }
248
249 pub fn command_tag(&self) -> &str {
251 &self.command_tag
252 }
253
254 pub fn set_command_tag(&mut self, command_tag: &str) {
256 command_tag.clone_into(&mut self.command_tag);
257 }
258
259 pub fn row_schema(&self) -> Arc<Vec<FieldInfo>> {
261 self.row_schema.clone()
262 }
263
264 pub fn data_rows(&mut self) -> &mut SendableRowStream {
266 &mut self.data_rows
267 }
268}
269
270pub struct DataRowEncoder {
271 schema: Arc<Vec<FieldInfo>>,
272 row_buffer: BytesMut,
273 col_index: usize,
274}
275
276impl DataRowEncoder {
277 pub fn new(fields: Arc<Vec<FieldInfo>>) -> DataRowEncoder {
279 Self {
280 schema: fields,
281 row_buffer: BytesMut::with_capacity(128),
282 col_index: 0,
283 }
284 }
285
286 pub fn encode_field_with_type_and_format<T>(
291 &mut self,
292 value: &T,
293 data_type: &Type,
294 format: FieldFormat,
295 format_options: &FormatOptions,
296 ) -> PgWireResult<()>
297 where
298 T: ToSql + ToSqlText + Sized,
299 {
300 let prev_index = self.row_buffer.len();
302 self.row_buffer.put_i32(-1);
304
305 let is_null = if format == FieldFormat::Text {
306 value.to_sql_text(data_type, &mut self.row_buffer, format_options)?
307 } else {
308 value.to_sql(data_type, &mut self.row_buffer)?
309 };
310
311 if let IsNull::No = is_null {
312 let value_length = self.row_buffer.len() - prev_index - 4;
313 let mut length_bytes = &mut self.row_buffer[prev_index..(prev_index + 4)];
314 length_bytes.put_i32(value_length as i32);
315 }
316
317 self.col_index += 1;
318
319 Ok(())
320 }
321
322 pub fn encode_field<T>(&mut self, value: &T) -> PgWireResult<()>
326 where
327 T: ToSql + ToSqlText + Sized,
328 {
329 let field = &self.schema[self.col_index];
330
331 let data_type = field.datatype().clone();
332 let format = field.format();
333 let format_options = field.format_options().clone();
334
335 self.encode_field_with_type_and_format(value, &data_type, format, format_options.as_ref())
336 }
337
338 #[deprecated(
339 since = "0.37.0",
340 note = "DataRowEncoder is reusable since 0.37, use `take_row() instead`"
341 )]
342 pub fn finish(self) -> PgWireResult<DataRow> {
343 Ok(DataRow::new(self.row_buffer, self.col_index as i16))
344 }
345
346 pub fn take_row(&mut self) -> DataRow {
351 let row = DataRow::new(self.row_buffer.split(), self.col_index as i16);
352 self.col_index = 0;
353 row
354 }
355}
356
357#[derive(Debug, Clone, Eq, PartialEq)]
359enum CopyFormat {
360 Binary,
361 Text {
362 delimiter: SmolStr,
363 null_string: SmolStr,
364 },
365 Csv {
366 delimiter: SmolStr,
367 quote: SmolStr,
368 escape: SmolStr,
369 null_string: SmolStr,
370 force_quote: Vec<usize>,
371 },
372}
373
374pub struct CopyEncoder {
378 schema: Arc<Vec<FieldInfo>>,
379 buffer: BytesMut,
380 format: CopyFormat,
381 col_index: usize,
382 header_written: bool,
383}
384
385impl CopyEncoder {
386 pub fn new_binary(schema: Arc<Vec<FieldInfo>>) -> Self {
388 Self {
389 schema,
390 buffer: BytesMut::with_capacity(128),
391 format: CopyFormat::Binary,
392 col_index: 0,
393 header_written: false,
394 }
395 }
396
397 pub fn new_text(schema: Arc<Vec<FieldInfo>>, options: CopyTextOptions) -> Self {
399 Self {
400 schema,
401 buffer: BytesMut::with_capacity(128),
402 format: CopyFormat::Text {
403 delimiter: options.delimiter,
404 null_string: options.null_string,
405 },
406 col_index: 0,
407 header_written: false,
408 }
409 }
410
411 pub fn new_csv(schema: Arc<Vec<FieldInfo>>, options: CopyCsvOptions) -> Self {
413 Self {
414 schema,
415 buffer: BytesMut::with_capacity(128),
416 format: CopyFormat::Csv {
417 delimiter: options.delimiter,
418 quote: options.quote,
419 escape: options.escape,
420 null_string: options.null_string,
421 force_quote: options.force_quote,
422 },
423 col_index: 0,
424 header_written: false,
425 }
426 }
427
428 pub fn encode_field<T>(&mut self, value: &T) -> PgWireResult<()>
432 where
433 T: ToSql + ToSqlText + Sized,
434 {
435 let datatype = self.schema[self.col_index].datatype().clone();
436 let col_index = self.col_index;
437 let num_fields = self.schema.len();
438
439 match &self.format {
440 CopyFormat::Binary => self.encode_field_binary(value, &datatype)?,
441 CopyFormat::Text { .. } => {
442 let is_last = col_index == num_fields - 1;
443 self.encode_field_text(value, &datatype, is_last)?;
444 }
445 CopyFormat::Csv { .. } => {
446 let is_last = col_index == num_fields - 1;
447 self.encode_field_csv(value, &datatype, is_last)?;
448 }
449 }
450
451 self.col_index += 1;
452 Ok(())
453 }
454
455 fn encode_field_binary<T>(&mut self, value: &T, datatype: &Type) -> PgWireResult<()>
457 where
458 T: ToSql + ToSqlText,
459 {
460 let prev_index = self.buffer.len();
461 self.buffer.put_i32(-1);
462
463 let is_null = value.to_sql(datatype, &mut self.buffer)?;
464
465 if let IsNull::No = is_null {
466 let value_length = self.buffer.len() - prev_index - 4;
467 let mut length_bytes = &mut self.buffer[prev_index..(prev_index + 4)];
468 length_bytes.put_i32(value_length as i32);
469 }
470
471 Ok(())
472 }
473
474 fn encode_field_text<T>(
476 &mut self,
477 value: &T,
478 datatype: &Type,
479 is_last: bool,
480 ) -> PgWireResult<()>
481 where
482 T: ToSqlText,
483 {
484 if let CopyFormat::Text {
485 delimiter,
486 null_string,
487 } = &self.format
488 {
489 let mut temp_buffer = BytesMut::new();
490 let is_null =
491 value.to_sql_text(datatype, &mut temp_buffer, &FormatOptions::default())?;
492
493 if let IsNull::Yes = is_null {
494 self.buffer.put_slice(null_string.as_bytes());
495 } else {
496 for &byte in temp_buffer.as_ref() {
498 match byte {
499 b'\n' => {
500 self.buffer.put_slice(b"\\n");
501 }
502 b'\r' => {
503 self.buffer.put_slice(b"\\r");
504 }
505 b'\t' => {
506 self.buffer.put_slice(b"\\t");
507 }
508 b'\\' => {
509 self.buffer.put_slice(b"\\\\");
510 }
511 b if byte == delimiter.as_bytes()[0] => {
512 self.buffer.put_u8(b'\\');
513 self.buffer.put_u8(byte);
514 }
515 _ => {
516 self.buffer.put_u8(byte);
517 }
518 }
519 }
520 }
521
522 if !is_last {
524 self.buffer.put_slice(delimiter.as_bytes());
525 }
526
527 Ok(())
528 } else {
529 Err(PgWireError::IoError(std::io::Error::new(
530 std::io::ErrorKind::InvalidInput,
531 "Text format expected",
532 )))
533 }
534 }
535
536 fn encode_field_csv<T>(&mut self, value: &T, datatype: &Type, is_last: bool) -> PgWireResult<()>
538 where
539 T: ToSqlText,
540 {
541 if let CopyFormat::Csv {
542 delimiter,
543 quote,
544 null_string,
545 force_quote,
546 escape: _,
547 } = &self.format
548 {
549 let col_index = self.col_index;
550 let mut temp_buffer = BytesMut::new();
551 let is_null =
552 value.to_sql_text(datatype, &mut temp_buffer, &FormatOptions::default())?;
553
554 let delimiter_byte = delimiter.as_bytes()[0];
555 let quote_byte = quote.as_bytes()[0];
556 let null_string_bytes = null_string.as_bytes();
557
558 let should_quote = force_quote.contains(&col_index)
559 || match is_null {
560 IsNull::Yes => false, IsNull::No => {
562 let data = temp_buffer.as_ref();
563 data.contains(&delimiter_byte)
564 || data.contains("e_byte)
565 || data.contains(&b'\n')
566 || data.contains(&b'\r')
567 || (!null_string_bytes.is_empty()
568 && data
569 .windows(null_string_bytes.len())
570 .any(|w| w == null_string_bytes))
571 }
572 };
573
574 if let IsNull::Yes = is_null {
575 self.buffer.put_slice(null_string_bytes);
576 } else if should_quote {
577 self.buffer.put_u8(quote_byte);
578
579 for &byte in temp_buffer.as_ref() {
580 if byte == quote_byte {
581 self.buffer.put_u8(byte);
583 }
584 self.buffer.put_u8(byte);
585 }
586
587 self.buffer.put_u8(quote_byte);
588 } else {
589 self.buffer.put_slice(temp_buffer.as_ref());
590 }
591
592 if !is_last {
594 self.buffer.put_slice(delimiter.as_bytes());
595 }
596
597 Ok(())
598 } else {
599 Err(PgWireError::IoError(std::io::Error::new(
600 std::io::ErrorKind::InvalidInput,
601 "CSV format expected",
602 )))
603 }
604 }
605
606 pub fn take_copy(&mut self) -> CopyData {
611 match &self.format {
612 CopyFormat::Binary => {
613 if !self.header_written {
614 let field_data = self.buffer.split();
616 self.write_pgcop_header();
617 self.buffer.put_i16(self.schema.len() as i16);
618 self.buffer.extend_from_slice(&field_data);
619 self.header_written = true;
620 } else {
621 let field_data = self.buffer.split();
623 self.buffer.put_i16(self.schema.len() as i16);
624 self.buffer.extend_from_slice(&field_data);
625 }
626 }
627 CopyFormat::Text { .. } | CopyFormat::Csv { .. } => {
628 self.buffer.put_u8(b'\n');
630 }
631 }
632
633 self.col_index = 0;
634 CopyData::new(self.buffer.split().freeze())
635 }
636
637 pub fn finish_copy_binary() -> CopyData {
643 CopyData::new(Bytes::from_static(&[0xFF, 0xFF]))
644 }
645
646 fn write_pgcop_header(&mut self) {
648 self.buffer.put_slice(b"PGCOPY\n\xFF\r\n\x00");
649 self.buffer.put_i32(0); self.buffer.put_i32(0); }
652}
653
654pub trait DescribeResponse {
656 fn parameters(&self) -> Option<&[Type]>;
657
658 fn fields(&self) -> &[FieldInfo];
659
660 fn no_data() -> Self;
663
664 fn is_no_data(&self) -> bool;
666}
667
668#[non_exhaustive]
670#[derive(Debug, new)]
671pub struct DescribeStatementResponse {
672 pub parameters: Vec<Type>,
673 pub fields: Vec<FieldInfo>,
674}
675
676impl DescribeResponse for DescribeStatementResponse {
677 fn parameters(&self) -> Option<&[Type]> {
678 Some(self.parameters.as_ref())
679 }
680
681 fn fields(&self) -> &[FieldInfo] {
682 &self.fields
683 }
684
685 fn no_data() -> Self {
688 DescribeStatementResponse {
689 parameters: vec![],
690 fields: vec![],
691 }
692 }
693
694 fn is_no_data(&self) -> bool {
696 self.parameters.is_empty() && self.fields.is_empty()
697 }
698}
699
700#[non_exhaustive]
702#[derive(Debug, new)]
703pub struct DescribePortalResponse {
704 pub fields: Vec<FieldInfo>,
705}
706
707impl DescribeResponse for DescribePortalResponse {
708 fn parameters(&self) -> Option<&[Type]> {
709 None
710 }
711
712 fn fields(&self) -> &[FieldInfo] {
713 &self.fields
714 }
715
716 fn no_data() -> Self {
719 DescribePortalResponse { fields: vec![] }
720 }
721
722 fn is_no_data(&self) -> bool {
724 self.fields.is_empty()
725 }
726}
727
728#[non_exhaustive]
730pub struct CopyResponse {
731 pub format: i8,
732 pub columns: usize,
733 pub data_stream: SendableCopyDataStream,
734}
735
736impl std::fmt::Debug for CopyResponse {
737 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
738 f.debug_struct("CopyResponse")
739 .field("format", &self.format)
740 .field("columns", &self.columns)
741 .finish()
742 }
743}
744
745impl CopyResponse {
746 pub fn new<S>(format: i8, columns: usize, data_stream: S) -> CopyResponse
747 where
748 S: Stream<Item = PgWireResult<CopyData>> + Send + 'static,
749 {
750 if format == 1 {
751 let data_stream = data_stream.chain(stream::once(future::ready(Ok(
752 CopyEncoder::finish_copy_binary(),
753 ))));
754 CopyResponse {
755 format,
756 columns,
757 data_stream: Box::pin(data_stream),
758 }
759 } else {
760 CopyResponse {
761 format,
762 columns,
763 data_stream: Box::pin(data_stream),
764 }
765 }
766 }
767
768 pub fn data_stream(&mut self) -> &mut SendableCopyDataStream {
769 &mut self.data_stream
770 }
771
772 pub fn column_formats(&self) -> Vec<i16> {
773 (0..self.columns).map(|_| self.format as i16).collect()
774 }
775}
776
777#[derive(Debug)]
789pub enum Response {
790 EmptyQuery,
791 Query(QueryResponse),
792 Execution(Tag),
793 TransactionStart(Tag),
794 TransactionEnd(Tag),
795 Error(Box<ErrorInfo>),
796 CopyIn(CopyResponse),
797 CopyOut(CopyResponse),
798 CopyBoth(CopyResponse),
799}
800
#[cfg(test)]
mod test {

    use super::*;

    /// Builds a column description without table or column ids.
    fn field(name: &str, datatype: Type, format: FieldFormat) -> FieldInfo {
        FieldInfo::new(name.into(), None, None, datatype, format)
    }

    /// Schema with a single column.
    fn one_column(name: &str, datatype: Type, format: FieldFormat) -> Arc<Vec<FieldInfo>> {
        Arc::new(vec![field(name, datatype, format)])
    }

    /// Schema `(id INT4, name VARCHAR)` with both columns in `format`.
    fn id_name_schema(format: FieldFormat) -> Arc<Vec<FieldInfo>> {
        Arc::new(vec![
            field("id", Type::INT4, format),
            field("name", Type::VARCHAR, format),
        ])
    }

    #[test]
    fn test_command_complete() {
        let rows_only = CommandComplete::from(Tag::new("INSERT").with_rows(100));
        assert_eq!(rows_only.tag, "INSERT 100");

        let oid_and_rows = CommandComplete::from(Tag::new("INSERT").with_oid(0).with_rows(100));
        assert_eq!(oid_and_rows.tag, "INSERT 0 100");
    }

    #[test]
    #[cfg(feature = "pg-type-chrono")]
    fn test_data_row_encoder() {
        use std::time::SystemTime;

        let schema = Arc::new(vec![
            field("id", Type::INT4, FieldFormat::Text),
            field("name", Type::VARCHAR, FieldFormat::Text),
            field("ts", Type::TIMESTAMP, FieldFormat::Text),
        ]);
        let now = SystemTime::now();

        let mut encoder = DataRowEncoder::new(schema);
        encoder.encode_field(&2001).unwrap();
        encoder.encode_field(&"udev").unwrap();
        encoder.encode_field(&now).unwrap();
        let row = encoder.take_row();

        assert_eq!(row.field_count, 3);

        // Each field is a 4-byte length followed by its text form.
        let mut expected = BytesMut::new();
        for text in ["2001", "udev"] {
            expected.put_i32(4);
            expected.put_slice(text.as_bytes());
        }
        expected.put_i32(26);
        let _ = now.to_sql_text(&Type::TIMESTAMP, &mut expected, &FormatOptions::default());
        assert_eq!(row.data, expected);
    }

    #[test]
    fn test_copy_text_options_default() {
        let CopyTextOptions {
            delimiter,
            null_string,
        } = CopyTextOptions::default();
        assert_eq!(delimiter, "\t");
        assert_eq!(null_string, "\\N");
    }

    #[test]
    fn test_copy_csv_options_default() {
        let CopyCsvOptions {
            delimiter,
            quote,
            escape,
            null_string,
            force_quote,
        } = CopyCsvOptions::default();
        assert_eq!(delimiter, ",");
        assert_eq!(quote, "\"");
        assert_eq!(escape, "\"");
        assert_eq!(null_string, "");
        assert!(force_quote.is_empty());
    }

    #[test]
    fn test_copy_binary_header() {
        let schema = one_column("id", Type::INT4, FieldFormat::Binary);
        let mut encoder = CopyEncoder::new_binary(schema);

        encoder.encode_field(&42).unwrap();
        let copy_data = encoder.take_copy();
        let data: &[u8] = copy_data.data.as_ref();

        // Signature.
        assert_eq!(&data[0..11], b"PGCOPY\n\xFF\r\n\0");
        // Two zero 32-bit words completing the header.
        assert_eq!(&data[11..15], &[0x00, 0x00, 0x00, 0x00]);
        assert_eq!(&data[15..19], &[0x00, 0x00, 0x00, 0x00]);
        // Column count, value length, value.
        assert_eq!(&data[19..21], &[0x00, 0x01]);
        assert_eq!(&data[21..25], &[0x00, 0x00, 0x00, 0x04]);
        assert_eq!(&data[25..29], &[0x00, 0x00, 0x00, 0x2A]);
    }

    #[test]
    fn test_copy_binary_trailer() {
        let trailer = CopyEncoder::finish_copy_binary();
        let bytes: &[u8] = trailer.data.as_ref();

        assert_eq!(bytes, &[0xFF, 0xFF]);
    }

    #[test]
    fn test_copy_text_default_delimiter() {
        let mut encoder = CopyEncoder::new_text(
            id_name_schema(FieldFormat::Text),
            CopyTextOptions::default(),
        );

        encoder.encode_field(&1).unwrap();
        encoder.encode_field(&"Alice").unwrap();
        let copy_data = encoder.take_copy();

        assert_eq!(copy_data.data.as_ref(), b"1\tAlice\n");
    }

    #[test]
    fn test_copy_text_custom_delimiter() {
        let options = CopyTextOptions {
            delimiter: "|".into(),
            null_string: "\\N".into(),
        };
        let mut encoder = CopyEncoder::new_text(id_name_schema(FieldFormat::Text), options);

        encoder.encode_field(&1).unwrap();
        encoder.encode_field(&"Alice").unwrap();
        let copy_data = encoder.take_copy();

        assert_eq!(copy_data.data.as_ref(), b"1|Alice\n");
    }

    #[test]
    fn test_copy_text_null_handling() {
        let mut encoder = CopyEncoder::new_text(
            id_name_schema(FieldFormat::Text),
            CopyTextOptions::default(),
        );

        encoder.encode_field(&1).unwrap();
        encoder.encode_field(&None::<String>).unwrap();
        let copy_data = encoder.take_copy();

        assert_eq!(copy_data.data.as_ref(), b"1\t\\N\n");
    }

    #[test]
    fn test_copy_text_backslash_escaping() {
        let schema = one_column("value", Type::VARCHAR, FieldFormat::Text);
        let mut encoder = CopyEncoder::new_text(schema, CopyTextOptions::default());

        encoder.encode_field(&"a\nb\tc\rd\\e").unwrap();
        let copy_data = encoder.take_copy();

        assert_eq!(copy_data.data.as_ref(), b"a\\nb\\tc\\rd\\\\e\n");
    }

    #[test]
    fn test_copy_csv_default() {
        let mut encoder = CopyEncoder::new_csv(
            id_name_schema(FieldFormat::Text),
            CopyCsvOptions::default(),
        );

        encoder.encode_field(&1).unwrap();
        encoder.encode_field(&"Alice").unwrap();
        let copy_data = encoder.take_copy();

        assert_eq!(copy_data.data.as_ref(), b"1,Alice\n");
    }

    #[test]
    fn test_copy_csv_quoting() {
        let schema = one_column("value", Type::VARCHAR, FieldFormat::Text);
        let mut encoder = CopyEncoder::new_csv(schema, CopyCsvOptions::default());

        encoder.encode_field(&"a,b\"c\nd").unwrap();
        let copy_data = encoder.take_copy();

        assert_eq!(copy_data.data.as_ref(), b"\"a,b\"\"c\nd\"\n");
    }

    #[test]
    fn test_copy_csv_force_quote() {
        let mut options = CopyCsvOptions::default();
        options.force_quote.push(1);
        let mut encoder = CopyEncoder::new_csv(id_name_schema(FieldFormat::Text), options);

        encoder.encode_field(&1).unwrap();
        encoder.encode_field(&"Alice").unwrap();
        let copy_data = encoder.take_copy();

        assert_eq!(copy_data.data.as_ref(), b"1,\"Alice\"\n");
    }

    #[test]
    fn test_copy_binary_multiple_rows() {
        let mut encoder = CopyEncoder::new_binary(id_name_schema(FieldFormat::Binary));

        encoder.encode_field(&1i32).unwrap();
        encoder.encode_field(&"Alice".to_string()).unwrap();
        let first = encoder.take_copy();

        encoder.encode_field(&2i32).unwrap();
        encoder.encode_field(&"Bob".to_string()).unwrap();
        let second = encoder.take_copy();

        // The first chunk carries the 19-byte header before its column count.
        let first_bytes: &[u8] = first.data.as_ref();
        assert_eq!(&first_bytes[19..21], &[0x00, 0x02]);

        // Later chunks start directly with the column count.
        let second_bytes: &[u8] = second.data.as_ref();
        assert_eq!(&second_bytes[0..2], &[0x00, 0x02]);
    }
}