Skip to main content

cqlite_core/cql/
mutation_parser.rs

1//! CQL Mutation Statement Parser
2//!
3//! This module provides nom-based parsing for CQL INSERT, UPDATE, and DELETE statements.
4//! It is feature-gated behind the `write-support` feature flag for M5.
5
6use super::ast::*;
7use super::error::ParserError;
8use super::traits::SourcePosition;
9use crate::error::Result;
10use nom::{
11    branch::alt,
12    bytes::complete::{tag_no_case, take_while1, take_while_m_n},
13    character::complete::{char, digit1, multispace0, multispace1},
14    combinator::{map, opt, recognize},
15    multi::{separated_list0, separated_list1},
16    sequence::{preceded, separated_pair, tuple},
17    IResult,
18};
19
20// DoS Protection Constants (Issue #402)
21const MAX_NESTING_DEPTH: usize = 32;
22const MAX_COLLECTION_SIZE: usize = 65536;
23const MAX_INPUT_LENGTH: usize = 16 * 1024 * 1024; // 16 MB
24
25// Identifier Limits (Issue #403)
26const MAX_IDENTIFIER_LENGTH: usize = 48; // Cassandra limit
27
28// Batch Limits
29const MAX_BATCH_STATEMENTS: usize = 65535; // Matches Cassandra's warn threshold
30
/// Returns `true` when `input` begins with `keyword`, ignoring ASCII case.
///
/// The comparison runs directly on the underlying bytes, so no lowercase copy
/// of the (possibly multi-megabyte) parser input is allocated. `keyword` is
/// expected to be ASCII.
fn starts_with_ascii_ci(input: &str, keyword: &str) -> bool {
    let needle = keyword.as_bytes();
    // `get` yields `None` when the input is shorter than the keyword.
    match input.as_bytes().get(..needle.len()) {
        Some(prefix) => prefix.eq_ignore_ascii_case(needle),
        None => false,
    }
}
43
44/// Enforce the shared [`MAX_INPUT_LENGTH`] DoS guard (Issue #402).
45fn check_input_length(input: &str) -> Result<()> {
46    if input.len() > MAX_INPUT_LENGTH {
47        return Err(ParserError::resource_limit(
48            "input_length",
49            MAX_INPUT_LENGTH as u64,
50            input.len() as u64,
51        )
52        .into());
53    }
54    Ok(())
55}
56
57/// Convert a nom parse result into the crate `Result`, attaching a `kind` label
58/// (e.g. "INSERT", "UPDATE") for the error message.
59fn finish_parse<T>(kind: &str, result: IResult<&str, T>) -> Result<T> {
60    match result {
61        Ok((_, value)) => Ok(value),
62        Err(e) => Err(ParserError::syntax(
63            format!("Failed to parse {} statement: {:?}", kind, e),
64            SourcePosition::start(),
65        )
66        .into()),
67    }
68}
69
70/// Validate identifier (Issue #403)
71fn validate_identifier(name: &str) -> Result<()> {
72    // Reject empty identifiers
73    if name.is_empty() {
74        return Err(
75            ParserError::lexical("Identifier cannot be empty", SourcePosition::start()).into(),
76        );
77    }
78
79    // Check length
80    if name.len() > MAX_IDENTIFIER_LENGTH {
81        return Err(ParserError::resource_limit(
82            "identifier_length",
83            MAX_IDENTIFIER_LENGTH as u64,
84            name.len() as u64,
85        )
86        .into());
87    }
88
89    // Reject control characters (ASCII 0-31, 127)
90    if name.chars().any(|c| c.is_ascii_control()) {
91        return Err(ParserError::lexical(
92            "Identifier contains control characters",
93            SourcePosition::start(),
94        )
95        .into());
96    }
97
98    // Reject null bytes
99    if name.contains('\0') {
100        return Err(ParserError::lexical(
101            "Identifier contains null bytes",
102            SourcePosition::start(),
103        )
104        .into());
105    }
106
107    Ok(())
108}
109
110/// Sanitize identifier for filesystem usage (Issue #403)
111/// Replaces potentially problematic characters with safe alternatives
112#[allow(dead_code)]
113fn sanitize_for_filesystem(identifier: &str) -> String {
114    identifier
115        .chars()
116        .map(|c| match c {
117            '/' | '\\' | ':' | '*' | '?' | '"' | '<' | '>' | '|' => '_',
118            '\0' => '_',
119            c if c.is_ascii_control() => '_',
120            c => c,
121        })
122        .take(MAX_IDENTIFIER_LENGTH)
123        .collect()
124}
125
126/// CQL keyword parser - case insensitive
127fn keyword(s: &str) -> impl Fn(&str) -> IResult<&str, &str> + '_ {
128    move |input| tag_no_case(s)(input)
129}
130
131/// Parse whitespace
132fn ws(input: &str) -> IResult<&str, &str> {
133    multispace0(input)
134}
135
136/// Parse mandatory whitespace
137fn ws1(input: &str) -> IResult<&str, &str> {
138    multispace1(input)
139}
140
141/// Parse quoted identifier with proper escape handling (Issue #403)
142fn parse_quoted_identifier(input: &str) -> IResult<&str, String> {
143    let (input, _) = char('"')(input)?;
144    let mut result = String::new();
145    let mut chars = input.chars();
146    let mut consumed = 0;
147
148    loop {
149        match chars.next() {
150            Some('"') => {
151                // Check for escaped quote ""
152                if chars.clone().next() == Some('"') {
153                    result.push('"');
154                    chars.next();
155                    consumed += 2;
156                } else {
157                    consumed += 1;
158                    break;
159                }
160            }
161            Some(c) => {
162                result.push(c);
163                consumed += c.len_utf8();
164            }
165            None => {
166                return Err(nom::Err::Error(nom::error::Error::new(
167                    input,
168                    nom::error::ErrorKind::Escaped,
169                )))
170            }
171        }
172    }
173
174    Ok((&input[consumed..], result))
175}
176
177/// Parse identifier (table name, column name, etc.)
178fn identifier(input: &str) -> IResult<&str, CqlIdentifier> {
179    // Check if it starts with a quote
180    let is_quoted = input.starts_with('"');
181
182    let (remaining, name_str) = if is_quoted {
183        // Quoted identifier with proper escape handling
184        parse_quoted_identifier(input)?
185    } else {
186        // Unquoted identifier
187        let (rem, n) = take_while1(|c: char| c.is_alphanumeric() || c == '_')(input)?;
188        (rem, n.to_string())
189    };
190
191    // Validate identifier (Issue #403)
192    if let Err(_e) = validate_identifier(&name_str) {
193        return Err(nom::Err::Failure(nom::error::Error::new(
194            input,
195            nom::error::ErrorKind::Verify,
196        )));
197    }
198
199    Ok((
200        remaining,
201        CqlIdentifier {
202            name: name_str,
203            quoted: is_quoted,
204        },
205    ))
206}
207
208/// Parse a qualified table name (keyspace.table or just table)
209fn qualified_table_name(input: &str) -> IResult<&str, CqlTable> {
210    let (input, first) = identifier(input)?;
211    let (input, second) = opt(preceded(char('.'), identifier))(input)?;
212
213    match second {
214        Some(table) => Ok((
215            input,
216            CqlTable {
217                keyspace: Some(first),
218                name: table,
219            },
220        )),
221        None => Ok((
222            input,
223            CqlTable {
224                keyspace: None,
225                name: first,
226            },
227        )),
228    }
229}
230
231/// Parse integer literal
232fn integer_literal(input: &str) -> IResult<&str, i64> {
233    let (rest, num_str) = recognize(tuple((opt(char('-')), digit1)))(input)?;
234    let value = num_str.parse::<i64>().map_err(|_| {
235        nom::Err::Error(nom::error::Error::new(input, nom::error::ErrorKind::Digit))
236    })?;
237    Ok((rest, value))
238}
239
240/// Parse float literal
241fn float_literal(input: &str) -> IResult<&str, f64> {
242    let (rest, num_str) = recognize(tuple((opt(char('-')), digit1, char('.'), digit1)))(input)?;
243    let value = num_str.parse::<f64>().map_err(|_| {
244        nom::Err::Error(nom::error::Error::new(input, nom::error::ErrorKind::Digit))
245    })?;
246    Ok((rest, value))
247}
248
249/// Parse string literal (single-quoted)
250fn string_literal(input: &str) -> IResult<&str, String> {
251    let (input, _) = char('\'')(input)?;
252    let mut result = String::new();
253    let mut chars = input.chars();
254    let mut consumed = 0;
255
256    loop {
257        match chars.next() {
258            Some('\'') => {
259                // Check for escaped quote ''
260                if chars.clone().next() == Some('\'') {
261                    result.push('\'');
262                    chars.next();
263                    consumed += 2;
264                } else {
265                    consumed += 1;
266                    break;
267                }
268            }
269            Some('\\') => {
270                // Handle escape sequences
271                match chars.next() {
272                    Some('n') => result.push('\n'),
273                    Some('r') => result.push('\r'),
274                    Some('t') => result.push('\t'),
275                    Some('\\') => result.push('\\'),
276                    Some('\'') => result.push('\''),
277                    Some(c) => result.push(c),
278                    None => {
279                        return Err(nom::Err::Error(nom::error::Error::new(
280                            input,
281                            nom::error::ErrorKind::Escaped,
282                        )))
283                    }
284                }
285                consumed += 2;
286            }
287            Some(c) => {
288                result.push(c);
289                consumed += c.len_utf8();
290            }
291            None => {
292                return Err(nom::Err::Error(nom::error::Error::new(
293                    input,
294                    nom::error::ErrorKind::Escaped,
295                )))
296            }
297        }
298    }
299
300    Ok((&input[consumed..], result))
301}
302
303/// Parse UUID literal with strict 8-4-4-4-12 hex format validation (Issue #402).
304fn uuid_literal(input: &str) -> IResult<&str, String> {
305    fn hex_run(n: usize) -> impl Fn(&str) -> IResult<&str, &str> {
306        move |i| take_while_m_n(n, n, |c: char| c.is_ascii_hexdigit())(i)
307    }
308    let (rest, matched) = recognize(tuple((
309        hex_run(8),
310        char('-'),
311        hex_run(4),
312        char('-'),
313        hex_run(4),
314        char('-'),
315        hex_run(4),
316        char('-'),
317        hex_run(12),
318    )))(input)?;
319    Ok((rest, matched.to_string()))
320}
321
322/// Parse blob literal (hex string with 0x prefix) with even length validation (Issue #402)
323fn blob_literal(input: &str) -> IResult<&str, String> {
324    let (input, _) = tag_no_case("0x")(input)?;
325    let (input, hex) = take_while1(|c: char| c.is_ascii_hexdigit())(input)?;
326
327    // Validate even hex length (each byte needs 2 hex digits)
328    if hex.len() % 2 != 0 {
329        return Err(nom::Err::Error(nom::error::Error::new(
330            input,
331            nom::error::ErrorKind::Verify,
332        )));
333    }
334
335    Ok((input, hex.to_string()))
336}
337
338/// Emit a `Failure` with [`TooLarge`] — shared by depth and size checks (Issue #402).
339fn too_large<T>(input: &str) -> IResult<&str, T> {
340    Err(nom::Err::Failure(nom::error::Error::new(
341        input,
342        nom::error::ErrorKind::TooLarge,
343    )))
344}
345
346/// Parse the `open..close` body of a collection literal with depth tracking
347/// and a post-parse size check against [`MAX_COLLECTION_SIZE`] (Issue #402).
348fn collection_body<'a, T, P>(
349    input: &'a str,
350    depth: usize,
351    open: char,
352    close: char,
353    mut parse_items: P,
354) -> IResult<&'a str, Vec<T>>
355where
356    P: FnMut(&'a str, usize) -> IResult<&'a str, Vec<T>>,
357{
358    if depth >= MAX_NESTING_DEPTH {
359        return too_large(input);
360    }
361
362    let (input, _) = char(open)(input)?;
363    let (input, _) = ws(input)?;
364    let (input, items) = parse_items(input, depth)?;
365    let (input, _) = ws(input)?;
366    let (input, _) = char(close)(input)?;
367
368    if items.len() > MAX_COLLECTION_SIZE {
369        return too_large(input);
370    }
371
372    Ok((input, items))
373}
374
375/// Parse list literal with depth tracking (Issue #402)
376fn list_literal_depth(input: &str, depth: usize) -> IResult<&str, CqlCollectionLiteral> {
377    let (input, items) = collection_body(input, depth, '[', ']', |i, d| {
378        separated_list0(tuple((ws, char(','), ws)), |i2| literal_depth(i2, d + 1))(i)
379    })?;
380    Ok((input, CqlCollectionLiteral::List(items)))
381}
382
383/// Parse set literal with depth tracking (Issue #402)
384fn set_literal_depth(input: &str, depth: usize) -> IResult<&str, CqlCollectionLiteral> {
385    let (input, items) = collection_body(input, depth, '{', '}', |i, d| {
386        separated_list0(tuple((ws, char(','), ws)), |i2| literal_depth(i2, d + 1))(i)
387    })?;
388    Ok((input, CqlCollectionLiteral::Set(items)))
389}
390
391/// Parse map literal with depth tracking (Issue #402)
392fn map_literal_depth(input: &str, depth: usize) -> IResult<&str, CqlCollectionLiteral> {
393    let (input, pairs) = collection_body(input, depth, '{', '}', |i, d| {
394        separated_list0(
395            tuple((ws, char(','), ws)),
396            separated_pair(
397                |i2| literal_depth(i2, d + 1),
398                tuple((ws, char(':'), ws)),
399                |i2| literal_depth(i2, d + 1),
400            ),
401        )(i)
402    })?;
403    Ok((input, CqlCollectionLiteral::Map(pairs)))
404}
405
406/// Parse CQL literal value with depth tracking (Issue #402)
407fn literal_depth(input: &str, depth: usize) -> IResult<&str, CqlLiteral> {
408    alt((
409        // NULL
410        map(keyword("null"), |_| CqlLiteral::Null),
411        // Boolean
412        map(keyword("true"), |_| CqlLiteral::Boolean(true)),
413        map(keyword("false"), |_| CqlLiteral::Boolean(false)),
414        // String (must come before UUID to avoid UUID being parsed as string)
415        map(string_literal, CqlLiteral::String),
416        // Blob
417        map(blob_literal, CqlLiteral::Blob),
418        // UUID (simple heuristic: contains dashes)
419        map(uuid_literal, CqlLiteral::Uuid),
420        // Float (must come before integer)
421        map(float_literal, CqlLiteral::Float),
422        // Integer
423        map(integer_literal, CqlLiteral::Integer),
424        // List
425        map(|i| list_literal_depth(i, depth), CqlLiteral::Collection),
426        // Set or Map (distinguish by checking for colon)
427        map(|i| set_literal_depth(i, depth), CqlLiteral::Collection),
428        map(|i| map_literal_depth(i, depth), CqlLiteral::Collection),
429    ))(input)
430}
431
432/// Parse CQL literal value (entry point, depth 0)
433fn literal(input: &str) -> IResult<&str, CqlLiteral> {
434    literal_depth(input, 0)
435}
436
437/// Parse expression (simple version for M5)
438fn expression(input: &str) -> IResult<&str, CqlExpression> {
439    alt((
440        // Parameter placeholder
441        map(char('?'), |_| CqlExpression::Parameter(0)),
442        // Named parameter
443        map(preceded(char(':'), identifier), |id| {
444            CqlExpression::NamedParameter(id.name)
445        }),
446        // Literal
447        map(literal, CqlExpression::Literal),
448        // Column reference
449        map(identifier, CqlExpression::Column),
450    ))(input)
451}
452
453/// Parse WHERE clause
454fn where_clause(input: &str) -> IResult<&str, CqlExpression> {
455    let (input, _) = ws(input)?;
456    let (input, _) = keyword("where")(input)?;
457    let (input, _) = ws1(input)?;
458
459    // Parse simple conditions with AND
460    let (input, conditions) =
461        separated_list1(tuple((ws, keyword("and"), ws)), where_condition)(input)?;
462
463    // `separated_list1` guarantees >= 1 element, so `reduce` always returns `Some`.
464    let result = conditions
465        .into_iter()
466        .reduce(|acc, cond| CqlExpression::Binary {
467            left: Box::new(acc),
468            operator: CqlBinaryOperator::And,
469            right: Box::new(cond),
470        })
471        .ok_or_else(|| {
472            nom::Err::Error(nom::error::Error::new(input, nom::error::ErrorKind::Many1))
473        })?;
474
475    Ok((input, result))
476}
477
478/// Parse single WHERE condition
479fn where_condition(input: &str) -> IResult<&str, CqlExpression> {
480    let (input, left) = identifier(input)?;
481    let (input, _) = ws(input)?;
482    let (input, op) = comparison_operator(input)?;
483    let (input, _) = ws(input)?;
484    let (input, right) = expression(input)?;
485
486    Ok((
487        input,
488        CqlExpression::Binary {
489            left: Box::new(CqlExpression::Column(left)),
490            operator: op,
491            right: Box::new(right),
492        },
493    ))
494}
495
496/// Parse comparison operator
497fn comparison_operator(input: &str) -> IResult<&str, CqlBinaryOperator> {
498    alt((
499        map(char('='), |_| CqlBinaryOperator::Eq),
500        map(tag_no_case("!="), |_| CqlBinaryOperator::Ne),
501        map(tag_no_case("<="), |_| CqlBinaryOperator::Le),
502        map(tag_no_case(">="), |_| CqlBinaryOperator::Ge),
503        map(char('<'), |_| CqlBinaryOperator::Lt),
504        map(char('>'), |_| CqlBinaryOperator::Gt),
505    ))(input)
506}
507
508/// Parse USING clause (TTL and TIMESTAMP)
509fn using_clause(input: &str) -> IResult<&str, CqlUsing> {
510    let (input, _) = ws(input)?;
511    let (input, _) = keyword("using")(input)?;
512    let (input, _) = ws1(input)?;
513
514    let (input, first_option) = using_option(input)?;
515    let (input, second_option) =
516        opt(preceded(tuple((ws, keyword("and"), ws)), using_option))(input)?;
517
518    let mut ttl = None;
519    let mut timestamp = None;
520    for opt_val in [Some(first_option), second_option].into_iter().flatten() {
521        match opt_val {
522            UsingOption::Ttl(t) => ttl = Some(t),
523            UsingOption::Timestamp(ts) => timestamp = Some(ts),
524        }
525    }
526
527    Ok((input, CqlUsing { ttl, timestamp }))
528}
529
530/// USING option (TTL or TIMESTAMP)
531enum UsingOption {
532    Ttl(CqlExpression),
533    Timestamp(CqlExpression),
534}
535
536/// Parse single USING option
537fn using_option(input: &str) -> IResult<&str, UsingOption> {
538    alt((
539        map(
540            preceded(tuple((keyword("ttl"), ws)), expression),
541            UsingOption::Ttl,
542        ),
543        map(
544            preceded(tuple((keyword("timestamp"), ws)), expression),
545            UsingOption::Timestamp,
546        ),
547    ))(input)
548}
549
550/// Parse INSERT statement
551pub fn parse_insert_statement(input: &str) -> Result<CqlInsert> {
552    check_input_length(input)?;
553    finish_parse("INSERT", insert_statement_impl(input))
554}
555
556/// Parse the trailing clause shared by both VALUES and JSON INSERT forms:
557/// optional IF NOT EXISTS followed by optional USING clause.
558fn insert_trailer(input: &str) -> IResult<&str, (bool, Option<CqlUsing>)> {
559    let (input, _) = ws(input)?;
560    let (input, if_not_exists) = opt(tuple((
561        keyword("if"),
562        ws1,
563        keyword("not"),
564        ws1,
565        keyword("exists"),
566    )))(input)?;
567    let (input, using) = opt(using_clause)(input)?;
568    let (input, _) = ws(input)?;
569    Ok((input, (if_not_exists.is_some(), using)))
570}
571
572fn insert_statement_impl(input: &str) -> IResult<&str, CqlInsert> {
573    let (input, _) = ws(input)?;
574    let (input, _) = keyword("insert")(input)?;
575    let (input, _) = ws1(input)?;
576    let (input, _) = keyword("into")(input)?;
577    let (input, _) = ws1(input)?;
578
579    // Table name
580    let (input, table) = qualified_table_name(input)?;
581    let (input, _) = ws(input)?;
582
583    // Check for JSON syntax: INSERT INTO table JSON '...'
584    if let Ok((json_input, _)) = keyword("json")(input) {
585        let (json_input, _) = ws1(json_input)?;
586        let (json_input, json_str) = string_literal(json_input)?;
587        let (json_input, (if_not_exists, using)) = insert_trailer(json_input)?;
588
589        return Ok((
590            json_input,
591            CqlInsert {
592                table,
593                columns: vec![],
594                values: CqlInsertValues::Json(json_str),
595                if_not_exists,
596                using,
597            },
598        ));
599    }
600
601    // Column list
602    let (input, _) = char('(')(input)?;
603    let (input, _) = ws(input)?;
604    let (input, columns) = separated_list1(tuple((ws, char(','), ws)), identifier)(input)?;
605    let (input, _) = ws(input)?;
606    let (input, _) = char(')')(input)?;
607    let (input, _) = ws(input)?;
608
609    // VALUES clause
610    let (input, _) = keyword("values")(input)?;
611    let (input, _) = ws(input)?;
612    let (input, _) = char('(')(input)?;
613    let (input, _) = ws(input)?;
614    let (input, values) = separated_list1(tuple((ws, char(','), ws)), expression)(input)?;
615    let (input, _) = ws(input)?;
616    let (input, _) = char(')')(input)?;
617    let (input, (if_not_exists, using)) = insert_trailer(input)?;
618
619    Ok((
620        input,
621        CqlInsert {
622            table,
623            columns,
624            values: CqlInsertValues::Values(values),
625            if_not_exists,
626            using,
627        },
628    ))
629}
630
631/// Parse UPDATE statement
632pub fn parse_update_statement(input: &str) -> Result<CqlUpdate> {
633    check_input_length(input)?;
634    finish_parse("UPDATE", update_statement_impl(input))
635}
636
637fn update_statement_impl(input: &str) -> IResult<&str, CqlUpdate> {
638    let (input, _) = ws(input)?;
639    let (input, _) = keyword("update")(input)?;
640    let (input, _) = ws1(input)?;
641
642    // Table name
643    let (input, table) = qualified_table_name(input)?;
644    let (input, _) = ws(input)?;
645
646    // Optional USING clause (before SET)
647    let (input, using) = opt(using_clause)(input)?;
648    let (input, _) = ws(input)?;
649
650    // SET clause
651    let (input, _) = keyword("set")(input)?;
652    let (input, _) = ws1(input)?;
653    let (input, assignments) = separated_list1(tuple((ws, char(','), ws)), assignment)(input)?;
654    let (input, _) = ws(input)?;
655
656    // WHERE clause
657    let (input, where_expr) = where_clause(input)?;
658    let (input, _) = ws(input)?;
659
660    // Optional IF condition
661    let (input, if_condition) = opt(preceded(tuple((keyword("if"), ws1)), where_condition))(input)?;
662    let (input, _) = ws(input)?;
663
664    Ok((
665        input,
666        CqlUpdate {
667            table,
668            using,
669            assignments,
670            where_clause: where_expr,
671            if_condition,
672        },
673    ))
674}
675
676/// Parse assignment (col = value)
677fn assignment(input: &str) -> IResult<&str, CqlAssignment> {
678    let (input, column) = identifier(input)?;
679    let (input, _) = ws(input)?;
680    let (input, operator) = assignment_operator(input)?;
681    let (input, _) = ws(input)?;
682    let (input, value) = expression(input)?;
683
684    Ok((
685        input,
686        CqlAssignment {
687            column,
688            operator,
689            value,
690        },
691    ))
692}
693
694/// Parse assignment operator
695fn assignment_operator(input: &str) -> IResult<&str, CqlAssignmentOperator> {
696    alt((
697        map(tag_no_case("+="), |_| CqlAssignmentOperator::AddAssign),
698        map(tag_no_case("-="), |_| CqlAssignmentOperator::SubAssign),
699        map(char('='), |_| CqlAssignmentOperator::Assign),
700    ))(input)
701}
702
703/// Parse DELETE statement
704pub fn parse_delete_statement(input: &str) -> Result<CqlDelete> {
705    check_input_length(input)?;
706    finish_parse("DELETE", delete_statement_impl(input))
707}
708
709fn delete_statement_impl(input: &str) -> IResult<&str, CqlDelete> {
710    let (input, _) = ws(input)?;
711    let (input, _) = keyword("delete")(input)?;
712    let (input, _) = ws(input)?;
713
714    // Check if we have column list or FROM directly by peeking at the next keyword.
715    let trimmed = input.trim_start();
716    let has_from = starts_with_ascii_ci(trimmed, "from");
717
718    let (input, columns) = if has_from {
719        (input, vec![])
720    } else {
721        // Parse column list
722        let (input, cols) = separated_list1(tuple((ws, char(','), ws)), identifier)(input)?;
723        let (input, _) = ws(input)?;
724        (input, cols)
725    };
726
727    // FROM clause
728    let (input, _) = keyword("from")(input)?;
729    let (input, _) = ws1(input)?;
730    let (input, table) = qualified_table_name(input)?;
731    let (input, _) = ws(input)?;
732
733    // Optional USING TIMESTAMP
734    let (input, using) = opt(using_clause)(input)?;
735    let (input, _) = ws(input)?;
736
737    // WHERE clause
738    let (input, where_expr) = where_clause(input)?;
739    let (input, _) = ws(input)?;
740
741    // Optional IF condition
742    let (input, if_condition) = opt(preceded(tuple((keyword("if"), ws1)), where_condition))(input)?;
743    let (input, _) = ws(input)?;
744
745    Ok((
746        input,
747        CqlDelete {
748            columns,
749            table,
750            using,
751            where_clause: where_expr,
752            if_condition,
753        },
754    ))
755}
756
757/// Parse BATCH statement.
758///
759/// Accepts multi-table batches syntactically, but the write engine processes each
760/// statement independently against the provided schema. Limited to
761/// [`MAX_BATCH_STATEMENTS`] statements for DoS protection.
762pub fn parse_batch_statement(input: &str) -> Result<CqlBatch> {
763    check_input_length(input)?;
764    finish_parse("BATCH", batch_statement_impl(input))
765}
766
767fn batch_statement_impl(input: &str) -> IResult<&str, CqlBatch> {
768    let (input, _) = ws(input)?;
769    let (input, _) = keyword("begin")(input)?;
770    let (input, _) = ws1(input)?;
771
772    // Optional batch type: UNLOGGED, LOGGED, COUNTER (before BATCH keyword)
773    let (input, batch_type) = batch_type_parser(input)?;
774
775    let (input, _) = keyword("batch")(input)?;
776    let (input, _) = ws(input)?;
777
778    // Optional USING TIMESTAMP
779    let (input, using) = opt(using_clause)(input)?;
780    let (input, _) = ws(input)?;
781
782    // Parse inner statements (separated by semicolons, with optional trailing semicolon)
783    let mut statements = Vec::new();
784    let mut remaining = input;
785    loop {
786        let trimmed = remaining.trim_start();
787        if starts_with_ascii_ci(trimmed, "apply") {
788            remaining = trimmed;
789            break;
790        }
791        if trimmed.is_empty() {
792            break;
793        }
794
795        let (rest, stmt) = if starts_with_ascii_ci(trimmed, "insert") {
796            let (r, ins) = insert_statement_impl(trimmed)?;
797            (r, CqlBatchStatement::Insert(ins))
798        } else if starts_with_ascii_ci(trimmed, "update") {
799            let (r, upd) = update_statement_impl(trimmed)?;
800            (r, CqlBatchStatement::Update(upd))
801        } else if starts_with_ascii_ci(trimmed, "delete") {
802            let (r, del) = delete_statement_impl(trimmed)?;
803            (r, CqlBatchStatement::Delete(del))
804        } else {
805            return Err(nom::Err::Failure(nom::error::Error::new(
806                trimmed,
807                nom::error::ErrorKind::Tag,
808            )));
809        };
810
811        // DoS protection: limit number of statements in a batch
812        if statements.len() >= MAX_BATCH_STATEMENTS {
813            return Err(nom::Err::Failure(nom::error::Error::new(
814                trimmed,
815                nom::error::ErrorKind::TooLarge,
816            )));
817        }
818
819        statements.push(stmt);
820
821        // Consume optional semicolon and whitespace
822        let rest = rest.trim_start();
823        remaining = rest.strip_prefix(';').unwrap_or(rest);
824    }
825
826    // APPLY BATCH
827    let (input, _) = keyword("apply")(remaining)?;
828    let (input, _) = ws1(input)?;
829    let (input, _) = keyword("batch")(input)?;
830    let (input, _) = ws(input)?;
831    // Optional trailing semicolon
832    let input = input.strip_prefix(';').unwrap_or(input);
833    let (input, _) = ws(input)?;
834
835    Ok((
836        input,
837        CqlBatch {
838            batch_type,
839            using,
840            statements,
841        },
842    ))
843}
844
845fn batch_type_parser(input: &str) -> IResult<&str, CqlBatchType> {
846    let trimmed = input.trim_start();
847    for (kw, ty) in [
848        ("unlogged", CqlBatchType::Unlogged),
849        ("counter", CqlBatchType::Counter),
850        ("logged", CqlBatchType::Logged),
851    ] {
852        if starts_with_ascii_ci(trimmed, kw) {
853            let (rest, _) = keyword(kw)(trimmed)?;
854            let (rest, _) = ws1(rest)?;
855            return Ok((rest, ty));
856        }
857    }
858    // Default: logged (no batch-type keyword present; preserve original `input`)
859    Ok((input, CqlBatchType::Logged))
860}
861
862#[cfg(test)]
863mod tests {
864    use super::*;
865
866    #[test]
867    fn test_parse_simple_insert() {
868        let cql = "INSERT INTO users (id, name) VALUES (?, ?)";
869        let result = parse_insert_statement(cql);
870        assert!(result.is_ok());
871
872        let insert = result.unwrap();
873        assert_eq!(insert.table.name.name, "users");
874        assert_eq!(insert.columns.len(), 2);
875        assert_eq!(insert.columns[0].name, "id");
876        assert_eq!(insert.columns[1].name, "name");
877    }
878
879    #[test]
880    fn test_parse_insert_with_literals() {
881        let cql = "INSERT INTO users (id, name, age) VALUES (123, 'John', 30)";
882        let result = parse_insert_statement(cql);
883        assert!(result.is_ok());
884
885        let insert = result.unwrap();
886        assert_eq!(insert.columns.len(), 3);
887        match &insert.values {
888            CqlInsertValues::Values(vals) => {
889                assert_eq!(vals.len(), 3);
890            }
891            _ => panic!("Expected Values variant"),
892        }
893    }
894
895    #[test]
896    fn test_parse_insert_with_ttl() {
897        let cql = "INSERT INTO users (id, name) VALUES (?, ?) USING TTL 3600";
898        let result = parse_insert_statement(cql);
899        assert!(result.is_ok());
900
901        let insert = result.unwrap();
902        assert!(insert.using.is_some());
903        assert!(insert.using.as_ref().unwrap().ttl.is_some());
904    }
905
906    #[test]
907    fn test_parse_insert_with_timestamp() {
908        let cql = "INSERT INTO users (id, name) VALUES (?, ?) USING TIMESTAMP 12345";
909        let result = parse_insert_statement(cql);
910        assert!(result.is_ok());
911
912        let insert = result.unwrap();
913        assert!(insert.using.is_some());
914        assert!(insert.using.as_ref().unwrap().timestamp.is_some());
915    }
916
917    #[test]
918    fn test_parse_insert_if_not_exists() {
919        let cql = "INSERT INTO users (id, name) VALUES (?, ?) IF NOT EXISTS";
920        let result = parse_insert_statement(cql);
921        assert!(result.is_ok());
922
923        let insert = result.unwrap();
924        assert!(insert.if_not_exists);
925    }
926
927    #[test]
928    fn test_parse_simple_update() {
929        let cql = "UPDATE users SET name = ? WHERE id = ?";
930        let result = parse_update_statement(cql);
931        assert!(result.is_ok());
932
933        let update = result.unwrap();
934        assert_eq!(update.table.name.name, "users");
935        assert_eq!(update.assignments.len(), 1);
936        assert_eq!(update.assignments[0].column.name, "name");
937    }
938
939    #[test]
940    fn test_parse_update_with_multiple_assignments() {
941        let cql = "UPDATE users SET name = ?, age = ? WHERE id = ?";
942        let result = parse_update_statement(cql);
943        assert!(result.is_ok());
944
945        let update = result.unwrap();
946        assert_eq!(update.assignments.len(), 2);
947    }
948
949    #[test]
950    fn test_parse_update_with_ttl() {
951        let cql = "UPDATE users USING TTL 3600 SET name = ? WHERE id = ?";
952        let result = parse_update_statement(cql);
953        assert!(result.is_ok());
954
955        let update = result.unwrap();
956        assert!(update.using.is_some());
957        assert!(update.using.as_ref().unwrap().ttl.is_some());
958    }
959
960    #[test]
961    fn test_parse_simple_delete() {
962        let cql = "DELETE FROM users WHERE id = ?";
963        let result = parse_delete_statement(cql);
964        if result.is_err() {
965            eprintln!("Parse error: {:?}", result.as_ref().err());
966        }
967        assert!(result.is_ok());
968
969        let delete = result.unwrap();
970        assert_eq!(delete.table.name.name, "users");
971        assert!(delete.columns.is_empty());
972    }
973
974    #[test]
975    fn test_parse_delete_columns() {
976        let cql = "DELETE name, age FROM users WHERE id = ?";
977        let result = parse_delete_statement(cql);
978        assert!(result.is_ok());
979
980        let delete = result.unwrap();
981        assert_eq!(delete.columns.len(), 2);
982        assert_eq!(delete.columns[0].name, "name");
983        assert_eq!(delete.columns[1].name, "age");
984    }
985
986    #[test]
987    fn test_parse_delete_with_timestamp() {
988        let cql = "DELETE FROM users USING TIMESTAMP 12345 WHERE id = ?";
989        let result = parse_delete_statement(cql);
990        assert!(result.is_ok());
991
992        let delete = result.unwrap();
993        assert!(delete.using.is_some());
994        assert!(delete.using.as_ref().unwrap().timestamp.is_some());
995    }
996
997    #[test]
998    fn test_parse_qualified_table_name() {
999        let cql = "INSERT INTO keyspace.users (id) VALUES (?)";
1000        let result = parse_insert_statement(cql);
1001        assert!(result.is_ok());
1002
1003        let insert = result.unwrap();
1004        assert!(insert.table.keyspace.is_some());
1005        assert_eq!(insert.table.keyspace.as_ref().unwrap().name, "keyspace");
1006        assert_eq!(insert.table.name.name, "users");
1007    }
1008
1009    #[test]
1010    fn test_parse_quoted_identifiers() {
1011        let cql = r#"INSERT INTO "MyTable" ("MyColumn") VALUES (?)"#;
1012        let result = parse_insert_statement(cql);
1013        assert!(result.is_ok());
1014
1015        let insert = result.unwrap();
1016        assert!(insert.table.name.quoted);
1017        assert_eq!(insert.table.name.name, "MyTable");
1018    }
1019
1020    #[test]
1021    fn test_parse_string_literals() {
1022        let cql = "INSERT INTO users (name) VALUES ('John O''Brien')";
1023        let result = parse_insert_statement(cql);
1024        assert!(result.is_ok());
1025    }
1026
1027    #[test]
1028    fn test_parse_null_literal() {
1029        let cql = "INSERT INTO users (name) VALUES (null)";
1030        let result = parse_insert_statement(cql);
1031        assert!(result.is_ok());
1032
1033        let insert = result.unwrap();
1034        match &insert.values {
1035            CqlInsertValues::Values(vals) => {
1036                assert_eq!(vals.len(), 1);
1037                match &vals[0] {
1038                    CqlExpression::Literal(CqlLiteral::Null) => {}
1039                    _ => panic!("Expected NULL literal"),
1040                }
1041            }
1042            _ => panic!("Expected Values variant"),
1043        }
1044    }
1045
1046    #[test]
1047    fn test_parse_collection_literals() {
1048        let cql = "INSERT INTO users (tags) VALUES (['tag1', 'tag2'])";
1049        let result = parse_insert_statement(cql);
1050        assert!(result.is_ok());
1051    }
1052
1053    #[test]
1054    fn test_parse_error_invalid_syntax() {
1055        let cql = "INSERT INVALID SYNTAX";
1056        let result = parse_insert_statement(cql);
1057        assert!(result.is_err());
1058    }
1059
1060    #[test]
1061    fn test_parse_insert_with_both_ttl_and_timestamp() {
1062        let cql = "INSERT INTO users (id, name) VALUES (?, ?) USING TTL 3600 AND TIMESTAMP 12345";
1063        let result = parse_insert_statement(cql);
1064        assert!(result.is_ok());
1065
1066        let insert = result.unwrap();
1067        assert!(insert.using.is_some());
1068        let using = insert.using.as_ref().unwrap();
1069        assert!(using.ttl.is_some());
1070        assert!(using.timestamp.is_some());
1071    }
1072
1073    #[test]
1074    fn test_parse_update_with_compound_where() {
1075        let cql = "UPDATE users SET name = ? WHERE id = ? AND age = ?";
1076        let result = parse_update_statement(cql);
1077        assert!(result.is_ok());
1078
1079        let update = result.unwrap();
1080        assert!(matches!(update.where_clause, CqlExpression::Binary { .. }));
1081    }
1082
1083    #[test]
1084    fn test_parse_delete_with_if_condition() {
1085        let cql = "DELETE FROM users WHERE id = ? IF name = ?";
1086        let result = parse_delete_statement(cql);
1087        assert!(result.is_ok());
1088
1089        let delete = result.unwrap();
1090        assert!(delete.if_condition.is_some());
1091    }
1092
1093    #[test]
1094    fn test_parse_update_with_add_assign() {
1095        let cql = "UPDATE counters SET count += 1 WHERE id = ?";
1096        let result = parse_update_statement(cql);
1097        assert!(result.is_ok());
1098
1099        let update = result.unwrap();
1100        assert_eq!(update.assignments.len(), 1);
1101        assert!(matches!(
1102            update.assignments[0].operator,
1103            CqlAssignmentOperator::AddAssign
1104        ));
1105    }
1106
1107    #[test]
1108    fn test_parse_update_with_sub_assign() {
1109        let cql = "UPDATE counters SET count -= 1 WHERE id = ?";
1110        let result = parse_update_statement(cql);
1111        assert!(result.is_ok());
1112
1113        let update = result.unwrap();
1114        assert_eq!(update.assignments.len(), 1);
1115        assert!(matches!(
1116            update.assignments[0].operator,
1117            CqlAssignmentOperator::SubAssign
1118        ));
1119    }
1120
1121    #[test]
1122    fn test_parse_named_parameters() {
1123        let cql = "INSERT INTO users (id, name) VALUES (:id, :name)";
1124        let result = parse_insert_statement(cql);
1125        assert!(result.is_ok());
1126
1127        let insert = result.unwrap();
1128        match &insert.values {
1129            CqlInsertValues::Values(vals) => {
1130                assert_eq!(vals.len(), 2);
1131                assert!(matches!(vals[0], CqlExpression::NamedParameter(_)));
1132                assert!(matches!(vals[1], CqlExpression::NamedParameter(_)));
1133            }
1134            _ => panic!("Expected Values variant"),
1135        }
1136    }
1137
1138    #[test]
1139    fn test_parse_boolean_literals() {
1140        let cql = "INSERT INTO users (id, active) VALUES (?, true)";
1141        let result = parse_insert_statement(cql);
1142        assert!(result.is_ok());
1143    }
1144
1145    #[test]
1146    fn test_parse_uuid_literal() {
1147        let cql = "INSERT INTO users (id) VALUES (550e8400-e29b-41d4-a716-446655440000)";
1148        let result = parse_insert_statement(cql);
1149        assert!(result.is_ok());
1150
1151        let insert = result.unwrap();
1152        match &insert.values {
1153            CqlInsertValues::Values(vals) => {
1154                assert_eq!(vals.len(), 1);
1155                match &vals[0] {
1156                    CqlExpression::Literal(CqlLiteral::Uuid(_)) => {}
1157                    _ => panic!("Expected UUID literal"),
1158                }
1159            }
1160            _ => panic!("Expected Values variant"),
1161        }
1162    }
1163
1164    #[test]
1165    fn test_parse_blob_literal() {
1166        let cql = "INSERT INTO users (data) VALUES (0xdeadbeef)";
1167        let result = parse_insert_statement(cql);
1168        assert!(result.is_ok());
1169
1170        let insert = result.unwrap();
1171        match &insert.values {
1172            CqlInsertValues::Values(vals) => {
1173                assert_eq!(vals.len(), 1);
1174                match &vals[0] {
1175                    CqlExpression::Literal(CqlLiteral::Blob(hex)) => {
1176                        assert_eq!(hex, "deadbeef");
1177                    }
1178                    _ => panic!("Expected Blob literal"),
1179                }
1180            }
1181            _ => panic!("Expected Values variant"),
1182        }
1183    }
1184
1185    #[test]
1186    fn test_parse_set_literal() {
1187        let cql = "INSERT INTO users (tags) VALUES ({1, 2, 3})";
1188        let result = parse_insert_statement(cql);
1189        assert!(result.is_ok());
1190    }
1191
1192    #[test]
1193    fn test_parse_map_literal() {
1194        let cql = "INSERT INTO users (settings) VALUES ({'key': 'value'})";
1195        let result = parse_insert_statement(cql);
1196        assert!(result.is_ok());
1197    }
1198
1199    #[test]
1200    #[allow(clippy::approx_constant)]
1201    fn test_parse_float_literal() {
1202        let cql = "INSERT INTO metrics (value) VALUES (3.14)";
1203        let result = parse_insert_statement(cql);
1204        assert!(result.is_ok());
1205
1206        let insert = result.unwrap();
1207        match &insert.values {
1208            CqlInsertValues::Values(vals) => {
1209                assert_eq!(vals.len(), 1);
1210                match &vals[0] {
1211                    CqlExpression::Literal(CqlLiteral::Float(f)) => {
1212                        assert!((*f - 3.14).abs() < 0.001, "Expected float close to 3.14");
1213                        // Not approximating PI
1214                    }
1215                    _ => panic!("Expected Float literal"),
1216                }
1217            }
1218            _ => panic!("Expected Values variant"),
1219        }
1220    }
1221
1222    #[test]
1223    fn test_parse_negative_integer() {
1224        let cql = "INSERT INTO metrics (value) VALUES (-42)";
1225        let result = parse_insert_statement(cql);
1226        assert!(result.is_ok());
1227
1228        let insert = result.unwrap();
1229        match &insert.values {
1230            CqlInsertValues::Values(vals) => {
1231                assert_eq!(vals.len(), 1);
1232                match &vals[0] {
1233                    CqlExpression::Literal(CqlLiteral::Integer(i)) => {
1234                        assert_eq!(*i, -42);
1235                    }
1236                    _ => panic!("Expected Integer literal"),
1237                }
1238            }
1239            _ => panic!("Expected Values variant"),
1240        }
1241    }
1242
1243    #[test]
1244    fn test_parse_escaped_string() {
1245        let cql = r#"INSERT INTO users (name) VALUES ('O''Brien')"#;
1246        let result = parse_insert_statement(cql);
1247        assert!(result.is_ok());
1248
1249        let insert = result.unwrap();
1250        match &insert.values {
1251            CqlInsertValues::Values(vals) => {
1252                assert_eq!(vals.len(), 1);
1253                match &vals[0] {
1254                    CqlExpression::Literal(CqlLiteral::String(s)) => {
1255                        assert_eq!(s, "O'Brien");
1256                    }
1257                    _ => panic!("Expected String literal"),
1258                }
1259            }
1260            _ => panic!("Expected Values variant"),
1261        }
1262    }
1263
1264    #[test]
1265    fn test_parse_comparison_operators() {
1266        let operators = vec![
1267            ("id = ?", CqlBinaryOperator::Eq),
1268            ("id != ?", CqlBinaryOperator::Ne),
1269            ("id < ?", CqlBinaryOperator::Lt),
1270            ("id <= ?", CqlBinaryOperator::Le),
1271            ("id > ?", CqlBinaryOperator::Gt),
1272            ("id >= ?", CqlBinaryOperator::Ge),
1273        ];
1274
1275        for (where_expr, expected_op) in operators {
1276            let cql = format!("UPDATE users SET name = ? WHERE {}", where_expr);
1277            let result = parse_update_statement(&cql);
1278            assert!(result.is_ok(), "Failed to parse: {}", cql);
1279
1280            let update = result.unwrap();
1281            match &update.where_clause {
1282                CqlExpression::Binary { operator, .. } => {
1283                    assert_eq!(operator, &expected_op);
1284                }
1285                _ => panic!("Expected Binary expression"),
1286            }
1287        }
1288    }
1289
1290    // Issue #402 - DoS Protection Tests
1291
1292    #[test]
1293    fn test_input_length_limit() {
1294        // Create input exceeding MAX_INPUT_LENGTH (16 MB)
1295        let large_input = format!(
1296            "INSERT INTO users (id, name) VALUES (?, '{}')",
1297            "a".repeat(17 * 1024 * 1024)
1298        );
1299        let result = parse_insert_statement(&large_input);
1300        assert!(result.is_err());
1301        assert!(result.unwrap_err().to_string().contains("input_length"));
1302    }
1303
1304    #[test]
1305    fn test_nesting_depth_limit() {
1306        // Create deeply nested list structure
1307        let mut nested = "1".to_string();
1308        for _ in 0..40 {
1309            nested = format!("[{}]", nested);
1310        }
1311        let cql = format!("INSERT INTO users (data) VALUES ({})", nested);
1312        let result = parse_insert_statement(&cql);
1313        // Should fail due to depth limit
1314        assert!(result.is_err());
1315    }
1316
1317    #[test]
1318    fn test_collection_size_limit() {
1319        // Create a list with MAX_COLLECTION_SIZE + 1 items
1320        let items: Vec<String> = (0..=MAX_COLLECTION_SIZE).map(|i| i.to_string()).collect();
1321        let list = format!("[{}]", items.join(", "));
1322        let cql = format!("INSERT INTO users (data) VALUES ({})", list);
1323        let result = parse_insert_statement(&cql);
1324        // Should fail due to size limit
1325        assert!(result.is_err());
1326    }
1327
1328    #[test]
1329    fn test_valid_nested_collections() {
1330        // Valid nesting within limits (depth 3)
1331        let cql = "INSERT INTO users (data) VALUES ([[1, 2], [3, 4]])";
1332        let result = parse_insert_statement(cql);
1333        assert!(result.is_ok());
1334    }
1335
1336    #[test]
1337    fn test_uuid_strict_format_valid() {
1338        // Valid UUID with proper 8-4-4-4-12 format
1339        let cql = "INSERT INTO users (id) VALUES (550e8400-e29b-41d4-a716-446655440000)";
1340        let result = parse_insert_statement(cql);
1341        assert!(result.is_ok());
1342    }
1343
1344    #[test]
1345    fn test_uuid_invalid_segment_length() {
1346        // Invalid UUID - wrong segment lengths
1347        let cql = "INSERT INTO users (id) VALUES (550e8400-e29b-41d4-a716-4466554400)"; // Last segment too short
1348        let result = parse_insert_statement(cql);
1349        assert!(result.is_err());
1350    }
1351
1352    #[test]
1353    fn test_uuid_missing_dashes() {
1354        // Invalid UUID - missing dashes (should fail as it won't match UUID pattern)
1355        let cql = "INSERT INTO users (id) VALUES (550e8400e29b41d4a716446655440000)";
1356        let result = parse_insert_statement(cql);
1357        // This will fail as a valid parse (might parse as identifier or fail entirely)
1358        assert!(result.is_err());
1359    }
1360
1361    #[test]
1362    fn test_blob_even_hex_length_valid() {
1363        // Valid blob with even hex length
1364        let cql = "INSERT INTO users (data) VALUES (0xdeadbeef)";
1365        let result = parse_insert_statement(cql);
1366        assert!(result.is_ok());
1367    }
1368
1369    #[test]
1370    fn test_blob_odd_hex_length_invalid() {
1371        // Invalid blob with odd hex length
1372        let cql = "INSERT INTO users (data) VALUES (0xabc)";
1373        let result = parse_insert_statement(cql);
1374        assert!(result.is_err());
1375    }
1376
1377    // Issue #403 - Identifier Injection Tests
1378
1379    #[test]
1380    fn test_quoted_identifier_with_escaped_quotes() {
1381        // Quoted identifier with doubled quotes
1382        let cql = r#"INSERT INTO "My""Table" ("My""Column") VALUES (?)"#;
1383        let result = parse_insert_statement(cql);
1384        assert!(result.is_ok());
1385
1386        let insert = result.unwrap();
1387        assert_eq!(insert.table.name.name, r#"My"Table"#);
1388        assert_eq!(insert.columns[0].name, r#"My"Column"#);
1389    }
1390
1391    #[test]
1392    fn test_identifier_max_length() {
1393        // Identifier at max length (48 characters)
1394        let valid_name = "a".repeat(48);
1395        let cql = format!(r#"INSERT INTO "{}" (id) VALUES (?)"#, valid_name);
1396        let result = parse_insert_statement(&cql);
1397        assert!(result.is_ok());
1398    }
1399
1400    #[test]
1401    fn test_identifier_exceeds_max_length() {
1402        // Identifier exceeding max length (49 characters)
1403        let invalid_name = "a".repeat(49);
1404        let cql = format!(r#"INSERT INTO "{}" (id) VALUES (?)"#, invalid_name);
1405        let result = parse_insert_statement(&cql);
1406        assert!(result.is_err());
1407    }
1408
1409    #[test]
1410    fn test_identifier_with_control_characters() {
1411        // Identifier with control character (newline)
1412        let cql = "INSERT INTO \"bad\ntable\" (id) VALUES (?)";
1413        let result = parse_insert_statement(cql);
1414        assert!(result.is_err());
1415    }
1416
1417    #[test]
1418    fn test_identifier_with_null_byte() {
1419        // Identifier with null byte
1420        let cql = "INSERT INTO \"bad\0table\" (id) VALUES (?)";
1421        let result = parse_insert_statement(cql);
1422        assert!(result.is_err());
1423    }
1424
1425    #[test]
1426    fn test_valid_unquoted_identifier() {
1427        // Valid unquoted identifier
1428        let cql = "INSERT INTO my_table_123 (id) VALUES (?)";
1429        let result = parse_insert_statement(cql);
1430        assert!(result.is_ok());
1431    }
1432
1433    #[test]
1434    fn test_sanitize_for_filesystem() {
1435        // Test sanitization function
1436        assert_eq!(sanitize_for_filesystem("normal"), "normal");
1437        assert_eq!(sanitize_for_filesystem("with/slash"), "with_slash");
1438        assert_eq!(sanitize_for_filesystem("with\\backslash"), "with_backslash");
1439        assert_eq!(sanitize_for_filesystem("with:colon"), "with_colon");
1440        assert_eq!(sanitize_for_filesystem("with*asterisk"), "with_asterisk");
1441        assert_eq!(sanitize_for_filesystem("with?question"), "with_question");
1442        assert_eq!(sanitize_for_filesystem("with\"quote"), "with_quote");
1443        assert_eq!(sanitize_for_filesystem("with<less"), "with_less");
1444        assert_eq!(sanitize_for_filesystem("with>greater"), "with_greater");
1445        assert_eq!(sanitize_for_filesystem("with|pipe"), "with_pipe");
1446        assert_eq!(sanitize_for_filesystem("with\0null"), "with_null");
1447
1448        // Test length truncation
1449        let long_name = "a".repeat(100);
1450        let sanitized = sanitize_for_filesystem(&long_name);
1451        assert_eq!(sanitized.len(), MAX_IDENTIFIER_LENGTH);
1452    }
1453
1454    #[test]
1455    fn test_quoted_identifier_empty() {
1456        // Empty quoted identifier should be rejected
1457        let cql = r#"INSERT INTO "" (id) VALUES (?)"#;
1458        let result = parse_insert_statement(cql);
1459        // nom's take_while1 will fail on empty identifier
1460        assert!(result.is_err());
1461    }
1462
1463    #[test]
1464    fn test_multiple_escaped_quotes() {
1465        // Multiple consecutive escaped quotes
1466        let cql = r#"INSERT INTO "Tab""""le" (id) VALUES (?)"#;
1467        let result = parse_insert_statement(cql);
1468        assert!(result.is_ok());
1469
1470        let insert = result.unwrap();
1471        assert_eq!(insert.table.name.name, r#"Tab""le"#);
1472    }
1473
1474    #[test]
1475    fn test_collection_size_within_limit() {
1476        // Collection just within the limit
1477        let items: Vec<String> = (0..1000).map(|i| i.to_string()).collect();
1478        let list = format!("[{}]", items.join(", "));
1479        let cql = format!("INSERT INTO users (data) VALUES ({})", list);
1480        let result = parse_insert_statement(&cql);
1481        assert!(result.is_ok());
1482    }
1483
1484    #[test]
1485    fn test_map_size_limit() {
1486        // Map with too many entries
1487        let pairs: Vec<String> = (0..=MAX_COLLECTION_SIZE)
1488            .map(|i| format!("{}: {}", i, i))
1489            .collect();
1490        let map = format!("{{{}}}", pairs.join(", "));
1491        let cql = format!("INSERT INTO users (data) VALUES ({})", map);
1492        let result = parse_insert_statement(&cql);
1493        // Should fail due to size limit
1494        assert!(result.is_err());
1495    }
1496
1497    #[test]
1498    fn test_nested_map_depth_limit() {
1499        // Create nested map structure exceeding depth limit
1500        let mut nested = "{'k': 1}".to_string();
1501        for i in 0..40 {
1502            nested = format!("{{'key{}': {}}}", i, nested);
1503        }
1504        let cql = format!("INSERT INTO users (data) VALUES ({})", nested);
1505        let result = parse_insert_statement(&cql);
1506        // Should fail due to depth limit
1507        assert!(result.is_err());
1508    }
1509
1510    // Unicode handling in string and identifier parsing
1511
1512    #[test]
1513    fn test_string_literal_with_multibyte_utf8() {
1514        let cql = "INSERT INTO ks.t (name) VALUES ('héllo wörld')";
1515        let result = parse_insert_statement(cql);
1516        assert!(
1517            result.is_ok(),
1518            "Failed to parse string with accented chars: {:?}",
1519            result.err()
1520        );
1521    }
1522
1523    #[test]
1524    fn test_string_literal_with_emoji() {
1525        let cql = "INSERT INTO ks.t (name) VALUES ('hello 🌍 world')";
1526        let result = parse_insert_statement(cql);
1527        assert!(
1528            result.is_ok(),
1529            "Failed to parse string with emoji: {:?}",
1530            result.err()
1531        );
1532    }
1533
1534    #[test]
1535    fn test_string_literal_with_cjk() {
1536        let cql = "INSERT INTO ks.t (name) VALUES ('你好世界')";
1537        let result = parse_insert_statement(cql);
1538        assert!(
1539            result.is_ok(),
1540            "Failed to parse string with CJK chars: {:?}",
1541            result.err()
1542        );
1543    }
1544
1545    #[test]
1546    fn test_quoted_identifier_with_multibyte_utf8() {
1547        let cql = "INSERT INTO ks.t (\"nàme\") VALUES ('test')";
1548        let result = parse_insert_statement(cql);
1549        assert!(
1550            result.is_ok(),
1551            "Failed to parse quoted identifier with accented chars: {:?}",
1552            result.err()
1553        );
1554    }
1555
1556    #[test]
1557    fn test_quoted_identifier_with_emoji() {
1558        let cql = "INSERT INTO ks.t (\"col_🎉\") VALUES ('test')";
1559        let result = parse_insert_statement(cql);
1560        assert!(
1561            result.is_ok(),
1562            "Failed to parse quoted identifier with emoji: {:?}",
1563            result.err()
1564        );
1565    }
1566
1567    // INSERT INTO ... JSON parsing
1568
1569    #[test]
1570    fn test_insert_json_basic() {
1571        let cql = r#"INSERT INTO ks.t JSON '{"id": 1, "name": "test"}'"#;
1572        let result = parse_insert_statement(cql).unwrap();
1573        assert_eq!(result.table.keyspace.unwrap().name, "ks");
1574        assert_eq!(result.table.name.name, "t");
1575        assert!(result.columns.is_empty());
1576        match &result.values {
1577            CqlInsertValues::Json(s) => {
1578                assert_eq!(s, r#"{"id": 1, "name": "test"}"#);
1579            }
1580            CqlInsertValues::Values(_) => panic!("Expected Json variant"),
1581        }
1582    }
1583
1584    #[test]
1585    fn test_insert_json_if_not_exists() {
1586        let cql = r#"INSERT INTO ks.t JSON '{"id": 1}' IF NOT EXISTS"#;
1587        let result = parse_insert_statement(cql).unwrap();
1588        assert!(result.if_not_exists);
1589        assert!(matches!(&result.values, CqlInsertValues::Json(_)));
1590    }
1591
1592    #[test]
1593    fn test_insert_json_with_using_timestamp() {
1594        let cql = r#"INSERT INTO ks.t JSON '{"id": 1}' USING TIMESTAMP 12345"#;
1595        let result = parse_insert_statement(cql).unwrap();
1596        assert!(matches!(&result.values, CqlInsertValues::Json(_)));
1597        assert!(result.using.is_some());
1598    }
1599
1600    #[test]
1601    fn test_insert_values_still_works() {
1602        // Ensure the JSON branch doesn't break normal VALUES parsing
1603        let cql = "INSERT INTO ks.t (id, name) VALUES (1, 'test')";
1604        let result = parse_insert_statement(cql).unwrap();
1605        assert!(matches!(&result.values, CqlInsertValues::Values(_)));
1606        assert_eq!(result.columns.len(), 2);
1607    }
1608}