1use super::{
5 PgConnection, PgError, PgResult, is_ignorable_session_message, parse_affected_rows,
6 unexpected_backend_message,
7};
8use crate::protocol::{AstEncoder, BackendMessage, PgEncoder};
9use bytes::BytesMut;
10use qail_core::ast::{Action, Qail};
11use std::future::Future;
12
13pub(crate) fn quote_copy_column_ident(ident: &str) -> PgResult<String> {
15 if ident.is_empty() {
16 return Err(PgError::Query(
17 "COPY column identifier is empty".to_string(),
18 ));
19 }
20 if ident.contains('\0') {
21 return Err(PgError::Query(
22 "COPY column identifier contains NUL byte".to_string(),
23 ));
24 }
25 Ok(format!("\"{}\"", ident.replace('"', "\"\"")))
26}
27
28pub(crate) fn quote_copy_table_ref(table: &str) -> PgResult<String> {
30 if table.is_empty() {
31 return Err(PgError::Query("COPY table identifier is empty".to_string()));
32 }
33 if table.contains('\0') {
34 return Err(PgError::Query(
35 "COPY table identifier contains NUL byte".to_string(),
36 ));
37 }
38
39 table
40 .split('.')
41 .map(|part| {
42 let part = part.trim();
43 if part.is_empty() {
44 return Err(PgError::Query(
45 "COPY table identifier contains an empty path segment".to_string(),
46 ));
47 }
48 quote_copy_column_ident(part)
49 })
50 .collect::<PgResult<Vec<_>>>()
51 .map(|parts| parts.join("."))
52}
53
54fn parse_copy_text_row(line: &[u8]) -> PgResult<Vec<String>> {
55 let line = if line.ends_with(b"\r") {
56 &line[..line.len().saturating_sub(1)]
57 } else {
58 line
59 };
60
61 let mut fields = Vec::new();
62 let mut start = 0;
63 for (idx, byte) in line.iter().enumerate() {
64 if *byte == b'\t' {
65 fields.push(decode_copy_text_field(&line[start..idx])?);
66 start = idx + 1;
67 }
68 }
69 fields.push(decode_copy_text_field(&line[start..])?);
70 Ok(fields)
71}
72
73fn decode_copy_text_field(field: &[u8]) -> PgResult<String> {
74 if field == b"\\N" {
75 return Ok(String::new());
76 }
77
78 let mut out = Vec::with_capacity(field.len());
79 let mut idx = 0;
80 while idx < field.len() {
81 if field[idx] != b'\\' {
82 out.push(field[idx]);
83 idx += 1;
84 continue;
85 }
86
87 let Some(&escaped) = field.get(idx + 1) else {
88 out.push(b'\\');
89 break;
90 };
91
92 match escaped {
93 b'b' => {
94 out.push(0x08);
95 idx += 2;
96 }
97 b'f' => {
98 out.push(0x0c);
99 idx += 2;
100 }
101 b'n' => {
102 out.push(b'\n');
103 idx += 2;
104 }
105 b'r' => {
106 out.push(b'\r');
107 idx += 2;
108 }
109 b't' => {
110 out.push(b'\t');
111 idx += 2;
112 }
113 b'v' => {
114 out.push(0x0b);
115 idx += 2;
116 }
117 b'\\' => {
118 out.push(b'\\');
119 idx += 2;
120 }
121 b'0'..=b'7' => {
122 let mut value = 0u16;
123 let mut next = idx + 1;
124 for _ in 0..3 {
125 let Some(&digit) = field.get(next) else {
126 break;
127 };
128 if !(b'0'..=b'7').contains(&digit) {
129 break;
130 }
131 value = (value * 8) + u16::from(digit - b'0');
132 next += 1;
133 }
134 out.push(value as u8);
135 idx = next;
136 }
137 b'x' => {
138 let mut value = 0u8;
139 let mut next = idx + 2;
140 let mut digits = 0;
141 while digits < 2 {
142 let Some(&digit) = field.get(next) else {
143 break;
144 };
145 let Some(nibble) = hex_nibble(digit) else {
146 break;
147 };
148 value = (value << 4) | nibble;
149 next += 1;
150 digits += 1;
151 }
152 if digits == 0 {
153 out.push(b'x');
154 idx += 2;
155 } else {
156 out.push(value);
157 idx = next;
158 }
159 }
160 other => {
161 out.push(other);
162 idx += 2;
163 }
164 }
165 }
166
167 String::from_utf8(out)
168 .map_err(|e| PgError::Protocol(format!("COPY text field is not valid UTF-8: {}", e)))
169}
170
/// Returns the value (0–15) of an ASCII hexadecimal digit, or `None` when
/// `byte` is not one of `0-9`, `a-f`, or `A-F`.
fn hex_nibble(byte: u8) -> Option<u8> {
    char::from(byte)
        .to_digit(16)
        .and_then(|digit| u8::try_from(digit).ok())
}
179
180#[inline]
181fn return_with_desync<T>(conn: &mut PgConnection, err: PgError) -> PgResult<T> {
182 if matches!(
183 err,
184 PgError::Protocol(_) | PgError::Connection(_) | PgError::Timeout(_)
185 ) {
186 conn.mark_io_desynced();
187 }
188 Err(err)
189}
190
191fn encode_copy_export_sql(cmd: &Qail) -> PgResult<String> {
192 if cmd.action != Action::Export {
193 return Err(PgError::Query(
194 "copy_export requires Qail::Export action".to_string(),
195 ));
196 }
197
198 let (sql, params) =
199 AstEncoder::encode_cmd_sql(cmd).map_err(|e| PgError::Encode(e.to_string()))?;
200 if !params.is_empty() {
201 return Err(PgError::Encode(format!(
202 "copy_export cannot encode parameterized export with {} bind parameter(s); use an unfiltered export, a prefiltered database view, or a raw COPY statement with trusted SQL",
203 params.len()
204 )));
205 }
206
207 Ok(sql)
208}
209
210fn drain_copy_text_rows<F>(pending: &mut Vec<u8>, chunk: &[u8], on_row: &mut F) -> PgResult<()>
211where
212 F: FnMut(Vec<String>) -> PgResult<()>,
213{
214 pending.extend_from_slice(chunk);
215 while let Some(pos) = pending.iter().position(|&b| b == b'\n') {
216 let line = pending[..pos].to_vec();
217 pending.drain(..=pos);
218 let row = parse_copy_text_row(&line)?;
219 on_row(row)?;
220 }
221 Ok(())
222}
223
224fn flush_pending_copy_text_row<F>(pending: &mut Vec<u8>, on_row: &mut F) -> PgResult<()>
225where
226 F: FnMut(Vec<String>) -> PgResult<()>,
227{
228 if pending.is_empty() {
229 return Ok(());
230 }
231 let line = std::mem::take(pending);
232 let row = parse_copy_text_row(&line)?;
233 on_row(row)
234}
235
236impl PgConnection {
237 pub(crate) async fn copy_in_fast(
241 &mut self,
242 table: &str,
243 columns: &[String],
244 rows: &[Vec<qail_core::ast::Value>],
245 ) -> PgResult<u64> {
246 use crate::protocol::try_encode_copy_batch;
247
248 let cols: Vec<String> = columns
249 .iter()
250 .map(|c| quote_copy_column_ident(c))
251 .collect::<PgResult<_>>()?;
252 let sql = format!(
253 "COPY {} ({}) FROM STDIN",
254 quote_copy_table_ref(table)?,
255 cols.join(", ")
256 );
257
258 let batch_data = try_encode_copy_batch(rows)?;
261
262 let bytes = PgEncoder::try_encode_query_string(&sql)?;
264 self.write_all_with_timeout(&bytes, "stream write").await?;
265
266 let mut startup_error: Option<PgError> = None;
268 loop {
269 let msg = self.recv().await?;
270 match msg {
271 BackendMessage::CopyInResponse { .. } => {
272 if let Some(err) = startup_error {
273 return return_with_desync(self, err);
274 }
275 break;
276 }
277 BackendMessage::ReadyForQuery(_) => {
278 return return_with_desync(
279 self,
280 startup_error.unwrap_or_else(|| {
281 PgError::Protocol(
282 "COPY IN failed before CopyInResponse (unexpected ReadyForQuery)"
283 .to_string(),
284 )
285 }),
286 );
287 }
288 BackendMessage::ErrorResponse(err) => {
289 if startup_error.is_none() {
290 startup_error = Some(PgError::QueryServer(err.into()));
291 }
292 }
293 msg if is_ignorable_session_message(&msg) => {}
294 other => {
295 return return_with_desync(
296 self,
297 unexpected_backend_message("copy-in startup", &other),
298 );
299 }
300 }
301 }
302
303 self.send_copy_data(&batch_data).await?;
305
306 self.send_copy_done().await?;
308
309 let mut affected = 0u64;
311 let mut final_error: Option<PgError> = None;
312 let mut saw_command_complete = false;
313 loop {
314 let msg = self.recv().await?;
315 match msg {
316 BackendMessage::CommandComplete(tag) => {
317 if saw_command_complete {
318 return return_with_desync(
319 self,
320 PgError::Protocol(
321 "COPY IN received duplicate CommandComplete".to_string(),
322 ),
323 );
324 }
325 saw_command_complete = true;
326 if final_error.is_none() {
327 match parse_affected_rows(&tag) {
328 Ok(parsed) => affected = parsed,
329 Err(err) => return return_with_desync(self, err),
330 }
331 }
332 }
333 BackendMessage::ReadyForQuery(_) => {
334 if let Some(err) = final_error {
335 return Err(err);
336 }
337 if !saw_command_complete {
338 return return_with_desync(
339 self,
340 PgError::Protocol(
341 "COPY IN completion missing CommandComplete before ReadyForQuery"
342 .to_string(),
343 ),
344 );
345 }
346 return Ok(affected);
347 }
348 BackendMessage::ErrorResponse(err) => {
349 if final_error.is_none() {
350 final_error = Some(PgError::QueryServer(err.into()));
351 }
352 }
353 msg if is_ignorable_session_message(&msg) => {}
354 other => {
355 return return_with_desync(
356 self,
357 unexpected_backend_message("copy-in completion", &other),
358 );
359 }
360 }
361 }
362 }
363
364 pub async fn copy_in_raw(
371 &mut self,
372 table: &str,
373 columns: &[String],
374 data: &[u8],
375 ) -> PgResult<u64> {
376 let cols: Vec<String> = columns
377 .iter()
378 .map(|c| quote_copy_column_ident(c))
379 .collect::<PgResult<_>>()?;
380 let sql = format!(
381 "COPY {} ({}) FROM STDIN",
382 quote_copy_table_ref(table)?,
383 cols.join(", ")
384 );
385
386 let bytes = PgEncoder::try_encode_query_string(&sql)?;
388 self.write_all_with_timeout(&bytes, "stream write").await?;
389
390 let mut startup_error: Option<PgError> = None;
392 loop {
393 let msg = self.recv().await?;
394 match msg {
395 BackendMessage::CopyInResponse { .. } => {
396 if let Some(err) = startup_error {
397 return return_with_desync(self, err);
398 }
399 break;
400 }
401 BackendMessage::ReadyForQuery(_) => {
402 return return_with_desync(
403 self,
404 startup_error.unwrap_or_else(|| {
405 PgError::Protocol(
406 "COPY IN failed before CopyInResponse (unexpected ReadyForQuery)"
407 .to_string(),
408 )
409 }),
410 );
411 }
412 BackendMessage::ErrorResponse(err) => {
413 if startup_error.is_none() {
414 startup_error = Some(PgError::QueryServer(err.into()));
415 }
416 }
417 msg if is_ignorable_session_message(&msg) => {}
418 other => {
419 return return_with_desync(
420 self,
421 unexpected_backend_message("copy-in raw startup", &other),
422 );
423 }
424 }
425 }
426
427 self.send_copy_data(data).await?;
429
430 self.send_copy_done().await?;
432
433 let mut affected = 0u64;
435 let mut final_error: Option<PgError> = None;
436 let mut saw_command_complete = false;
437 loop {
438 let msg = self.recv().await?;
439 match msg {
440 BackendMessage::CommandComplete(tag) => {
441 if saw_command_complete {
442 return return_with_desync(
443 self,
444 PgError::Protocol(
445 "COPY IN raw received duplicate CommandComplete".to_string(),
446 ),
447 );
448 }
449 saw_command_complete = true;
450 if final_error.is_none() {
451 match parse_affected_rows(&tag) {
452 Ok(parsed) => affected = parsed,
453 Err(err) => return return_with_desync(self, err),
454 }
455 }
456 }
457 BackendMessage::ReadyForQuery(_) => {
458 if let Some(err) = final_error {
459 return Err(err);
460 }
461 if !saw_command_complete {
462 return return_with_desync(
463 self,
464 PgError::Protocol(
465 "COPY IN raw completion missing CommandComplete before ReadyForQuery"
466 .to_string(),
467 ),
468 );
469 }
470 return Ok(affected);
471 }
472 BackendMessage::ErrorResponse(err) => {
473 if final_error.is_none() {
474 final_error = Some(PgError::QueryServer(err.into()));
475 }
476 }
477 msg if is_ignorable_session_message(&msg) => {}
478 other => {
479 return return_with_desync(
480 self,
481 unexpected_backend_message("copy-in raw completion", &other),
482 );
483 }
484 }
485 }
486 }
487
488 pub(crate) async fn send_copy_data(&mut self, data: &[u8]) -> PgResult<()> {
490 let total_len = data
491 .len()
492 .checked_add(4)
493 .ok_or_else(|| PgError::Protocol("CopyData frame length overflow".to_string()))?;
494 let len = i32::try_from(total_len)
495 .map_err(|_| PgError::Protocol("CopyData frame exceeds i32::MAX".to_string()))?;
496
497 let mut buf = BytesMut::with_capacity(1 + 4 + data.len());
499 buf.extend_from_slice(b"d");
500 buf.extend_from_slice(&len.to_be_bytes());
501 buf.extend_from_slice(data);
502 self.write_all_with_timeout(&buf, "stream write").await?;
503 Ok(())
504 }
505
506 async fn send_copy_done(&mut self) -> PgResult<()> {
507 self.write_all_with_timeout(&[b'c', 0, 0, 0, 4], "stream write")
509 .await?;
510 Ok(())
511 }
512
513 async fn start_copy_out(&mut self, sql: &str, context: &str) -> PgResult<()> {
514 let bytes = PgEncoder::try_encode_query_string(sql)?;
515 self.write_all_with_timeout(&bytes, "stream write").await?;
516
517 let mut startup_error: Option<PgError> = None;
518 loop {
519 let msg = self.recv().await?;
520 match msg {
521 BackendMessage::CopyOutResponse { .. } => {
522 if let Some(err) = startup_error {
523 return return_with_desync(self, err);
524 }
525 return Ok(());
526 }
527 BackendMessage::ReadyForQuery(_) => {
528 return return_with_desync(
529 self,
530 startup_error.unwrap_or_else(|| {
531 PgError::Protocol(format!(
532 "{} failed before CopyOutResponse (unexpected ReadyForQuery)",
533 context
534 ))
535 }),
536 );
537 }
538 BackendMessage::ErrorResponse(err) => {
539 if startup_error.is_none() {
540 startup_error = Some(PgError::QueryServer(err.into()));
541 }
542 }
543 msg if is_ignorable_session_message(&msg) => {}
544 other => {
545 return return_with_desync(self, unexpected_backend_message(context, &other));
546 }
547 }
548 }
549 }
550
551 async fn stream_copy_out_chunks<F, Fut>(
552 &mut self,
553 context: &str,
554 mut on_chunk: F,
555 ) -> PgResult<()>
556 where
557 F: FnMut(Vec<u8>) -> Fut,
558 Fut: Future<Output = PgResult<()>>,
559 {
560 let mut stream_error: Option<PgError> = None;
561 let mut callback_error: Option<PgError> = None;
562 let mut saw_copy_done = false;
563 let mut saw_command_complete = false;
564
565 loop {
566 let msg = self.recv().await?;
567 match msg {
568 BackendMessage::CopyData(chunk) => {
569 if saw_copy_done {
570 return return_with_desync(
571 self,
572 PgError::Protocol(format!(
573 "{} received CopyData after CopyDone",
574 context
575 )),
576 );
577 }
578 if stream_error.is_none()
579 && callback_error.is_none()
580 && let Err(e) = on_chunk(chunk).await
581 {
582 callback_error = Some(e);
583 }
584 }
585 BackendMessage::CopyDone => {
586 if saw_copy_done {
587 return return_with_desync(
588 self,
589 PgError::Protocol(format!("{} received duplicate CopyDone", context)),
590 );
591 }
592 saw_copy_done = true;
593 }
594 BackendMessage::CommandComplete(_) => {
595 if saw_command_complete {
596 return return_with_desync(
597 self,
598 PgError::Protocol(format!(
599 "{} received duplicate CommandComplete",
600 context
601 )),
602 );
603 }
604 saw_command_complete = true;
605 }
606 BackendMessage::ReadyForQuery(_) => {
607 if let Some(err) = stream_error {
608 return Err(err);
609 }
610 if let Some(err) = callback_error {
611 return Err(err);
612 }
613 if !saw_copy_done {
614 return return_with_desync(
615 self,
616 PgError::Protocol(format!(
617 "{} missing CopyDone before ReadyForQuery",
618 context
619 )),
620 );
621 }
622 if !saw_command_complete {
623 return return_with_desync(
624 self,
625 PgError::Protocol(format!(
626 "{} missing CommandComplete before ReadyForQuery",
627 context
628 )),
629 );
630 }
631 return Ok(());
632 }
633 BackendMessage::ErrorResponse(err) => {
634 if stream_error.is_none() {
635 stream_error = Some(PgError::QueryServer(err.into()));
636 }
637 }
638 msg if is_ignorable_session_message(&msg) => {}
639 other => {
640 return return_with_desync(self, unexpected_backend_message(context, &other));
641 }
642 }
643 }
644 }
645
646 pub async fn copy_export(&mut self, cmd: &Qail) -> PgResult<Vec<Vec<String>>> {
655 let mut rows = Vec::new();
656 self.copy_export_stream_rows(cmd, |row| {
657 rows.push(row);
658 Ok(())
659 })
660 .await?;
661 Ok(rows)
662 }
663
664 pub async fn copy_export_stream_raw<F, Fut>(&mut self, cmd: &Qail, on_chunk: F) -> PgResult<()>
669 where
670 F: FnMut(Vec<u8>) -> Fut,
671 Fut: Future<Output = PgResult<()>>,
672 {
673 let sql = encode_copy_export_sql(cmd)?;
674
675 self.copy_out_raw_stream(&sql, on_chunk).await
676 }
677
678 pub async fn copy_export_stream_rows<F>(&mut self, cmd: &Qail, mut on_row: F) -> PgResult<()>
683 where
684 F: FnMut(Vec<String>) -> PgResult<()>,
685 {
686 let mut pending = Vec::new();
687 self.copy_export_stream_raw(cmd, |chunk| {
688 let res = drain_copy_text_rows(&mut pending, &chunk, &mut on_row);
689 std::future::ready(res)
690 })
691 .await?;
692 flush_pending_copy_text_row(&mut pending, &mut on_row)
693 }
694
695 pub(crate) async fn copy_out_raw(&mut self, sql: &str) -> PgResult<Vec<u8>> {
703 let mut data = Vec::new();
704 self.copy_out_raw_stream(sql, |chunk| {
705 data.extend_from_slice(&chunk);
706 std::future::ready(Ok(()))
707 })
708 .await?;
709 Ok(data)
710 }
711
712 pub(crate) async fn copy_out_raw_stream<F, Fut>(
717 &mut self,
718 sql: &str,
719 on_chunk: F,
720 ) -> PgResult<()>
721 where
722 F: FnMut(Vec<u8>) -> Fut,
723 Fut: Future<Output = PgResult<()>>,
724 {
725 self.start_copy_out(sql, "copy-out raw startup").await?;
726 self.stream_copy_out_chunks("copy-out raw stream", on_chunk)
727 .await
728 }
729}
730
#[cfg(test)]
mod tests {
    // `return_with_desync` and `PgConnection` are only exercised by the
    // unix-only connection test, so their imports are gated the same way to
    // avoid unused-import warnings on other targets.
    #[cfg(unix)]
    use super::return_with_desync;
    use super::{
        drain_copy_text_rows, encode_copy_export_sql, flush_pending_copy_text_row,
        parse_copy_text_row, quote_copy_column_ident, quote_copy_table_ref,
    };
    #[cfg(unix)]
    use crate::driver::PgConnection;
    use crate::driver::{PgError, PgResult};
    use qail_core::ast::{Operator, Qail};

