Skip to main content

qail_pg/driver/
copy.rs

1//! COPY protocol methods for PostgreSQL bulk operations.
2//!
3
4use super::{
5    PgConnection, PgError, PgResult, is_ignorable_session_message, parse_affected_rows,
6    unexpected_backend_message,
7};
8use crate::protocol::{AstEncoder, BackendMessage, PgEncoder};
9use bytes::BytesMut;
10use qail_core::ast::{Action, Qail};
11use std::future::Future;
12
13/// Quote a single SQL identifier atom for COPY statements.
14pub(crate) fn quote_copy_column_ident(ident: &str) -> PgResult<String> {
15    if ident.is_empty() {
16        return Err(PgError::Query(
17            "COPY column identifier is empty".to_string(),
18        ));
19    }
20    if ident.contains('\0') {
21        return Err(PgError::Query(
22            "COPY column identifier contains NUL byte".to_string(),
23        ));
24    }
25    Ok(format!("\"{}\"", ident.replace('"', "\"\"")))
26}
27
28/// Quote a COPY table reference, preserving schema-qualified names.
29pub(crate) fn quote_copy_table_ref(table: &str) -> PgResult<String> {
30    if table.is_empty() {
31        return Err(PgError::Query("COPY table identifier is empty".to_string()));
32    }
33    if table.contains('\0') {
34        return Err(PgError::Query(
35            "COPY table identifier contains NUL byte".to_string(),
36        ));
37    }
38
39    table
40        .split('.')
41        .map(|part| {
42            let part = part.trim();
43            if part.is_empty() {
44                return Err(PgError::Query(
45                    "COPY table identifier contains an empty path segment".to_string(),
46                ));
47            }
48            quote_copy_column_ident(part)
49        })
50        .collect::<PgResult<Vec<_>>>()
51        .map(|parts| parts.join("."))
52}
53
54fn parse_copy_text_row(line: &[u8]) -> PgResult<Vec<String>> {
55    let line = if line.ends_with(b"\r") {
56        &line[..line.len().saturating_sub(1)]
57    } else {
58        line
59    };
60
61    let mut fields = Vec::new();
62    let mut start = 0;
63    for (idx, byte) in line.iter().enumerate() {
64        if *byte == b'\t' {
65            fields.push(decode_copy_text_field(&line[start..idx])?);
66            start = idx + 1;
67        }
68    }
69    fields.push(decode_copy_text_field(&line[start..])?);
70    Ok(fields)
71}
72
73fn decode_copy_text_field(field: &[u8]) -> PgResult<String> {
74    if field == b"\\N" {
75        return Ok(String::new());
76    }
77
78    let mut out = Vec::with_capacity(field.len());
79    let mut idx = 0;
80    while idx < field.len() {
81        if field[idx] != b'\\' {
82            out.push(field[idx]);
83            idx += 1;
84            continue;
85        }
86
87        let Some(&escaped) = field.get(idx + 1) else {
88            out.push(b'\\');
89            break;
90        };
91
92        match escaped {
93            b'b' => {
94                out.push(0x08);
95                idx += 2;
96            }
97            b'f' => {
98                out.push(0x0c);
99                idx += 2;
100            }
101            b'n' => {
102                out.push(b'\n');
103                idx += 2;
104            }
105            b'r' => {
106                out.push(b'\r');
107                idx += 2;
108            }
109            b't' => {
110                out.push(b'\t');
111                idx += 2;
112            }
113            b'v' => {
114                out.push(0x0b);
115                idx += 2;
116            }
117            b'\\' => {
118                out.push(b'\\');
119                idx += 2;
120            }
121            b'0'..=b'7' => {
122                let mut value = 0u16;
123                let mut next = idx + 1;
124                for _ in 0..3 {
125                    let Some(&digit) = field.get(next) else {
126                        break;
127                    };
128                    if !(b'0'..=b'7').contains(&digit) {
129                        break;
130                    }
131                    value = (value * 8) + u16::from(digit - b'0');
132                    next += 1;
133                }
134                out.push(value as u8);
135                idx = next;
136            }
137            b'x' => {
138                let mut value = 0u8;
139                let mut next = idx + 2;
140                let mut digits = 0;
141                while digits < 2 {
142                    let Some(&digit) = field.get(next) else {
143                        break;
144                    };
145                    let Some(nibble) = hex_nibble(digit) else {
146                        break;
147                    };
148                    value = (value << 4) | nibble;
149                    next += 1;
150                    digits += 1;
151                }
152                if digits == 0 {
153                    out.push(b'x');
154                    idx += 2;
155                } else {
156                    out.push(value);
157                    idx = next;
158                }
159            }
160            other => {
161                out.push(other);
162                idx += 2;
163            }
164        }
165    }
166
167    String::from_utf8(out)
168        .map_err(|e| PgError::Protocol(format!("COPY text field is not valid UTF-8: {}", e)))
169}
170
/// Convert an ASCII hex digit (`0-9`, `a-f`, `A-F`) to its 4-bit value.
///
/// Returns `None` for any other byte.
fn hex_nibble(byte: u8) -> Option<u8> {
    // `to_digit(16)` accepts exactly the ASCII hex digits; the result is < 16.
    char::from(byte).to_digit(16).map(|digit| digit as u8)
}
179
180#[inline]
181fn return_with_desync<T>(conn: &mut PgConnection, err: PgError) -> PgResult<T> {
182    if matches!(
183        err,
184        PgError::Protocol(_) | PgError::Connection(_) | PgError::Timeout(_)
185    ) {
186        conn.mark_io_desynced();
187    }
188    Err(err)
189}
190
191fn encode_copy_export_sql(cmd: &Qail) -> PgResult<String> {
192    if cmd.action != Action::Export {
193        return Err(PgError::Query(
194            "copy_export requires Qail::Export action".to_string(),
195        ));
196    }
197
198    let (sql, params) =
199        AstEncoder::encode_cmd_sql(cmd).map_err(|e| PgError::Encode(e.to_string()))?;
200    if !params.is_empty() {
201        return Err(PgError::Encode(format!(
202            "copy_export cannot encode parameterized export with {} bind parameter(s); use an unfiltered export, a prefiltered database view, or a raw COPY statement with trusted SQL",
203            params.len()
204        )));
205    }
206
207    Ok(sql)
208}
209
210fn drain_copy_text_rows<F>(pending: &mut Vec<u8>, chunk: &[u8], on_row: &mut F) -> PgResult<()>
211where
212    F: FnMut(Vec<String>) -> PgResult<()>,
213{
214    pending.extend_from_slice(chunk);
215    while let Some(pos) = pending.iter().position(|&b| b == b'\n') {
216        let line = pending[..pos].to_vec();
217        pending.drain(..=pos);
218        let row = parse_copy_text_row(&line)?;
219        on_row(row)?;
220    }
221    Ok(())
222}
223
224fn flush_pending_copy_text_row<F>(pending: &mut Vec<u8>, on_row: &mut F) -> PgResult<()>
225where
226    F: FnMut(Vec<String>) -> PgResult<()>,
227{
228    if pending.is_empty() {
229        return Ok(());
230    }
231    let line = std::mem::take(pending);
232    let row = parse_copy_text_row(&line)?;
233    on_row(row)
234}
235
236impl PgConnection {
237    /// **Fast** bulk insert using COPY protocol with zero-allocation encoding.
238    /// Encodes all rows into a single buffer and writes with one syscall.
239    /// ~2x faster than `copy_in_internal` due to batched I/O.
240    pub(crate) async fn copy_in_fast(
241        &mut self,
242        table: &str,
243        columns: &[String],
244        rows: &[Vec<qail_core::ast::Value>],
245    ) -> PgResult<u64> {
246        use crate::protocol::try_encode_copy_batch;
247
248        let cols: Vec<String> = columns
249            .iter()
250            .map(|c| quote_copy_column_ident(c))
251            .collect::<PgResult<_>>()?;
252        let sql = format!(
253            "COPY {} ({}) FROM STDIN",
254            quote_copy_table_ref(table)?,
255            cols.join(", ")
256        );
257
258        // Encode before opening COPY mode so invalid AST data cannot leave the
259        // connection waiting for CopyFail/CopyDone cleanup.
260        let batch_data = try_encode_copy_batch(rows)?;
261
262        // Send COPY command
263        let bytes = PgEncoder::try_encode_query_string(&sql)?;
264        self.write_all_with_timeout(&bytes, "stream write").await?;
265
266        // Wait for CopyInResponse
267        let mut startup_error: Option<PgError> = None;
268        loop {
269            let msg = self.recv().await?;
270            match msg {
271                BackendMessage::CopyInResponse { .. } => {
272                    if let Some(err) = startup_error {
273                        return return_with_desync(self, err);
274                    }
275                    break;
276                }
277                BackendMessage::ReadyForQuery(_) => {
278                    return return_with_desync(
279                        self,
280                        startup_error.unwrap_or_else(|| {
281                            PgError::Protocol(
282                                "COPY IN failed before CopyInResponse (unexpected ReadyForQuery)"
283                                    .to_string(),
284                            )
285                        }),
286                    );
287                }
288                BackendMessage::ErrorResponse(err) => {
289                    if startup_error.is_none() {
290                        startup_error = Some(PgError::QueryServer(err.into()));
291                    }
292                }
293                msg if is_ignorable_session_message(&msg) => {}
294                other => {
295                    return return_with_desync(
296                        self,
297                        unexpected_backend_message("copy-in startup", &other),
298                    );
299                }
300            }
301        }
302
303        // Single write for entire batch!
304        self.send_copy_data(&batch_data).await?;
305
306        // Send CopyDone
307        self.send_copy_done().await?;
308
309        // Wait for CommandComplete
310        let mut affected = 0u64;
311        let mut final_error: Option<PgError> = None;
312        let mut saw_command_complete = false;
313        loop {
314            let msg = self.recv().await?;
315            match msg {
316                BackendMessage::CommandComplete(tag) => {
317                    if saw_command_complete {
318                        return return_with_desync(
319                            self,
320                            PgError::Protocol(
321                                "COPY IN received duplicate CommandComplete".to_string(),
322                            ),
323                        );
324                    }
325                    saw_command_complete = true;
326                    if final_error.is_none() {
327                        match parse_affected_rows(&tag) {
328                            Ok(parsed) => affected = parsed,
329                            Err(err) => return return_with_desync(self, err),
330                        }
331                    }
332                }
333                BackendMessage::ReadyForQuery(_) => {
334                    if let Some(err) = final_error {
335                        return Err(err);
336                    }
337                    if !saw_command_complete {
338                        return return_with_desync(
339                            self,
340                            PgError::Protocol(
341                                "COPY IN completion missing CommandComplete before ReadyForQuery"
342                                    .to_string(),
343                            ),
344                        );
345                    }
346                    return Ok(affected);
347                }
348                BackendMessage::ErrorResponse(err) => {
349                    if final_error.is_none() {
350                        final_error = Some(PgError::QueryServer(err.into()));
351                    }
352                }
353                msg if is_ignorable_session_message(&msg) => {}
354                other => {
355                    return return_with_desync(
356                        self,
357                        unexpected_backend_message("copy-in completion", &other),
358                    );
359                }
360            }
361        }
362    }
363
364    /// **Fastest** bulk insert using COPY protocol with pre-encoded data.
365    /// Accepts raw COPY text format bytes, no encoding needed.
366    /// Use when caller has already encoded rows to COPY format.
367    /// # Format
368    /// Data should be tab-separated rows with newlines:
369    /// `1\thello\t3.14\n2\tworld\t2.71\n`
370    pub async fn copy_in_raw(
371        &mut self,
372        table: &str,
373        columns: &[String],
374        data: &[u8],
375    ) -> PgResult<u64> {
376        let cols: Vec<String> = columns
377            .iter()
378            .map(|c| quote_copy_column_ident(c))
379            .collect::<PgResult<_>>()?;
380        let sql = format!(
381            "COPY {} ({}) FROM STDIN",
382            quote_copy_table_ref(table)?,
383            cols.join(", ")
384        );
385
386        // Send COPY command
387        let bytes = PgEncoder::try_encode_query_string(&sql)?;
388        self.write_all_with_timeout(&bytes, "stream write").await?;
389
390        // Wait for CopyInResponse
391        let mut startup_error: Option<PgError> = None;
392        loop {
393            let msg = self.recv().await?;
394            match msg {
395                BackendMessage::CopyInResponse { .. } => {
396                    if let Some(err) = startup_error {
397                        return return_with_desync(self, err);
398                    }
399                    break;
400                }
401                BackendMessage::ReadyForQuery(_) => {
402                    return return_with_desync(
403                        self,
404                        startup_error.unwrap_or_else(|| {
405                            PgError::Protocol(
406                                "COPY IN failed before CopyInResponse (unexpected ReadyForQuery)"
407                                    .to_string(),
408                            )
409                        }),
410                    );
411                }
412                BackendMessage::ErrorResponse(err) => {
413                    if startup_error.is_none() {
414                        startup_error = Some(PgError::QueryServer(err.into()));
415                    }
416                }
417                msg if is_ignorable_session_message(&msg) => {}
418                other => {
419                    return return_with_desync(
420                        self,
421                        unexpected_backend_message("copy-in raw startup", &other),
422                    );
423                }
424            }
425        }
426
427        // Single write - data is already encoded!
428        self.send_copy_data(data).await?;
429
430        // Send CopyDone
431        self.send_copy_done().await?;
432
433        // Wait for CommandComplete
434        let mut affected = 0u64;
435        let mut final_error: Option<PgError> = None;
436        let mut saw_command_complete = false;
437        loop {
438            let msg = self.recv().await?;
439            match msg {
440                BackendMessage::CommandComplete(tag) => {
441                    if saw_command_complete {
442                        return return_with_desync(
443                            self,
444                            PgError::Protocol(
445                                "COPY IN raw received duplicate CommandComplete".to_string(),
446                            ),
447                        );
448                    }
449                    saw_command_complete = true;
450                    if final_error.is_none() {
451                        match parse_affected_rows(&tag) {
452                            Ok(parsed) => affected = parsed,
453                            Err(err) => return return_with_desync(self, err),
454                        }
455                    }
456                }
457                BackendMessage::ReadyForQuery(_) => {
458                    if let Some(err) = final_error {
459                        return Err(err);
460                    }
461                    if !saw_command_complete {
462                        return return_with_desync(
463                            self,
464                            PgError::Protocol(
465                                "COPY IN raw completion missing CommandComplete before ReadyForQuery"
466                                    .to_string(),
467                            ),
468                        );
469                    }
470                    return Ok(affected);
471                }
472                BackendMessage::ErrorResponse(err) => {
473                    if final_error.is_none() {
474                        final_error = Some(PgError::QueryServer(err.into()));
475                    }
476                }
477                msg if is_ignorable_session_message(&msg) => {}
478                other => {
479                    return return_with_desync(
480                        self,
481                        unexpected_backend_message("copy-in raw completion", &other),
482                    );
483                }
484            }
485        }
486    }
487
488    /// Send CopyData message (raw bytes).
489    pub(crate) async fn send_copy_data(&mut self, data: &[u8]) -> PgResult<()> {
490        let total_len = data
491            .len()
492            .checked_add(4)
493            .ok_or_else(|| PgError::Protocol("CopyData frame length overflow".to_string()))?;
494        let len = i32::try_from(total_len)
495            .map_err(|_| PgError::Protocol("CopyData frame exceeds i32::MAX".to_string()))?;
496
497        // CopyData: 'd' + length + data
498        let mut buf = BytesMut::with_capacity(1 + 4 + data.len());
499        buf.extend_from_slice(b"d");
500        buf.extend_from_slice(&len.to_be_bytes());
501        buf.extend_from_slice(data);
502        self.write_all_with_timeout(&buf, "stream write").await?;
503        Ok(())
504    }
505
506    async fn send_copy_done(&mut self) -> PgResult<()> {
507        // CopyDone: 'c' + length (4)
508        self.write_all_with_timeout(&[b'c', 0, 0, 0, 4], "stream write")
509            .await?;
510        Ok(())
511    }
512
513    async fn start_copy_out(&mut self, sql: &str, context: &str) -> PgResult<()> {
514        let bytes = PgEncoder::try_encode_query_string(sql)?;
515        self.write_all_with_timeout(&bytes, "stream write").await?;
516
517        let mut startup_error: Option<PgError> = None;
518        loop {
519            let msg = self.recv().await?;
520            match msg {
521                BackendMessage::CopyOutResponse { .. } => {
522                    if let Some(err) = startup_error {
523                        return return_with_desync(self, err);
524                    }
525                    return Ok(());
526                }
527                BackendMessage::ReadyForQuery(_) => {
528                    return return_with_desync(
529                        self,
530                        startup_error.unwrap_or_else(|| {
531                            PgError::Protocol(format!(
532                                "{} failed before CopyOutResponse (unexpected ReadyForQuery)",
533                                context
534                            ))
535                        }),
536                    );
537                }
538                BackendMessage::ErrorResponse(err) => {
539                    if startup_error.is_none() {
540                        startup_error = Some(PgError::QueryServer(err.into()));
541                    }
542                }
543                msg if is_ignorable_session_message(&msg) => {}
544                other => {
545                    return return_with_desync(self, unexpected_backend_message(context, &other));
546                }
547            }
548        }
549    }
550
    /// Consume the copy-out stream, forwarding each CopyData payload to
    /// `on_chunk`, until the server sends ReadyForQuery.
    ///
    /// `context` is used to label protocol errors.
    ///
    /// The stream is always read through to ReadyForQuery, even after a server
    /// error or a callback failure; once either has occurred, later chunks are
    /// no longer passed to `on_chunk`. At ReadyForQuery the first server error
    /// takes precedence over the first callback error.
    ///
    /// # Errors
    /// * `PgError::QueryServer` for the first ErrorResponse seen.
    /// * The first error returned by `on_chunk`.
    /// * `PgError::Protocol` (via `return_with_desync`) for CopyData after
    ///   CopyDone, duplicate CopyDone/CommandComplete, ReadyForQuery without
    ///   both CopyDone and CommandComplete, or any unexpected message.
    async fn stream_copy_out_chunks<F, Fut>(
        &mut self,
        context: &str,
        mut on_chunk: F,
    ) -> PgResult<()>
    where
        F: FnMut(Vec<u8>) -> Fut,
        Fut: Future<Output = PgResult<()>>,
    {
        // First server-side error, reported once ReadyForQuery arrives.
        let mut stream_error: Option<PgError> = None;
        // First callback failure, reported once ReadyForQuery arrives.
        let mut callback_error: Option<PgError> = None;
        let mut saw_copy_done = false;
        let mut saw_command_complete = false;

        loop {
            let msg = self.recv().await?;
            match msg {
                BackendMessage::CopyData(chunk) => {
                    if saw_copy_done {
                        return return_with_desync(
                            self,
                            PgError::Protocol(format!(
                                "{} received CopyData after CopyDone",
                                context
                            )),
                        );
                    }
                    // Stop invoking the callback after any error, but keep
                    // reading so the stream is consumed up to ReadyForQuery.
                    if stream_error.is_none()
                        && callback_error.is_none()
                        && let Err(e) = on_chunk(chunk).await
                    {
                        callback_error = Some(e);
                    }
                }
                BackendMessage::CopyDone => {
                    if saw_copy_done {
                        return return_with_desync(
                            self,
                            PgError::Protocol(format!("{} received duplicate CopyDone", context)),
                        );
                    }
                    saw_copy_done = true;
                }
                BackendMessage::CommandComplete(_) => {
                    if saw_command_complete {
                        return return_with_desync(
                            self,
                            PgError::Protocol(format!(
                                "{} received duplicate CommandComplete",
                                context
                            )),
                        );
                    }
                    saw_command_complete = true;
                }
                BackendMessage::ReadyForQuery(_) => {
                    // Errors recorded earlier are returned here without
                    // marking desync: ReadyForQuery has been received.
                    if let Some(err) = stream_error {
                        return Err(err);
                    }
                    if let Some(err) = callback_error {
                        return Err(err);
                    }
                    // A clean finish requires both CopyDone and CommandComplete.
                    if !saw_copy_done {
                        return return_with_desync(
                            self,
                            PgError::Protocol(format!(
                                "{} missing CopyDone before ReadyForQuery",
                                context
                            )),
                        );
                    }
                    if !saw_command_complete {
                        return return_with_desync(
                            self,
                            PgError::Protocol(format!(
                                "{} missing CommandComplete before ReadyForQuery",
                                context
                            )),
                        );
                    }
                    return Ok(());
                }
                BackendMessage::ErrorResponse(err) => {
                    // Keep only the first server error; later ones are dropped.
                    if stream_error.is_none() {
                        stream_error = Some(PgError::QueryServer(err.into()));
                    }
                }
                msg if is_ignorable_session_message(&msg) => {}
                other => {
                    return return_with_desync(self, unexpected_backend_message(context, &other));
                }
            }
        }
    }
645
646    /// Export data using COPY TO STDOUT (AST-native).
647    /// Takes a `Qail::Export` and returns rows as `Vec<Vec<String>>`.
648    /// # Example
649    /// ```ignore
650    /// let cmd = Qail::export("users")
651    ///     .columns(["id", "name"]);
652    /// let rows = conn.copy_export(&cmd).await?;
653    /// ```
654    pub async fn copy_export(&mut self, cmd: &Qail) -> PgResult<Vec<Vec<String>>> {
655        let mut rows = Vec::new();
656        self.copy_export_stream_rows(cmd, |row| {
657            rows.push(row);
658            Ok(())
659        })
660        .await?;
661        Ok(rows)
662    }
663
664    /// Stream COPY TO STDOUT chunks using an AST-native `Qail::Export` command.
665    ///
666    /// Chunks are forwarded as they arrive from PostgreSQL, so memory usage
667    /// stays bounded by network frame size and callback processing.
668    pub async fn copy_export_stream_raw<F, Fut>(&mut self, cmd: &Qail, on_chunk: F) -> PgResult<()>
669    where
670        F: FnMut(Vec<u8>) -> Fut,
671        Fut: Future<Output = PgResult<()>>,
672    {
673        let sql = encode_copy_export_sql(cmd)?;
674
675        self.copy_out_raw_stream(&sql, on_chunk).await
676    }
677
678    /// Stream COPY TO STDOUT rows using an AST-native `Qail::Export` command.
679    ///
680    /// Parses PostgreSQL COPY text lines into `Vec<String>` rows and invokes
681    /// `on_row` for each row without buffering the full result.
682    pub async fn copy_export_stream_rows<F>(&mut self, cmd: &Qail, mut on_row: F) -> PgResult<()>
683    where
684        F: FnMut(Vec<String>) -> PgResult<()>,
685    {
686        let mut pending = Vec::new();
687        self.copy_export_stream_raw(cmd, |chunk| {
688            let res = drain_copy_text_rows(&mut pending, &chunk, &mut on_row);
689            std::future::ready(res)
690        })
691        .await?;
692        flush_pending_copy_text_row(&mut pending, &mut on_row)
693    }
694
695    /// Export data using raw COPY TO STDOUT, returning raw bytes.
696    /// Format: tab-separated values, newline-terminated rows.
697    /// Suitable for direct re-import via copy_in_raw.
698    ///
699    /// # Safety
700    /// `pub(crate)` — not exposed externally because callers pass raw SQL.
701    /// External code should use `copy_export()` with the AST encoder instead.
702    pub(crate) async fn copy_out_raw(&mut self, sql: &str) -> PgResult<Vec<u8>> {
703        let mut data = Vec::new();
704        self.copy_out_raw_stream(sql, |chunk| {
705            data.extend_from_slice(&chunk);
706            std::future::ready(Ok(()))
707        })
708        .await?;
709        Ok(data)
710    }
711
712    /// Stream raw COPY TO STDOUT bytes with bounded memory usage.
713    ///
714    /// # Safety
715    /// `pub(crate)` — callers pass raw SQL.
716    pub(crate) async fn copy_out_raw_stream<F, Fut>(
717        &mut self,
718        sql: &str,
719        on_chunk: F,
720    ) -> PgResult<()>
721    where
722        F: FnMut(Vec<u8>) -> Fut,
723        Fut: Future<Output = PgResult<()>>,
724    {
725        self.start_copy_out(sql, "copy-out raw startup").await?;
726        self.stream_copy_out_chunks("copy-out raw stream", on_chunk)
727            .await
728    }
729}
730
#[cfg(test)]
mod tests {
    //! Unit tests for COPY identifier quoting, text-row parsing, export SQL
    //! encoding, and desync marking.

