1use super::{PgConnection, PgError, PgResult, parse_affected_rows};
5use crate::protocol::{AstEncoder, BackendMessage, PgEncoder};
6use bytes::BytesMut;
7use qail_core::ast::{Action, Qail};
8use tokio::io::AsyncWriteExt;
9
/// Wraps `ident` in double quotes so it can be spliced into SQL as an identifier.
///
/// NUL characters are dropped and every embedded `"` is doubled, so the
/// result is always a single, well-terminated quoted identifier.
fn quote_ident(ident: &str) -> String {
    // +2 for the surrounding quotes; embedded quotes may still grow the string.
    let mut quoted = String::with_capacity(ident.len() + 2);
    quoted.push('"');
    for ch in ident.chars() {
        match ch {
            '\0' => {}
            '"' => quoted.push_str("\"\""),
            other => quoted.push(other),
        }
    }
    quoted.push('"');
    quoted
}
15
16impl PgConnection {
17 pub(crate) async fn copy_in_fast(
21 &mut self,
22 table: &str,
23 columns: &[String],
24 rows: &[Vec<qail_core::ast::Value>],
25 ) -> PgResult<u64> {
26 use crate::protocol::encode_copy_batch;
27
28 let cols: Vec<String> = columns.iter().map(|c| quote_ident(c)).collect();
29 let sql = format!("COPY {} ({}) FROM STDIN", quote_ident(table), cols.join(", "));
30
31 let bytes = PgEncoder::encode_query_string(&sql);
33 self.stream.write_all(&bytes).await?;
34
35 loop {
37 let msg = self.recv().await?;
38 match msg {
39 BackendMessage::CopyInResponse { .. } => break,
40 BackendMessage::ErrorResponse(err) => {
41 return Err(PgError::Query(err.message));
42 }
43 _ => {}
44 }
45 }
46
47 let batch_data = encode_copy_batch(rows);
49
50 self.send_copy_data(&batch_data).await?;
52
53 self.send_copy_done().await?;
55
56 let mut affected = 0u64;
58 loop {
59 let msg = self.recv().await?;
60 match msg {
61 BackendMessage::CommandComplete(tag) => {
62 affected = parse_affected_rows(&tag);
63 }
64 BackendMessage::ReadyForQuery(_) => {
65 return Ok(affected);
66 }
67 BackendMessage::ErrorResponse(err) => {
68 return Err(PgError::Query(err.message));
69 }
70 _ => {}
71 }
72 }
73 }
74
75 pub async fn copy_in_raw(
82 &mut self,
83 table: &str,
84 columns: &[String],
85 data: &[u8],
86 ) -> PgResult<u64> {
87 let cols: Vec<String> = columns.iter().map(|c| quote_ident(c)).collect();
88 let sql = format!("COPY {} ({}) FROM STDIN", quote_ident(table), cols.join(", "));
89
90 let bytes = PgEncoder::encode_query_string(&sql);
92 self.stream.write_all(&bytes).await?;
93
94 loop {
96 let msg = self.recv().await?;
97 match msg {
98 BackendMessage::CopyInResponse { .. } => break,
99 BackendMessage::ErrorResponse(err) => {
100 return Err(PgError::Query(err.message));
101 }
102 _ => {}
103 }
104 }
105
106 self.send_copy_data(data).await?;
108
109 self.send_copy_done().await?;
111
112 let mut affected = 0u64;
114 loop {
115 let msg = self.recv().await?;
116 match msg {
117 BackendMessage::CommandComplete(tag) => {
118 affected = parse_affected_rows(&tag);
119 }
120 BackendMessage::ReadyForQuery(_) => {
121 return Ok(affected);
122 }
123 BackendMessage::ErrorResponse(err) => {
124 return Err(PgError::Query(err.message));
125 }
126 _ => {}
127 }
128 }
129 }
130
131 async fn send_copy_data(&mut self, data: &[u8]) -> PgResult<()> {
133 let len = (data.len() + 4) as i32;
135 let mut buf = BytesMut::with_capacity(1 + 4 + data.len());
136 buf.extend_from_slice(b"d");
137 buf.extend_from_slice(&len.to_be_bytes());
138 buf.extend_from_slice(data);
139 self.stream.write_all(&buf).await?;
140 Ok(())
141 }
142
143 async fn send_copy_done(&mut self) -> PgResult<()> {
144 self.stream.write_all(&[b'c', 0, 0, 0, 4]).await?;
146 Ok(())
147 }
148
149 pub async fn copy_export(&mut self, cmd: &Qail) -> PgResult<Vec<Vec<String>>> {
159
160 if cmd.action != Action::Export {
161 return Err(PgError::Query(
162 "copy_export requires Qail::Export action".to_string(),
163 ));
164 }
165
166 let (sql, _params) = AstEncoder::encode_cmd_sql(cmd).map_err(|e| PgError::Encode(e.to_string()))?;
168
169 let bytes = PgEncoder::encode_query_string(&sql);
171 self.stream.write_all(&bytes).await?;
172
173 loop {
175 let msg = self.recv().await?;
176 match msg {
177 BackendMessage::CopyOutResponse { .. } => break,
178 BackendMessage::ErrorResponse(err) => {
179 return Err(PgError::Query(err.message));
180 }
181 _ => {}
182 }
183 }
184
185 let mut rows = Vec::new();
187 loop {
188 let msg = self.recv().await?;
189 match msg {
190 BackendMessage::CopyData(data) => {
191 let line = String::from_utf8_lossy(&data);
192 let line = line.trim_end_matches('\n');
193 let cols: Vec<String> = line.split('\t').map(|s| s.to_string()).collect();
194 rows.push(cols);
195 }
196 BackendMessage::CopyDone => {}
197 BackendMessage::CommandComplete(_) => {}
198 BackendMessage::ReadyForQuery(_) => {
199 return Ok(rows);
200 }
201 BackendMessage::ErrorResponse(err) => {
202 return Err(PgError::Query(err.message));
203 }
204 _ => {}
205 }
206 }
207 }
208
209 pub(crate) async fn copy_out_raw(&mut self, sql: &str) -> PgResult<Vec<u8>> {
217 let bytes = PgEncoder::encode_query_string(sql);
219 self.stream.write_all(&bytes).await?;
220
221 loop {
223 let msg = self.recv().await?;
224 match msg {
225 BackendMessage::CopyOutResponse { .. } => break,
226 BackendMessage::ErrorResponse(err) => {
227 return Err(PgError::Query(err.message));
228 }
229 _ => {}
230 }
231 }
232
233 let mut data = Vec::new();
235 loop {
236 let msg = self.recv().await?;
237 match msg {
238 BackendMessage::CopyData(chunk) => {
239 data.extend_from_slice(&chunk);
240 }
241 BackendMessage::CopyDone => {}
242 BackendMessage::CommandComplete(_) => {}
243 BackendMessage::ReadyForQuery(_) => {
244 return Ok(data);
245 }
246 BackendMessage::ErrorResponse(err) => {
247 return Err(PgError::Query(err.message));
248 }
249 _ => {}
250 }
251 }
252 }
253}