    /// Builds a connection backed by one end of a Unix socket pair; no server
    /// is attached, so it is only suitable for state-flag tests.
    #[cfg(unix)]
    fn test_conn() -> PgConnection {
        use crate::driver::connection::StatementCache;
        use crate::driver::stream::PgStream;
        use bytes::BytesMut;
        use std::collections::{HashMap, VecDeque};
        use std::num::NonZeroUsize;
        use tokio::net::UnixStream;

        let (unix_stream, _peer) = UnixStream::pair().expect("unix stream pair");
        PgConnection {
            stream: PgStream::Unix(unix_stream),
            buffer: BytesMut::with_capacity(1024),
            write_buf: BytesMut::with_capacity(1024),
            sql_buf: BytesMut::with_capacity(256),
            params_buf: Vec::new(),
            prepared_statements: HashMap::new(),
            stmt_cache: StatementCache::new(NonZeroUsize::new(2).expect("non-zero")),
            column_info_cache: HashMap::new(),
            process_id: 0,
            cancel_key_bytes: Vec::new(),
            requested_protocol_minor: PgConnection::default_protocol_minor(),
            negotiated_protocol_minor: PgConnection::default_protocol_minor(),
            notifications: VecDeque::new(),
            replication_stream_active: false,
            replication_mode_enabled: false,
            last_replication_wal_end: None,
            io_desynced: false,
            pending_statement_closes: Vec::new(),
            draining_statement_closes: false,
        }
    }

