// sql_splitter/parser/postgres_copy.rs

//! PostgreSQL COPY statement parser.
//!
//! Parses `COPY ... FROM stdin` data blocks into individual rows and,
//! optionally, extracts PK/FK column values for dependency tracking.
5
6use crate::schema::{ColumnId, ColumnType, TableSchema};
7use smallvec::SmallVec;
8
9// Re-use types from mysql_insert for consistency
10use super::mysql_insert::{FkRef, PkValue};
11
12/// Tuple of PK values for composite primary keys
13pub type PkTuple = SmallVec<[PkValue; 2]>;
14
15/// A parsed row from a COPY data block
16#[derive(Debug, Clone)]
17pub struct ParsedCopyRow {
18    /// Raw bytes of the row (tab-separated values, no newline)
19    pub raw: Vec<u8>,
20    /// Extracted primary key values (if table has PK and values are non-NULL)
21    pub pk: Option<PkTuple>,
22    /// Extracted foreign key values with their references
23    pub fk_values: Vec<(FkRef, PkTuple)>,
24}
25
26/// Parser for PostgreSQL COPY data blocks
27pub struct CopyParser<'a> {
28    data: &'a [u8],
29    table_schema: Option<&'a TableSchema>,
30    /// Column order from COPY header
31    column_order: Vec<Option<ColumnId>>,
32}
33
34impl<'a> CopyParser<'a> {
35    /// Create a new parser for COPY data
36    pub fn new(data: &'a [u8]) -> Self {
37        Self {
38            data,
39            table_schema: None,
40            column_order: Vec::new(),
41        }
42    }
43
44    /// Set the table schema for PK/FK extraction
45    pub fn with_schema(mut self, schema: &'a TableSchema) -> Self {
46        self.table_schema = Some(schema);
47        self
48    }
49
50    /// Set column order from COPY header
51    pub fn with_column_order(mut self, columns: Vec<String>) -> Self {
52        if let Some(schema) = self.table_schema {
53            self.column_order = columns
54                .iter()
55                .map(|name| schema.get_column_id(name))
56                .collect();
57        }
58        self
59    }
60
61    /// Parse all rows from the COPY data block
62    pub fn parse_rows(&mut self) -> anyhow::Result<Vec<ParsedCopyRow>> {
63        // If no explicit column order, use natural schema order
64        if self.column_order.is_empty() {
65            if let Some(schema) = self.table_schema {
66                self.column_order = schema.columns.iter().map(|c| Some(c.ordinal)).collect();
67            }
68        }
69
70        let mut rows = Vec::new();
71        let mut pos = 0;
72
73        while pos < self.data.len() {
74            // Find end of line
75            let line_end = self.data[pos..]
76                .iter()
77                .position(|&b| b == b'\n')
78                .map(|p| pos + p)
79                .unwrap_or(self.data.len());
80
81            let line = &self.data[pos..line_end];
82
83            // Check for terminator
84            if line == b"\\." || line.is_empty() {
85                pos = line_end + 1;
86                continue;
87            }
88
89            // Parse the row
90            if let Some(row) = self.parse_row(line)? {
91                rows.push(row);
92            }
93
94            pos = line_end + 1;
95        }
96
97        Ok(rows)
98    }
99
100    /// Parse a single tab-separated row
101    fn parse_row(&self, line: &[u8]) -> anyhow::Result<Option<ParsedCopyRow>> {
102        let raw = line.to_vec();
103
104        // Split by tabs
105        let values: Vec<CopyValue> = self.split_and_parse_values(line);
106
107        // Extract PK and FK if we have schema
108        let (pk, fk_values) = if let Some(schema) = self.table_schema {
109            self.extract_pk_fk(&values, schema)
110        } else {
111            (None, Vec::new())
112        };
113
114        Ok(Some(ParsedCopyRow { raw, pk, fk_values }))
115    }
116
117    /// Split line by tabs and parse each value
118    fn split_and_parse_values(&self, line: &[u8]) -> Vec<CopyValue> {
119        let mut values = Vec::new();
120        let mut start = 0;
121
122        for (i, &b) in line.iter().enumerate() {
123            if b == b'\t' {
124                values.push(self.parse_copy_value(&line[start..i]));
125                start = i + 1;
126            }
127        }
128        // Last value
129        if start <= line.len() {
130            values.push(self.parse_copy_value(&line[start..]));
131        }
132
133        values
134    }
135
136    /// Parse a single COPY value
137    fn parse_copy_value(&self, value: &[u8]) -> CopyValue {
138        // Check for NULL marker
139        if value == b"\\N" {
140            return CopyValue::Null;
141        }
142
143        // Decode escape sequences
144        let decoded = self.decode_copy_escapes(value);
145
146        // Try to parse as integer
147        if let Ok(s) = std::str::from_utf8(&decoded) {
148            if let Ok(n) = s.parse::<i64>() {
149                return CopyValue::Integer(n);
150            }
151            if let Ok(n) = s.parse::<i128>() {
152                return CopyValue::BigInteger(n);
153            }
154        }
155
156        CopyValue::Text(decoded)
157    }
158
159    /// Decode PostgreSQL COPY escape sequences
160    pub fn decode_copy_escapes(&self, value: &[u8]) -> Vec<u8> {
161        let mut result = Vec::with_capacity(value.len());
162        let mut i = 0;
163
164        while i < value.len() {
165            if value[i] == b'\\' && i + 1 < value.len() {
166                let next = value[i + 1];
167                let decoded = match next {
168                    b'n' => b'\n',
169                    b'r' => b'\r',
170                    b't' => b'\t',
171                    b'\\' => b'\\',
172                    b'N' => {
173                        // This shouldn't happen here since we check for \N above
174                        result.push(b'\\');
175                        result.push(b'N');
176                        i += 2;
177                        continue;
178                    }
179                    _ => {
180                        // Unknown escape, keep as-is
181                        result.push(b'\\');
182                        result.push(next);
183                        i += 2;
184                        continue;
185                    }
186                };
187                result.push(decoded);
188                i += 2;
189            } else {
190                result.push(value[i]);
191                i += 1;
192            }
193        }
194
195        result
196    }
197
198    /// Extract PK and FK values from parsed values
199    fn extract_pk_fk(
200        &self,
201        values: &[CopyValue],
202        schema: &TableSchema,
203    ) -> (Option<PkTuple>, Vec<(FkRef, PkTuple)>) {
204        let mut pk_values = PkTuple::new();
205        let mut fk_values = Vec::new();
206
207        // Build PK from columns marked as primary key
208        for (idx, col_id_opt) in self.column_order.iter().enumerate() {
209            if let Some(col_id) = col_id_opt {
210                if schema.is_pk_column(*col_id) {
211                    if let Some(value) = values.get(idx) {
212                        let pk_val = self.value_to_pk(value, schema.column(*col_id));
213                        pk_values.push(pk_val);
214                    }
215                }
216            }
217        }
218
219        // Build FK tuples
220        for (fk_idx, fk) in schema.foreign_keys.iter().enumerate() {
221            if fk.referenced_table_id.is_none() {
222                continue;
223            }
224
225            let mut fk_tuple = PkTuple::new();
226            let mut all_non_null = true;
227
228            for &col_id in &fk.columns {
229                if let Some(idx) = self.column_order.iter().position(|&c| c == Some(col_id)) {
230                    if let Some(value) = values.get(idx) {
231                        let pk_val = self.value_to_pk(value, schema.column(col_id));
232                        if pk_val.is_null() {
233                            all_non_null = false;
234                            break;
235                        }
236                        fk_tuple.push(pk_val);
237                    }
238                }
239            }
240
241            if all_non_null && !fk_tuple.is_empty() {
242                fk_values.push((
243                    FkRef {
244                        table_id: schema.id.0,
245                        fk_index: fk_idx as u16,
246                    },
247                    fk_tuple,
248                ));
249            }
250        }
251
252        let pk = if pk_values.is_empty() || pk_values.iter().any(|v| v.is_null()) {
253            None
254        } else {
255            Some(pk_values)
256        };
257
258        (pk, fk_values)
259    }
260
261    /// Convert a parsed value to a PkValue
262    fn value_to_pk(&self, value: &CopyValue, col: Option<&crate::schema::Column>) -> PkValue {
263        match value {
264            CopyValue::Null => PkValue::Null,
265            CopyValue::Integer(n) => PkValue::Int(*n),
266            CopyValue::BigInteger(n) => PkValue::BigInt(*n),
267            CopyValue::Text(bytes) => {
268                let s = String::from_utf8_lossy(bytes);
269
270                // Check if this might be an integer stored as text
271                if let Some(col) = col {
272                    match col.col_type {
273                        ColumnType::Int => {
274                            if let Ok(n) = s.parse::<i64>() {
275                                return PkValue::Int(n);
276                            }
277                        }
278                        ColumnType::BigInt => {
279                            if let Ok(n) = s.parse::<i128>() {
280                                return PkValue::BigInt(n);
281                            }
282                        }
283                        _ => {}
284                    }
285                }
286
287                PkValue::Text(s.into_owned().into_boxed_str())
288            }
289        }
290    }
291}
292
/// A single field of a COPY row, classified for key extraction.
#[derive(Debug, Clone)]
enum CopyValue {
    /// The field was the `\N` NULL marker.
    Null,
    /// An integer field that fits in an `i64`.
    Integer(i64),
    /// An integer field too large for `i64` that still fits in an `i128`.
    BigInteger(i128),
    /// Any other field, with COPY escape sequences already decoded.
    Text(Vec<u8>),
}
301
/// Parse the column list out of a COPY header line.
///
/// For `COPY public.users (id, "Full Name", email) FROM stdin;` this returns
/// `["id", "Full Name", "email"]`, in header order. For quoted identifiers
/// the surrounding quotes are removed and `""` is collapsed to `"`; commas or
/// parentheses inside quotes are part of the name. Unquoted names are only
/// trimmed.
///
/// Returns an empty vector in these cases:
/// - the header has no column list, including headers whose only parentheses
///   belong to an option list after `FROM`/`TO` (`COPY t FROM stdin WITH (FORMAT text);`);
/// - the list is blank;
/// - the closing parenthesis is missing.
pub fn parse_copy_columns(header: &str) -> Vec<String> {
    /// Byte offset of the `(` that opens the column list, if there is one.
    fn column_list_open(header: &str) -> Option<usize> {
        let mut in_quotes = false;
        // Current whitespace-delimited token, used to spot the FROM/TO keyword.
        let mut word = String::new();
        for (i, ch) in header.char_indices() {
            match ch {
                '"' => {
                    in_quotes = !in_quotes;
                    word.push(ch);
                }
                '(' if !in_quotes => return Some(i),
                c if !in_quotes && c.is_whitespace() => {
                    // Past FROM/TO any parentheses belong to the option list.
                    if word.eq_ignore_ascii_case("FROM") || word.eq_ignore_ascii_case("TO") {
                        return None;
                    }
                    word.clear();
                }
                c => word.push(c),
            }
        }
        None
    }

    /// Trim a raw list entry and unquote it if it is a quoted identifier.
    fn unquote_identifier(raw: &str) -> String {
        let name = raw.trim();
        match name.strip_prefix('"').and_then(|s| s.strip_suffix('"')) {
            Some(inner) => inner.replace("\"\"", "\""),
            None => name.trim_matches('"').to_string(),
        }
    }

    let open = match column_list_open(header) {
        Some(pos) => pos,
        None => return Vec::new(),
    };
    let list = &header[open + 1..];

    let mut raw_names = Vec::new();
    let mut in_quotes = false;
    let mut seg_start = 0;
    for (i, ch) in list.char_indices() {
        match ch {
            // A doubled quote inside an identifier toggles twice, leaving the state unchanged.
            '"' => in_quotes = !in_quotes,
            ',' if !in_quotes => {
                raw_names.push(&list[seg_start..i]);
                seg_start = i + 1;
            }
            ')' if !in_quotes => {
                raw_names.push(&list[seg_start..i]);
                if raw_names.len() == 1 && raw_names[0].trim().is_empty() {
                    return Vec::new();
                }
                return raw_names.into_iter().map(unquote_identifier).collect();
            }
            _ => {}
        }
    }

    // The list was never closed: the header is malformed.
    Vec::new()
}
316
317/// Parse all rows from a PostgreSQL COPY data block
318pub fn parse_postgres_copy_rows(
319    data: &[u8],
320    schema: &TableSchema,
321    column_order: Vec<String>,
322) -> anyhow::Result<Vec<ParsedCopyRow>> {
323    let mut parser = CopyParser::new(data)
324        .with_schema(schema)
325        .with_column_order(column_order);
326    parser.parse_rows()
327}