    use super::{
        drain_copy_text_rows, encode_copy_export_sql, flush_pending_copy_text_row,
        parse_copy_text_row, quote_copy_column_ident, quote_copy_table_ref,
    };
    // Only the unix-gated connection test needs these; gating the imports
    // avoids unused-import warnings on other targets.
    #[cfg(unix)]
    use super::return_with_desync;
    #[cfg(unix)]
    use crate::driver::PgConnection;
    use crate::driver::{PgError, PgResult};
    use qail_core::ast::{Operator, Qail};

    /// Build a `PgConnection` over one end of a Unix socket pair, enough to
    /// exercise state-only helpers without a server.
    #[cfg(unix)]
    fn test_conn() -> PgConnection {
        use crate::driver::connection::StatementCache;
        use crate::driver::stream::PgStream;
        use bytes::BytesMut;
        use std::collections::{HashMap, VecDeque};
        use std::num::NonZeroUsize;
        use tokio::net::UnixStream;

        let (unix_stream, _peer) = UnixStream::pair().expect("unix stream pair");
        PgConnection {
            stream: PgStream::Unix(unix_stream),
            buffer: BytesMut::with_capacity(1024),
            write_buf: BytesMut::with_capacity(1024),
            sql_buf: BytesMut::with_capacity(256),
            params_buf: Vec::new(),
            prepared_statements: HashMap::new(),
            stmt_cache: StatementCache::new(NonZeroUsize::new(2).expect("non-zero")),
            column_info_cache: HashMap::new(),
            process_id: 0,
            cancel_key_bytes: Vec::new(),
            requested_protocol_minor: PgConnection::default_protocol_minor(),
            negotiated_protocol_minor: PgConnection::default_protocol_minor(),
            notifications: VecDeque::new(),
            replication_stream_active: false,
            replication_mode_enabled: false,
            last_replication_wal_end: None,
            io_desynced: false,
            pending_statement_closes: Vec::new(),
            draining_statement_closes: false,
        }
    }