    #[test]
    fn parse_copy_text_row_splits_tabs() {
        let row = parse_copy_text_row(b"a\tb\tc").unwrap();
        assert_eq!(row, vec!["a", "b", "c"]);
    }

    #[test]
    fn parse_copy_text_row_trims_cr() {
        let row = parse_copy_text_row(b"a\tb\r").unwrap();
        assert_eq!(row, vec!["a", "b"]);
    }

    #[test]
    fn parse_copy_text_row_unescapes_copy_text_values() {
        let row = parse_copy_text_row(b"a\\tb\tline\\nnext\tc\\\\d").unwrap();
        assert_eq!(row, vec!["a\tb", "line\nnext", "c\\d"]);
    }

    #[test]
    fn parse_copy_text_row_decodes_hex_and_octal_escapes() {
        // `\x41` (hex) and `\101` (octal) both encode `A`; `\x` with no hex
        // digit after it decodes to a plain `x`.
        let row = parse_copy_text_row(b"\\x41\\101\t\\xg").unwrap();
        assert_eq!(row, vec!["AA", "xg"]);
    }

    #[test]
    fn parse_copy_text_row_maps_copy_null_marker_to_empty_string() {
        let row = parse_copy_text_row(b"a\t\\N\tb").unwrap();
        assert_eq!(row, vec!["a", "", "b"]);
    }

