sql_splitter/parser/
postgres_copy.rs1use crate::schema::{ColumnId, ColumnType, TableSchema};
7use smallvec::SmallVec;
8
9use super::mysql_insert::{FkRef, PkValue};
11
/// Tuple of key values for a (possibly composite) primary or foreign key.
///
/// Backed by a `SmallVec` with inline capacity for two values, so one- and
/// two-column keys avoid a heap allocation.
pub type PkTuple = SmallVec<[PkValue; 2]>;
14
/// One data row parsed from the data section of a PostgreSQL `COPY` block.
#[derive(Debug, Clone)]
pub struct ParsedCopyRow {
    /// The row's bytes as read from the input (the `\n` line terminator is not included).
    pub raw: Vec<u8>,
    /// Primary-key values for the row; `None` when they cannot be determined
    /// (for example no schema was supplied, the table has no primary-key
    /// columns, or a primary-key value is NULL).
    pub pk: Option<PkTuple>,
    /// Values of each foreign key whose referenced table is resolved and whose
    /// values are all non-NULL, tagged with an `FkRef` naming this table and the
    /// foreign key's index in the schema.
    pub fk_values: Vec<(FkRef, PkTuple)>,
}
25
/// Parser for the text-format data section of a PostgreSQL `COPY ... FROM stdin` block.
pub struct CopyParser<'a> {
    /// Raw COPY data: newline-separated rows of tab-separated fields.
    data: &'a [u8],
    /// Schema used to locate primary/foreign-key columns; `None` disables key extraction.
    table_schema: Option<&'a TableSchema>,
    /// Schema column for each field position in a row (`None` for names the
    /// schema does not know). Empty until a column order is set or derived.
    column_order: Vec<Option<ColumnId>>,
}
33
34impl<'a> CopyParser<'a> {
35 pub fn new(data: &'a [u8]) -> Self {
37 Self {
38 data,
39 table_schema: None,
40 column_order: Vec::new(),
41 }
42 }
43
44 pub fn with_schema(mut self, schema: &'a TableSchema) -> Self {
46 self.table_schema = Some(schema);
47 self
48 }
49
50 pub fn with_column_order(mut self, columns: Vec<String>) -> Self {
52 if let Some(schema) = self.table_schema {
53 self.column_order = columns
54 .iter()
55 .map(|name| schema.get_column_id(name))
56 .collect();
57 }
58 self
59 }
60
61 pub fn parse_rows(&mut self) -> anyhow::Result<Vec<ParsedCopyRow>> {
63 if self.column_order.is_empty() {
65 if let Some(schema) = self.table_schema {
66 self.column_order = schema.columns.iter().map(|c| Some(c.ordinal)).collect();
67 }
68 }
69
70 let mut rows = Vec::new();
71 let mut pos = 0;
72
73 while pos < self.data.len() {
74 let line_end = self.data[pos..]
76 .iter()
77 .position(|&b| b == b'\n')
78 .map(|p| pos + p)
79 .unwrap_or(self.data.len());
80
81 let line = &self.data[pos..line_end];
82
83 if line == b"\\." || line.is_empty() {
85 pos = line_end + 1;
86 continue;
87 }
88
89 if let Some(row) = self.parse_row(line)? {
91 rows.push(row);
92 }
93
94 pos = line_end + 1;
95 }
96
97 Ok(rows)
98 }
99
100 fn parse_row(&self, line: &[u8]) -> anyhow::Result<Option<ParsedCopyRow>> {
102 let raw = line.to_vec();
103
104 let values: Vec<CopyValue> = self.split_and_parse_values(line);
106
107 let (pk, fk_values) = if let Some(schema) = self.table_schema {
109 self.extract_pk_fk(&values, schema)
110 } else {
111 (None, Vec::new())
112 };
113
114 Ok(Some(ParsedCopyRow { raw, pk, fk_values }))
115 }
116
117 fn split_and_parse_values(&self, line: &[u8]) -> Vec<CopyValue> {
119 let mut values = Vec::new();
120 let mut start = 0;
121
122 for (i, &b) in line.iter().enumerate() {
123 if b == b'\t' {
124 values.push(self.parse_copy_value(&line[start..i]));
125 start = i + 1;
126 }
127 }
128 if start <= line.len() {
130 values.push(self.parse_copy_value(&line[start..]));
131 }
132
133 values
134 }
135
136 fn parse_copy_value(&self, value: &[u8]) -> CopyValue {
138 if value == b"\\N" {
140 return CopyValue::Null;
141 }
142
143 let decoded = self.decode_copy_escapes(value);
145
146 if let Ok(s) = std::str::from_utf8(&decoded) {
148 if let Ok(n) = s.parse::<i64>() {
149 return CopyValue::Integer(n);
150 }
151 if let Ok(n) = s.parse::<i128>() {
152 return CopyValue::BigInteger(n);
153 }
154 }
155
156 CopyValue::Text(decoded)
157 }
158
159 pub fn decode_copy_escapes(&self, value: &[u8]) -> Vec<u8> {
161 let mut result = Vec::with_capacity(value.len());
162 let mut i = 0;
163
164 while i < value.len() {
165 if value[i] == b'\\' && i + 1 < value.len() {
166 let next = value[i + 1];
167 let decoded = match next {
168 b'n' => b'\n',
169 b'r' => b'\r',
170 b't' => b'\t',
171 b'\\' => b'\\',
172 b'N' => {
173 result.push(b'\\');
175 result.push(b'N');
176 i += 2;
177 continue;
178 }
179 _ => {
180 result.push(b'\\');
182 result.push(next);
183 i += 2;
184 continue;
185 }
186 };
187 result.push(decoded);
188 i += 2;
189 } else {
190 result.push(value[i]);
191 i += 1;
192 }
193 }
194
195 result
196 }
197
198 fn extract_pk_fk(
200 &self,
201 values: &[CopyValue],
202 schema: &TableSchema,
203 ) -> (Option<PkTuple>, Vec<(FkRef, PkTuple)>) {
204 let mut pk_values = PkTuple::new();
205 let mut fk_values = Vec::new();
206
207 for (idx, col_id_opt) in self.column_order.iter().enumerate() {
209 if let Some(col_id) = col_id_opt {
210 if schema.is_pk_column(*col_id) {
211 if let Some(value) = values.get(idx) {
212 let pk_val = self.value_to_pk(value, schema.column(*col_id));
213 pk_values.push(pk_val);
214 }
215 }
216 }
217 }
218
219 for (fk_idx, fk) in schema.foreign_keys.iter().enumerate() {
221 if fk.referenced_table_id.is_none() {
222 continue;
223 }
224
225 let mut fk_tuple = PkTuple::new();
226 let mut all_non_null = true;
227
228 for &col_id in &fk.columns {
229 if let Some(idx) = self.column_order.iter().position(|&c| c == Some(col_id)) {
230 if let Some(value) = values.get(idx) {
231 let pk_val = self.value_to_pk(value, schema.column(col_id));
232 if pk_val.is_null() {
233 all_non_null = false;
234 break;
235 }
236 fk_tuple.push(pk_val);
237 }
238 }
239 }
240
241 if all_non_null && !fk_tuple.is_empty() {
242 fk_values.push((
243 FkRef {
244 table_id: schema.id.0,
245 fk_index: fk_idx as u16,
246 },
247 fk_tuple,
248 ));
249 }
250 }
251
252 let pk = if pk_values.is_empty() || pk_values.iter().any(|v| v.is_null()) {
253 None
254 } else {
255 Some(pk_values)
256 };
257
258 (pk, fk_values)
259 }
260
261 fn value_to_pk(&self, value: &CopyValue, col: Option<&crate::schema::Column>) -> PkValue {
263 match value {
264 CopyValue::Null => PkValue::Null,
265 CopyValue::Integer(n) => PkValue::Int(*n),
266 CopyValue::BigInteger(n) => PkValue::BigInt(*n),
267 CopyValue::Text(bytes) => {
268 let s = String::from_utf8_lossy(bytes);
269
270 if let Some(col) = col {
272 match col.col_type {
273 ColumnType::Int => {
274 if let Ok(n) = s.parse::<i64>() {
275 return PkValue::Int(n);
276 }
277 }
278 ColumnType::BigInt => {
279 if let Ok(n) = s.parse::<i128>() {
280 return PkValue::BigInt(n);
281 }
282 }
283 _ => {}
284 }
285 }
286
287 PkValue::Text(s.into_owned().into_boxed_str())
288 }
289 }
290 }
291}
292
/// A single field from a COPY data row after NULL detection and escape decoding.
#[derive(Debug, Clone)]
enum CopyValue {
    /// The raw field was exactly the `\N` NULL marker.
    Null,
    /// Decoded field text that parses as an `i64`.
    Integer(i64),
    /// Decoded field text that parses as an `i128` but not as an `i64`.
    BigInteger(i128),
    /// Any other field, as escape-decoded bytes.
    Text(Vec<u8>),
}
301
/// Extracts the column names from a `COPY table (col, ...) FROM stdin;` header.
///
/// Double-quoted identifiers are honoured: their quotes are removed, `""` is
/// unescaped to `"`, and commas or parentheses inside them are part of the
/// name. A `(` inside a quoted table name does not start the column list.
/// Whitespace outside quotes is ignored; names are not case-folded.
///
/// Returns an empty vector when the header has no complete parenthesised
/// column list.
pub fn parse_copy_columns(header: &str) -> Vec<String> {
    // Find the opening parenthesis of the column list, skipping any '(' that
    // sits inside a quoted identifier (doubled quotes toggle twice, a no-op).
    let mut in_quotes = false;
    let mut open = None;
    for (idx, ch) in header.char_indices() {
        match ch {
            '"' => in_quotes = !in_quotes,
            '(' if !in_quotes => {
                open = Some(idx);
                break;
            }
            _ => {}
        }
    }
    let open = match open {
        Some(idx) => idx,
        None => return Vec::new(),
    };

    let mut columns = Vec::new();
    let mut current = String::new();
    let mut in_quotes = false;
    let mut chars = header[open + 1..].chars().peekable();

    while let Some(ch) = chars.next() {
        if in_quotes {
            if ch == '"' {
                if chars.peek() == Some(&'"') {
                    // `""` inside a quoted identifier is an escaped quote.
                    current.push('"');
                    chars.next();
                } else {
                    in_quotes = false;
                }
            } else {
                current.push(ch);
            }
            continue;
        }

        match ch {
            '"' => in_quotes = true,
            ',' => columns.push(std::mem::take(&mut current)),
            ')' => {
                if !current.is_empty() || !columns.is_empty() {
                    columns.push(current);
                }
                return columns;
            }
            c if c.is_whitespace() => {}
            c => current.push(c),
        }
    }

    // The column list was never closed.
    Vec::new()
}
316
317pub fn parse_postgres_copy_rows(
319 data: &[u8],
320 schema: &TableSchema,
321 column_order: Vec<String>,
322) -> anyhow::Result<Vec<ParsedCopyRow>> {
323 let mut parser = CopyParser::new(data)
324 .with_schema(schema)
325 .with_column_order(column_order);
326 parser.parse_rows()
327}