    #[test]
    fn parse_copy_text_row_splits_tabs() {
        let row = parse_copy_text_row(b"a\tb\tc").unwrap();
        assert_eq!(row, vec!["a", "b", "c"]);
    }

    #[test]
    fn parse_copy_text_row_trims_cr() {
        let row = parse_copy_text_row(b"a\tb\r").unwrap();
        assert_eq!(row, vec!["a", "b"]);
    }

    #[test]
    fn parse_copy_text_row_unescapes_copy_text_values() {
        let row = parse_copy_text_row(b"a\\tb\tline\\nnext\tc\\\\d").unwrap();
        assert_eq!(row, vec!["a\tb", "line\nnext", "c\\d"]);
    }

    #[test]
    fn parse_copy_text_row_decodes_octal_and_hex_escapes() {
        // `\101` (octal) and `\x41` (hex) are both 'A'; an unknown escape
        // yields the escaped byte itself.
        let row = parse_copy_text_row(b"\\101\t\\x41\t\\q").unwrap();
        assert_eq!(row, vec!["A", "A", "q"]);
    }

    #[test]
    fn parse_copy_text_row_maps_copy_null_marker_to_empty_string() {
        let row = parse_copy_text_row(b"a\t\\N\tb").unwrap();
        assert_eq!(row, vec!["a", "", "b"]);
    }

