1use super::{PgConnection, PgError, PgResult, parse_affected_rows};
5use crate::protocol::{AstEncoder, BackendMessage, PgEncoder};
6use bytes::BytesMut;
7use qail_core::ast::{Action, Qail};
8use tokio::io::AsyncWriteExt;
9
10fn quote_ident(ident: &str) -> String {
13 format!("\"{}\"", ident.replace('\0', "").replace('"', "\"\""))
14}
15
16impl PgConnection {
17 pub(crate) async fn copy_in_fast(
21 &mut self,
22 table: &str,
23 columns: &[String],
24 rows: &[Vec<qail_core::ast::Value>],
25 ) -> PgResult<u64> {
26 use crate::protocol::encode_copy_batch;
27
28 let cols: Vec<String> = columns.iter().map(|c| quote_ident(c)).collect();
29 let sql = format!(
30 "COPY {} ({}) FROM STDIN",
31 quote_ident(table),
32 cols.join(", ")
33 );
34
35 let bytes = PgEncoder::encode_query_string(&sql);
37 self.stream.write_all(&bytes).await?;
38
39 loop {
41 let msg = self.recv().await?;
42 match msg {
43 BackendMessage::CopyInResponse { .. } => break,
44 BackendMessage::ErrorResponse(err) => {
45 return Err(PgError::QueryServer(err.into()));
46 }
47 _ => {}
48 }
49 }
50
51 let batch_data = encode_copy_batch(rows);
53
54 self.send_copy_data(&batch_data).await?;
56
57 self.send_copy_done().await?;
59
60 let mut affected = 0u64;
62 loop {
63 let msg = self.recv().await?;
64 match msg {
65 BackendMessage::CommandComplete(tag) => {
66 affected = parse_affected_rows(&tag);
67 }
68 BackendMessage::ReadyForQuery(_) => {
69 return Ok(affected);
70 }
71 BackendMessage::ErrorResponse(err) => {
72 return Err(PgError::QueryServer(err.into()));
73 }
74 _ => {}
75 }
76 }
77 }
78
79 pub async fn copy_in_raw(
86 &mut self,
87 table: &str,
88 columns: &[String],
89 data: &[u8],
90 ) -> PgResult<u64> {
91 let cols: Vec<String> = columns.iter().map(|c| quote_ident(c)).collect();
92 let sql = format!(
93 "COPY {} ({}) FROM STDIN",
94 quote_ident(table),
95 cols.join(", ")
96 );
97
98 let bytes = PgEncoder::encode_query_string(&sql);
100 self.stream.write_all(&bytes).await?;
101
102 loop {
104 let msg = self.recv().await?;
105 match msg {
106 BackendMessage::CopyInResponse { .. } => break,
107 BackendMessage::ErrorResponse(err) => {
108 return Err(PgError::QueryServer(err.into()));
109 }
110 _ => {}
111 }
112 }
113
114 self.send_copy_data(data).await?;
116
117 self.send_copy_done().await?;
119
120 let mut affected = 0u64;
122 loop {
123 let msg = self.recv().await?;
124 match msg {
125 BackendMessage::CommandComplete(tag) => {
126 affected = parse_affected_rows(&tag);
127 }
128 BackendMessage::ReadyForQuery(_) => {
129 return Ok(affected);
130 }
131 BackendMessage::ErrorResponse(err) => {
132 return Err(PgError::QueryServer(err.into()));
133 }
134 _ => {}
135 }
136 }
137 }
138
139 async fn send_copy_data(&mut self, data: &[u8]) -> PgResult<()> {
141 let len = (data.len() + 4) as i32;
143 let mut buf = BytesMut::with_capacity(1 + 4 + data.len());
144 buf.extend_from_slice(b"d");
145 buf.extend_from_slice(&len.to_be_bytes());
146 buf.extend_from_slice(data);
147 self.stream.write_all(&buf).await?;
148 Ok(())
149 }
150
151 async fn send_copy_done(&mut self) -> PgResult<()> {
152 self.stream.write_all(&[b'c', 0, 0, 0, 4]).await?;
154 Ok(())
155 }
156
157 pub async fn copy_export(&mut self, cmd: &Qail) -> PgResult<Vec<Vec<String>>> {
167 if cmd.action != Action::Export {
168 return Err(PgError::Query(
169 "copy_export requires Qail::Export action".to_string(),
170 ));
171 }
172
173 let (sql, _params) =
175 AstEncoder::encode_cmd_sql(cmd).map_err(|e| PgError::Encode(e.to_string()))?;
176
177 let bytes = PgEncoder::encode_query_string(&sql);
179 self.stream.write_all(&bytes).await?;
180
181 loop {
183 let msg = self.recv().await?;
184 match msg {
185 BackendMessage::CopyOutResponse { .. } => break,
186 BackendMessage::ErrorResponse(err) => {
187 return Err(PgError::QueryServer(err.into()));
188 }
189 _ => {}
190 }
191 }
192
193 let mut rows = Vec::new();
195 loop {
196 let msg = self.recv().await?;
197 match msg {
198 BackendMessage::CopyData(data) => {
199 let line = String::from_utf8_lossy(&data);
200 let line = line.trim_end_matches('\n');
201 let cols: Vec<String> = line.split('\t').map(|s| s.to_string()).collect();
202 rows.push(cols);
203 }
204 BackendMessage::CopyDone => {}
205 BackendMessage::CommandComplete(_) => {}
206 BackendMessage::ReadyForQuery(_) => {
207 return Ok(rows);
208 }
209 BackendMessage::ErrorResponse(err) => {
210 return Err(PgError::QueryServer(err.into()));
211 }
212 _ => {}
213 }
214 }
215 }
216
217 pub(crate) async fn copy_out_raw(&mut self, sql: &str) -> PgResult<Vec<u8>> {
225 let bytes = PgEncoder::encode_query_string(sql);
227 self.stream.write_all(&bytes).await?;
228
229 loop {
231 let msg = self.recv().await?;
232 match msg {
233 BackendMessage::CopyOutResponse { .. } => break,
234 BackendMessage::ErrorResponse(err) => {
235 return Err(PgError::QueryServer(err.into()));
236 }
237 _ => {}
238 }
239 }
240
241 let mut data = Vec::new();
243 loop {
244 let msg = self.recv().await?;
245 match msg {
246 BackendMessage::CopyData(chunk) => {
247 data.extend_from_slice(&chunk);
248 }
249 BackendMessage::CopyDone => {}
250 BackendMessage::CommandComplete(_) => {}
251 BackendMessage::ReadyForQuery(_) => {
252 return Ok(data);
253 }
254 BackendMessage::ErrorResponse(err) => {
255 return Err(PgError::QueryServer(err.into()));
256 }
257 _ => {}
258 }
259 }
260 }
261}