qail_pg/driver/
copy.rs

1//! COPY protocol methods for PostgreSQL bulk operations.
2//!
3
4use super::{PgConnection, PgError, PgResult, parse_affected_rows};
5use crate::protocol::{AstEncoder, BackendMessage, PgEncoder};
6use bytes::BytesMut;
7use qail_core::ast::{Action, Qail};
8use tokio::io::AsyncWriteExt;
9
10impl PgConnection {
11    /// **Fast** bulk insert using COPY protocol with zero-allocation encoding.
12    /// Encodes all rows into a single buffer and writes with one syscall.
13    /// ~2x faster than `copy_in_internal` due to batched I/O.
14    pub(crate) async fn copy_in_fast(
15        &mut self,
16        table: &str,
17        columns: &[String],
18        rows: &[Vec<qail_core::ast::Value>],
19    ) -> PgResult<u64> {
20        use crate::protocol::encode_copy_batch;
21
22        let cols = columns.join(", ");
23        let sql = format!("COPY {} ({}) FROM STDIN", table, cols);
24
25        // Send COPY command
26        let bytes = PgEncoder::encode_query_string(&sql);
27        self.stream.write_all(&bytes).await?;
28
29        // Wait for CopyInResponse
30        loop {
31            let msg = self.recv().await?;
32            match msg {
33                BackendMessage::CopyInResponse { .. } => break,
34                BackendMessage::ErrorResponse(err) => {
35                    return Err(PgError::Query(err.message));
36                }
37                _ => {}
38            }
39        }
40
41        // Encode ALL rows into a single buffer (zero-allocation per value)
42        let batch_data = encode_copy_batch(rows);
43
44        // Single write for entire batch!
45        self.send_copy_data(&batch_data).await?;
46
47        // Send CopyDone
48        self.send_copy_done().await?;
49
50        // Wait for CommandComplete
51        let mut affected = 0u64;
52        loop {
53            let msg = self.recv().await?;
54            match msg {
55                BackendMessage::CommandComplete(tag) => {
56                    affected = parse_affected_rows(&tag);
57                }
58                BackendMessage::ReadyForQuery(_) => {
59                    return Ok(affected);
60                }
61                BackendMessage::ErrorResponse(err) => {
62                    return Err(PgError::Query(err.message));
63                }
64                _ => {}
65            }
66        }
67    }
68
69    /// **Fastest** bulk insert using COPY protocol with pre-encoded data.
70    /// Accepts raw COPY text format bytes, no encoding needed.
71    /// Use when caller has already encoded rows to COPY format.
72    /// # Format
73    /// Data should be tab-separated rows with newlines:
74    /// `1\thello\t3.14\n2\tworld\t2.71\n`
75    pub async fn copy_in_raw(
76        &mut self,
77        table: &str,
78        columns: &[String],
79        data: &[u8],
80    ) -> PgResult<u64> {
81        let cols = columns.join(", ");
82        let sql = format!("COPY {} ({}) FROM STDIN", table, cols);
83
84        // Send COPY command
85        let bytes = PgEncoder::encode_query_string(&sql);
86        self.stream.write_all(&bytes).await?;
87
88        // Wait for CopyInResponse
89        loop {
90            let msg = self.recv().await?;
91            match msg {
92                BackendMessage::CopyInResponse { .. } => break,
93                BackendMessage::ErrorResponse(err) => {
94                    return Err(PgError::Query(err.message));
95                }
96                _ => {}
97            }
98        }
99
100        // Single write - data is already encoded!
101        self.send_copy_data(data).await?;
102
103        // Send CopyDone
104        self.send_copy_done().await?;
105
106        // Wait for CommandComplete
107        let mut affected = 0u64;
108        loop {
109            let msg = self.recv().await?;
110            match msg {
111                BackendMessage::CommandComplete(tag) => {
112                    affected = parse_affected_rows(&tag);
113                }
114                BackendMessage::ReadyForQuery(_) => {
115                    return Ok(affected);
116                }
117                BackendMessage::ErrorResponse(err) => {
118                    return Err(PgError::Query(err.message));
119                }
120                _ => {}
121            }
122        }
123    }
124
125    /// Send CopyData message (raw bytes).
126    async fn send_copy_data(&mut self, data: &[u8]) -> PgResult<()> {
127        // CopyData: 'd' + length + data
128        let len = (data.len() + 4) as i32;
129        let mut buf = BytesMut::with_capacity(1 + 4 + data.len());
130        buf.extend_from_slice(b"d");
131        buf.extend_from_slice(&len.to_be_bytes());
132        buf.extend_from_slice(data);
133        self.stream.write_all(&buf).await?;
134        Ok(())
135    }
136
137    async fn send_copy_done(&mut self) -> PgResult<()> {
138        // CopyDone: 'c' + length (4)
139        self.stream.write_all(&[b'c', 0, 0, 0, 4]).await?;
140        Ok(())
141    }
142
143    /// Export data using COPY TO STDOUT (AST-native).
144    /// Takes a Qail::Export and returns rows as Vec<Vec<String>>.
145    /// # Example
146    /// ```ignore
147    /// let cmd = Qail::export("users")
148    ///     .columns(["id", "name"])
149    ///     .filter("active", true);
150    /// let rows = conn.copy_export(&cmd).await?;
151    /// ```
152    pub async fn copy_export(&mut self, cmd: &Qail) -> PgResult<Vec<Vec<String>>> {
153
154        if cmd.action != Action::Export {
155            return Err(PgError::Query(
156                "copy_export requires Qail::Export action".to_string(),
157            ));
158        }
159
160        // Encode command to SQL using AST encoder
161        let (sql, _params) = AstEncoder::encode_cmd_sql(cmd);
162
163        // Send COPY command
164        let bytes = PgEncoder::encode_query_string(&sql);
165        self.stream.write_all(&bytes).await?;
166
167        // Wait for CopyOutResponse
168        loop {
169            let msg = self.recv().await?;
170            match msg {
171                BackendMessage::CopyOutResponse { .. } => break,
172                BackendMessage::ErrorResponse(err) => {
173                    return Err(PgError::Query(err.message));
174                }
175                _ => {}
176            }
177        }
178
179        // Receive CopyData messages until CopyDone
180        let mut rows = Vec::new();
181        loop {
182            let msg = self.recv().await?;
183            match msg {
184                BackendMessage::CopyData(data) => {
185                    let line = String::from_utf8_lossy(&data);
186                    let line = line.trim_end_matches('\n');
187                    let cols: Vec<String> = line.split('\t').map(|s| s.to_string()).collect();
188                    rows.push(cols);
189                }
190                BackendMessage::CopyDone => {}
191                BackendMessage::CommandComplete(_) => {}
192                BackendMessage::ReadyForQuery(_) => {
193                    return Ok(rows);
194                }
195                BackendMessage::ErrorResponse(err) => {
196                    return Err(PgError::Query(err.message));
197                }
198                _ => {}
199            }
200        }
201    }
202
203    /// Export data using raw COPY TO STDOUT, returning raw bytes.
204    /// Format: tab-separated values, newline-terminated rows.
205    /// Suitable for direct re-import via copy_in_raw.
206    pub async fn copy_out_raw(&mut self, sql: &str) -> PgResult<Vec<u8>> {
207        // Send COPY command
208        let bytes = PgEncoder::encode_query_string(sql);
209        self.stream.write_all(&bytes).await?;
210
211        // Wait for CopyOutResponse
212        loop {
213            let msg = self.recv().await?;
214            match msg {
215                BackendMessage::CopyOutResponse { .. } => break,
216                BackendMessage::ErrorResponse(err) => {
217                    return Err(PgError::Query(err.message));
218                }
219                _ => {}
220            }
221        }
222
223        // Receive CopyData messages until CopyDone
224        let mut data = Vec::new();
225        loop {
226            let msg = self.recv().await?;
227            match msg {
228                BackendMessage::CopyData(chunk) => {
229                    data.extend_from_slice(&chunk);
230                }
231                BackendMessage::CopyDone => {}
232                BackendMessage::CommandComplete(_) => {}
233                BackendMessage::ReadyForQuery(_) => {
234                    return Ok(data);
235                }
236                BackendMessage::ErrorResponse(err) => {
237                    return Err(PgError::Query(err.message));
238                }
239                _ => {}
240            }
241        }
242    }
243}