1use super::{PgConnection, PgError, PgResult, parse_affected_rows};
5use crate::protocol::{AstEncoder, BackendMessage, PgEncoder};
6use bytes::BytesMut;
7use qail_core::ast::{Action, Qail};
8use tokio::io::AsyncWriteExt;
9
10impl PgConnection {
11 pub(crate) async fn copy_in_fast(
15 &mut self,
16 table: &str,
17 columns: &[String],
18 rows: &[Vec<qail_core::ast::Value>],
19 ) -> PgResult<u64> {
20 use crate::protocol::encode_copy_batch;
21
22 let cols = columns.join(", ");
23 let sql = format!("COPY {} ({}) FROM STDIN", table, cols);
24
25 let bytes = PgEncoder::encode_query_string(&sql);
27 self.stream.write_all(&bytes).await?;
28
29 loop {
31 let msg = self.recv().await?;
32 match msg {
33 BackendMessage::CopyInResponse { .. } => break,
34 BackendMessage::ErrorResponse(err) => {
35 return Err(PgError::Query(err.message));
36 }
37 _ => {}
38 }
39 }
40
41 let batch_data = encode_copy_batch(rows);
43
44 self.send_copy_data(&batch_data).await?;
46
47 self.send_copy_done().await?;
49
50 let mut affected = 0u64;
52 loop {
53 let msg = self.recv().await?;
54 match msg {
55 BackendMessage::CommandComplete(tag) => {
56 affected = parse_affected_rows(&tag);
57 }
58 BackendMessage::ReadyForQuery(_) => {
59 return Ok(affected);
60 }
61 BackendMessage::ErrorResponse(err) => {
62 return Err(PgError::Query(err.message));
63 }
64 _ => {}
65 }
66 }
67 }
68
69 pub async fn copy_in_raw(
76 &mut self,
77 table: &str,
78 columns: &[String],
79 data: &[u8],
80 ) -> PgResult<u64> {
81 let cols = columns.join(", ");
82 let sql = format!("COPY {} ({}) FROM STDIN", table, cols);
83
84 let bytes = PgEncoder::encode_query_string(&sql);
86 self.stream.write_all(&bytes).await?;
87
88 loop {
90 let msg = self.recv().await?;
91 match msg {
92 BackendMessage::CopyInResponse { .. } => break,
93 BackendMessage::ErrorResponse(err) => {
94 return Err(PgError::Query(err.message));
95 }
96 _ => {}
97 }
98 }
99
100 self.send_copy_data(data).await?;
102
103 self.send_copy_done().await?;
105
106 let mut affected = 0u64;
108 loop {
109 let msg = self.recv().await?;
110 match msg {
111 BackendMessage::CommandComplete(tag) => {
112 affected = parse_affected_rows(&tag);
113 }
114 BackendMessage::ReadyForQuery(_) => {
115 return Ok(affected);
116 }
117 BackendMessage::ErrorResponse(err) => {
118 return Err(PgError::Query(err.message));
119 }
120 _ => {}
121 }
122 }
123 }
124
125 async fn send_copy_data(&mut self, data: &[u8]) -> PgResult<()> {
127 let len = (data.len() + 4) as i32;
129 let mut buf = BytesMut::with_capacity(1 + 4 + data.len());
130 buf.extend_from_slice(b"d");
131 buf.extend_from_slice(&len.to_be_bytes());
132 buf.extend_from_slice(data);
133 self.stream.write_all(&buf).await?;
134 Ok(())
135 }
136
137 async fn send_copy_done(&mut self) -> PgResult<()> {
138 self.stream.write_all(&[b'c', 0, 0, 0, 4]).await?;
140 Ok(())
141 }
142
143 pub async fn copy_export(&mut self, cmd: &Qail) -> PgResult<Vec<Vec<String>>> {
153
154 if cmd.action != Action::Export {
155 return Err(PgError::Query(
156 "copy_export requires Qail::Export action".to_string(),
157 ));
158 }
159
160 let (sql, _params) = AstEncoder::encode_cmd_sql(cmd);
162
163 let bytes = PgEncoder::encode_query_string(&sql);
165 self.stream.write_all(&bytes).await?;
166
167 loop {
169 let msg = self.recv().await?;
170 match msg {
171 BackendMessage::CopyOutResponse { .. } => break,
172 BackendMessage::ErrorResponse(err) => {
173 return Err(PgError::Query(err.message));
174 }
175 _ => {}
176 }
177 }
178
179 let mut rows = Vec::new();
181 loop {
182 let msg = self.recv().await?;
183 match msg {
184 BackendMessage::CopyData(data) => {
185 let line = String::from_utf8_lossy(&data);
186 let line = line.trim_end_matches('\n');
187 let cols: Vec<String> = line.split('\t').map(|s| s.to_string()).collect();
188 rows.push(cols);
189 }
190 BackendMessage::CopyDone => {}
191 BackendMessage::CommandComplete(_) => {}
192 BackendMessage::ReadyForQuery(_) => {
193 return Ok(rows);
194 }
195 BackendMessage::ErrorResponse(err) => {
196 return Err(PgError::Query(err.message));
197 }
198 _ => {}
199 }
200 }
201 }
202
203 pub async fn copy_out_raw(&mut self, sql: &str) -> PgResult<Vec<u8>> {
207 let bytes = PgEncoder::encode_query_string(sql);
209 self.stream.write_all(&bytes).await?;
210
211 loop {
213 let msg = self.recv().await?;
214 match msg {
215 BackendMessage::CopyOutResponse { .. } => break,
216 BackendMessage::ErrorResponse(err) => {
217 return Err(PgError::Query(err.message));
218 }
219 _ => {}
220 }
221 }
222
223 let mut data = Vec::new();
225 loop {
226 let msg = self.recv().await?;
227 match msg {
228 BackendMessage::CopyData(chunk) => {
229 data.extend_from_slice(&chunk);
230 }
231 BackendMessage::CopyDone => {}
232 BackendMessage::CommandComplete(_) => {}
233 BackendMessage::ReadyForQuery(_) => {
234 return Ok(data);
235 }
236 BackendMessage::ErrorResponse(err) => {
237 return Err(PgError::Query(err.message));
238 }
239 _ => {}
240 }
241 }
242 }
243}