    #[test]
    fn parse_copy_text_row_rejects_invalid_utf8() {
        let err = parse_copy_text_row(&[0xff]).expect_err("invalid UTF-8 must fail");
        assert!(
            err.to_string()
                .contains("COPY text field is not valid UTF-8")
        );
    }

    #[test]
    fn copy_table_quoting_preserves_schema_qualification() {
        assert_eq!(
            quote_copy_table_ref("tenant_a.users").unwrap(),
            "\"tenant_a\".\"users\""
        );
    }

    #[test]
    fn copy_identifier_quoting_rejects_nul_bytes() {
        assert!(quote_copy_table_ref("tenant\0.users").is_err());
        assert!(quote_copy_column_ident("name\0").is_err());
    }

    #[test]
    fn copy_export_rejects_parameterized_ast_before_streaming() {
        let cmd = Qail::export("users").filter("active", Operator::Eq, true);
        let err = encode_copy_export_sql(&cmd).expect_err("bind params cannot be ignored");

        assert!(matches!(err, PgError::Encode(msg) if msg.contains("parameterized export")));
    }

    // Runs under the tokio test runtime because `test_conn` creates a tokio
    // Unix stream pair.
    #[cfg(unix)]
    #[tokio::test]
    async fn copy_return_with_desync_marks_protocol_error() {
        let mut conn = test_conn();

        let err = return_with_desync::<()>(
            &mut conn,
            PgError::Protocol("copy protocol ordering broke".to_string()),
        )
        .expect_err("protocol error must be returned");

        assert!(err.to_string().contains("copy protocol ordering broke"));
        assert!(conn.is_io_desynced());
    }