    #[test]
    fn parse_copy_text_row_rejects_invalid_utf8() {
        let err = parse_copy_text_row(&[0xff]).expect_err("invalid UTF-8 must fail");
        assert!(
            err.to_string()
                .contains("COPY text field is not valid UTF-8")
        );
    }

    #[test]
    fn copy_table_quoting_preserves_schema_qualification() {
        assert_eq!(
            quote_copy_table_ref("tenant_a.users").unwrap(),
            "\"tenant_a\".\"users\""
        );
    }

    #[test]
    fn copy_identifier_quoting_rejects_nul_bytes() {
        assert!(quote_copy_table_ref("tenant\0.users").is_err());
        assert!(quote_copy_column_ident("name\0").is_err());
    }

    #[test]
    fn copy_export_rejects_parameterized_ast_before_streaming() {
        let cmd = Qail::export("users").filter("active", Operator::Eq, true);
        let err = encode_copy_export_sql(&cmd).expect_err("bind params cannot be ignored");

        assert!(matches!(err, PgError::Encode(msg) if msg.contains("parameterized export")));
    }

    #[cfg(unix)]
    #[tokio::test]
    async fn copy_return_with_desync_marks_protocol_error() {
        let mut conn = test_conn();

        let err = return_with_desync::<()>(
            &mut conn,
            PgError::Protocol("copy protocol ordering broke".to_string()),
        )
        .expect_err("protocol error must be returned");

        assert!(err.to_string().contains("copy protocol ordering broke"));
        assert!(conn.is_io_desynced());
    }