    #[test]
    fn drain_copy_text_rows_handles_chunk_boundaries() {
        let mut pending = Vec::new();
        let mut rows: Vec<Vec<String>> = Vec::new();

        // First chunk ends mid-row: one full row is emitted, `c` is held back.
        drain_copy_text_rows(&mut pending, b"a\tb\nc", &mut |row: Vec<String>| {
            rows.push(row);
            Ok(())
        })
        .unwrap();
        assert_eq!(rows, vec![vec!["a".to_string(), "b".to_string()]]);
        assert_eq!(pending, b"c");

        // Second chunk completes the held-back row.
        drain_copy_text_rows(&mut pending, b"\td\n", &mut |row: Vec<String>| {
            rows.push(row);
            Ok(())
        })
        .unwrap();
        assert_eq!(
            rows,
            vec![
                vec!["a".to_string(), "b".to_string()],
                vec!["c".to_string(), "d".to_string()]
            ]
        );
        assert!(pending.is_empty());
    }

    #[test]
    fn flush_pending_copy_text_row_emits_final_partial_line() {
        let mut pending = b"x\ty".to_vec();
        let mut rows = Vec::new();
        let mut on_row = |row: Vec<String>| -> PgResult<()> {
            rows.push(row);
            Ok(())
        };

        flush_pending_copy_text_row(&mut pending, &mut on_row).unwrap();
        assert_eq!(rows, vec![vec!["x".to_string(), "y".to_string()]]);
        assert!(pending.is_empty());
    }

    #[test]
    fn callback_error_bubbles_from_row_drainer() {
        let mut pending = Vec::new();
        let mut on_row =
            |_row: Vec<String>| -> PgResult<()> { Err(PgError::Query("fail".to_string())) };

        let err = drain_copy_text_rows(&mut pending, b"a\tb\n", &mut on_row).unwrap_err();
        assert!(matches!(err, PgError::Query(msg) if msg == "fail"));
    }
}