    #[test]
    fn drain_copy_text_rows_handles_chunk_boundaries() {
        let mut pending = Vec::new();
        let mut rows: Vec<Vec<String>> = Vec::new();

        drain_copy_text_rows(&mut pending, b"a\tb\nc", &mut |row: Vec<String>| {
            rows.push(row);
            Ok(())
        })
        .unwrap();
        assert_eq!(rows, vec![vec!["a".to_string(), "b".to_string()]]);
        assert_eq!(pending, b"c");

        drain_copy_text_rows(&mut pending, b"\td\n", &mut |row: Vec<String>| {
            rows.push(row);
            Ok(())
        })
        .unwrap();
        assert_eq!(
            rows,
            vec![
                vec!["a".to_string(), "b".to_string()],
                vec!["c".to_string(), "d".to_string()]
            ]
        );
        assert!(pending.is_empty());
    }

    #[test]
    fn flush_pending_copy_text_row_emits_final_partial_line() {
        let mut pending = b"x\ty".to_vec();
        let mut rows = Vec::new();
        let mut on_row = |row: Vec<String>| -> PgResult<()> {
            rows.push(row);
            Ok(())
        };

        flush_pending_copy_text_row(&mut pending, &mut on_row).unwrap();
        assert_eq!(rows, vec![vec!["x".to_string(), "y".to_string()]]);
        assert!(pending.is_empty());
    }

    #[test]
    fn callback_error_bubbles_from_row_drainer() {
        let mut pending = Vec::new();
        let mut on_row =
            |_row: Vec<String>| -> PgResult<()> { Err(PgError::Query("fail".to_string())) };

        let err = drain_copy_text_rows(&mut pending, b"a\tb\n", &mut on_row).unwrap_err();
        assert!(matches!(err, PgError::Query(msg) if msg == "fail"));
    }
}
890
891        let err = drain_copy_text_rows(&mut pending, b"a\tb\n", &mut on_row).unwrap_err();
892        assert!(matches!(err, PgError::Query(msg) if msg == "fail"));
893    }
894}