Skip to main content

s4_server/
select.rs

1//! S3 Select — server-side SQL filter on object body (v0.6 #41).
2//!
3//! Implements the [`SelectObjectContent`][aws-doc] surface as a small,
//! self-contained module. The primary entry points are [`run_select_csv`]
//! and [`run_select_jsonlines`], which take a SQL string and the in-memory body
6//! bytes (the caller is responsible for fetching + decompressing +
7//! decrypting the object — at the handler level we delegate to S4's
8//! existing GET path so SSE-C / SSE-S4 / SSE-KMS / S4 codec all work
9//! transparently).
10//!
11//! ## Supported SQL subset
12//!
13//! - `SELECT col1, col2 FROM s3object` — projection by header name when
14//!   the CSV has a header line.
15//! - `SELECT _1, _3 FROM s3object` — positional projection (1-based, AWS
16//!   convention; `_1` is the leftmost column).
17//! - `SELECT * FROM s3object` — all columns in input order.
18//! - `WHERE col = 'value'`, `WHERE col > 100`, `WHERE col LIKE 'foo%'`.
19//! - `AND` / `OR` / `NOT` boolean composition.
20//! - String / integer / float literals.
21//! - Equality / inequality (`=`, `<>`, `<`, `>`, `<=`, `>=`) and `LIKE`.
22//!
23//! ## Explicitly unsupported (rejected with [`SelectError::UnsupportedFeature`])
24//!
25//! - Aggregates (`COUNT`, `SUM`, `AVG`, …) and `GROUP BY` / `HAVING`.
26//! - `JOIN` / subqueries.
27//! - `ORDER BY` / `LIMIT` (Select-on-S3 streams in input order; aggregating
28//!   would defeat the streaming model and is outside this v0.6 scope).
29//! - Parquet input (Parquet decode is intentionally out of scope; CSV /
30//!   JSON Lines are the v0.6 deliverables).
31//!
32//! ## Output framing
33//!
34//! [`EventStreamWriter`] emits the AWS event-stream binary protocol —
35//! one `Records` frame per non-empty payload, an optional `Stats` frame,
36//! and a terminating `End` frame. Each frame is
37//! `[total_len BE u32][headers_len BE u32][prelude CRC32][headers][payload][message CRC32]`
38//! per the [AWS appendix][aws-events]. The handler in `service.rs` feeds
39//! the produced events into `s3s::dto::SelectObjectContentEventStream`,
40//! which performs equivalent framing on the wire — `EventStreamWriter`
41//! exists primarily so the **frame format itself** can be unit-tested and
42//! asserted-on by the integration test without spinning up a full client.
43//!
44//! [aws-doc]: https://docs.aws.amazon.com/AmazonS3/latest/API/API_SelectObjectContent.html
45//! [aws-events]: https://docs.aws.amazon.com/AmazonS3/latest/API/RESTSelectObjectAppendix.html
46
47use sqlparser::ast::{
48    BinaryOperator, Expr, GroupByExpr, ObjectName, Query, Select, SelectItem, SetExpr,
49    Statement, TableFactor, UnaryOperator, Value,
50};
51use sqlparser::dialect::GenericDialect;
52use sqlparser::parser::Parser;
53
54// =====================================================================
55// Errors
56// =====================================================================
57
58#[derive(Debug, thiserror::Error)]
59pub enum SelectError {
60    #[error("SQL parse error: {0}")]
61    Parse(String),
62    #[error("unsupported SQL feature: {0}")]
63    UnsupportedFeature(String),
64    #[error("input format error: {0}")]
65    InputFormat(String),
66    #[error("row evaluation error: {0}")]
67    RowEval(String),
68}
69
70// =====================================================================
71// Input / output formats
72// =====================================================================
73
/// Layout of the object body handed to a Select run.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SelectInputFormat {
    /// Delimited text. With `has_header`, the first record names the
    /// columns so they can be referenced by name as well as by `_N`.
    /// `delimiter` separates fields; the CSV reader works on single bytes,
    /// so it should be an ASCII character.
    Csv { has_header: bool, delimiter: char },
    /// Newline-delimited JSON: one top-level object per line.
    JsonLines,
}
79
/// Encoding of the records a Select run emits.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SelectOutputFormat {
    /// One CSV record per matched row, CRLF-terminated.
    Csv,
    /// One JSON object per matched row, newline-terminated.
    Json,
}
85
86// =====================================================================
87// Parsed query
88// =====================================================================
89
90#[derive(Debug, Clone)]
91pub struct SelectQuery {
92    /// Raw sqlparser SELECT items, validated against the supported
93    /// subset at parse time (no aggregates / window funcs / subqueries).
94    pub projection: Vec<SelectItem>,
95    pub where_clause: Option<Expr>,
96    /// Typically the literal `s3object` (case-insensitive). Captured for
97    /// completeness; the runtime ignores it because there's only ever one
98    /// virtual table in a Select query.
99    pub from_alias: String,
100}
101
102/// Parse and validate a S3 Select SQL expression.
103///
104/// Reject features that have no row-streaming semantics on a single
105/// object: aggregates, GROUP BY, HAVING, JOIN, ORDER BY, LIMIT, DISTINCT.
106pub fn parse_select(sql: &str) -> Result<SelectQuery, SelectError> {
107    let dialect = GenericDialect {};
108    let mut statements = Parser::parse_sql(&dialect, sql)
109        .map_err(|e| SelectError::Parse(e.to_string()))?;
110    if statements.len() != 1 {
111        return Err(SelectError::Parse(format!(
112            "expected exactly one statement, got {}",
113            statements.len()
114        )));
115    }
116    let stmt = statements.pop().expect("len == 1");
117    let query = match stmt {
118        Statement::Query(q) => *q,
119        other => {
120            return Err(SelectError::UnsupportedFeature(format!(
121                "only SELECT statements are supported, got: {other:?}"
122            )));
123        }
124    };
125    let Query {
126        body, order_by, limit, offset, fetch, locks, with, ..
127    } = query;
128    if with.is_some() {
129        return Err(SelectError::UnsupportedFeature("CTE / WITH".into()));
130    }
131    if order_by.is_some() {
132        return Err(SelectError::UnsupportedFeature("ORDER BY".into()));
133    }
134    if limit.is_some() {
135        return Err(SelectError::UnsupportedFeature("LIMIT".into()));
136    }
137    if offset.is_some() {
138        return Err(SelectError::UnsupportedFeature("OFFSET".into()));
139    }
140    if fetch.is_some() {
141        return Err(SelectError::UnsupportedFeature("FETCH".into()));
142    }
143    if !locks.is_empty() {
144        return Err(SelectError::UnsupportedFeature("FOR UPDATE / lock clauses".into()));
145    }
146
147    let select = match *body {
148        SetExpr::Select(s) => *s,
149        SetExpr::Query(_) => {
150            return Err(SelectError::UnsupportedFeature("nested query".into()));
151        }
152        SetExpr::SetOperation { .. } => {
153            return Err(SelectError::UnsupportedFeature("set operation (UNION/INTERSECT/EXCEPT)".into()));
154        }
155        other => {
156            return Err(SelectError::UnsupportedFeature(format!("unsupported SetExpr: {other:?}")));
157        }
158    };
159
160    let Select {
161        distinct,
162        top,
163        projection,
164        from,
165        selection,
166        group_by,
167        having,
168        named_window,
169        qualify,
170        cluster_by,
171        distribute_by,
172        sort_by,
173        prewhere,
174        connect_by,
175        ..
176    } = select;
177    if distinct.is_some() {
178        return Err(SelectError::UnsupportedFeature("DISTINCT".into()));
179    }
180    if top.is_some() {
181        return Err(SelectError::UnsupportedFeature("TOP".into()));
182    }
183    if having.is_some() {
184        return Err(SelectError::UnsupportedFeature("HAVING".into()));
185    }
186    if !named_window.is_empty() {
187        return Err(SelectError::UnsupportedFeature("WINDOW".into()));
188    }
189    if qualify.is_some() {
190        return Err(SelectError::UnsupportedFeature("QUALIFY".into()));
191    }
192    if !cluster_by.is_empty() || !distribute_by.is_empty() || !sort_by.is_empty() {
193        return Err(SelectError::UnsupportedFeature(
194            "CLUSTER BY / DISTRIBUTE BY / SORT BY".into(),
195        ));
196    }
197    if prewhere.is_some() {
198        return Err(SelectError::UnsupportedFeature("PREWHERE".into()));
199    }
200    if connect_by.is_some() {
201        return Err(SelectError::UnsupportedFeature("CONNECT BY".into()));
202    }
203    match group_by {
204        GroupByExpr::Expressions(ref exprs, ref mods) if exprs.is_empty() && mods.is_empty() => {}
205        _ => return Err(SelectError::UnsupportedFeature("GROUP BY".into())),
206    }
207
208    // Validate projection — reject anything that requires a non-row-local
209    // computation (function calls, subqueries, aggregates).
210    for item in &projection {
211        validate_projection_item(item)?;
212    }
213    if let Some(ref where_expr) = selection {
214        validate_where_expr(where_expr)?;
215    }
216
217    // FROM must be a single table reference, optionally aliased.
218    let from_alias = match from.as_slice() {
219        [twj] if twj.joins.is_empty() => match &twj.relation {
220            TableFactor::Table { name, alias, .. } => alias
221                .as_ref()
222                .map(|a| a.name.value.clone())
223                .unwrap_or_else(|| object_name_to_string(name)),
224            _ => {
225                return Err(SelectError::UnsupportedFeature(
226                    "only `FROM s3object` (or aliased single table) is supported".into(),
227                ));
228            }
229        },
230        [] => "s3object".to_owned(),
231        _ => return Err(SelectError::UnsupportedFeature("JOIN / multiple FROM tables".into())),
232    };
233
234    Ok(SelectQuery {
235        projection,
236        where_clause: selection,
237        from_alias,
238    })
239}
240
241fn object_name_to_string(name: &ObjectName) -> String {
242    name.0
243        .iter()
244        .map(|i| i.value.as_str())
245        .collect::<Vec<_>>()
246        .join(".")
247}
248
249fn validate_projection_item(item: &SelectItem) -> Result<(), SelectError> {
250    match item {
251        SelectItem::Wildcard(_) => Ok(()),
252        SelectItem::QualifiedWildcard(_, _) => Ok(()),
253        SelectItem::UnnamedExpr(e) | SelectItem::ExprWithAlias { expr: e, .. } => {
254            validate_simple_column_expr(e)
255        }
256    }
257}
258
259fn validate_simple_column_expr(expr: &Expr) -> Result<(), SelectError> {
260    match expr {
261        Expr::Identifier(_) | Expr::CompoundIdentifier(_) => Ok(()),
262        Expr::Function(_) => Err(SelectError::UnsupportedFeature(
263            "aggregate / scalar function in projection (only bare column references supported)".into(),
264        )),
265        Expr::Subquery(_) | Expr::Exists { .. } => {
266            Err(SelectError::UnsupportedFeature("subquery in projection".into()))
267        }
268        _ => Err(SelectError::UnsupportedFeature(format!(
269            "unsupported projection expression: {expr}"
270        ))),
271    }
272}
273
274fn validate_where_expr(expr: &Expr) -> Result<(), SelectError> {
275    match expr {
276        Expr::Identifier(_) | Expr::CompoundIdentifier(_) | Expr::Value(_) => Ok(()),
277        Expr::Nested(inner) => validate_where_expr(inner),
278        Expr::UnaryOp { op, expr } => match op {
279            UnaryOperator::Not | UnaryOperator::Minus | UnaryOperator::Plus => {
280                validate_where_expr(expr)
281            }
282            other => Err(SelectError::UnsupportedFeature(format!(
283                "unsupported unary operator in WHERE: {other:?}"
284            ))),
285        },
286        Expr::BinaryOp { op, left, right } => match op {
287            BinaryOperator::Eq
288            | BinaryOperator::NotEq
289            | BinaryOperator::Lt
290            | BinaryOperator::LtEq
291            | BinaryOperator::Gt
292            | BinaryOperator::GtEq
293            | BinaryOperator::And
294            | BinaryOperator::Or => {
295                validate_where_expr(left)?;
296                validate_where_expr(right)
297            }
298            other => Err(SelectError::UnsupportedFeature(format!(
299                "unsupported binary operator in WHERE: {other:?}"
300            ))),
301        },
302        Expr::Like { expr, pattern, .. } => {
303            validate_where_expr(expr)?;
304            validate_where_expr(pattern)
305        }
306        Expr::IsNull(e) | Expr::IsNotNull(e) => validate_where_expr(e),
307        Expr::Function(_) => Err(SelectError::UnsupportedFeature(
308            "function call in WHERE".into(),
309        )),
310        Expr::Subquery(_) | Expr::Exists { .. } | Expr::InSubquery { .. } => {
311            Err(SelectError::UnsupportedFeature("subquery in WHERE".into()))
312        }
313        other => Err(SelectError::UnsupportedFeature(format!(
314            "unsupported WHERE expression: {other}"
315        ))),
316    }
317}
318
319// =====================================================================
320// Row representation + lookup
321// =====================================================================
322
/// One input row: its field values in column order, plus (optionally) the
/// header names that let columns be addressed by name.
pub struct CsvRow<'a> {
    pub fields: Vec<&'a str>,
    pub headers: Option<&'a [String]>,
}

impl<'a> CsvRow<'a> {
    /// Resolve a column reference against this row.
    ///
    /// `_N` with `N >= 1` is a 1-based positional reference (`_1` is the
    /// leftmost field). Any other identifier is matched against the header
    /// names, ignoring ASCII case; the first matching header wins. Returns
    /// `None` when the reference cannot be resolved or the row is shorter
    /// than the resolved position.
    #[must_use]
    pub fn get(&self, ident: &str) -> Option<&str> {
        let position = ident
            .strip_prefix('_')
            .and_then(|digits| digits.parse::<usize>().ok())
            .filter(|&n| n >= 1);
        if let Some(n) = position {
            return self.fields.get(n - 1).copied();
        }

        let headers = self.headers?;
        headers
            .iter()
            .position(|header| header.eq_ignore_ascii_case(ident))
            .and_then(|column| self.fields.get(column).copied())
    }
}
354
355// =====================================================================
356// Row evaluation
357// =====================================================================
358
/// Scalar value produced while evaluating a `WHERE` clause.
///
/// Deliberately small: only the kinds the supported SQL subset can
/// produce. `Str` may borrow from the row (no copy) or own its text.
#[derive(Clone, Debug)]
enum Lit<'a> {
    Null,
    Bool(bool),
    Int(i64),
    Float(f64),
    Str(std::borrow::Cow<'a, str>),
}

impl<'a> Lit<'a> {
    /// Wrap a borrowed string (e.g. a row field) without copying it.
    fn from_str_value(s: &'a str) -> Lit<'a> {
        Self::Str(std::borrow::Cow::Borrowed(s))
    }

    /// Truthiness used for filtering: only `Bool(true)` counts as true;
    /// every other value, including `Null`, is false.
    fn truthy(&self) -> bool {
        if let Lit::Bool(flag) = self {
            *flag
        } else {
            false
        }
    }
}
379
380/// Apply WHERE + projection to a single row. Returns `Ok(Some(values))`
381/// for matched rows (one `String` per `SELECT` item, in declaration
382/// order), `Ok(None)` if WHERE excluded the row, `Err(...)` only on
383/// runtime evaluation problems (a projected column not in the row, etc).
384pub fn evaluate_row(
385    query: &SelectQuery,
386    row: &CsvRow<'_>,
387) -> Result<Option<Vec<String>>, SelectError> {
388    if let Some(ref w) = query.where_clause {
389        let v = eval_expr(w, row)?;
390        if !v.truthy() {
391            return Ok(None);
392        }
393    }
394    let mut out = Vec::with_capacity(query.projection.len());
395    for item in &query.projection {
396        match item {
397            SelectItem::Wildcard(_) | SelectItem::QualifiedWildcard(_, _) => {
398                for f in &row.fields {
399                    out.push((*f).to_owned());
400                }
401            }
402            SelectItem::UnnamedExpr(e) | SelectItem::ExprWithAlias { expr: e, .. } => {
403                let ident = expr_as_column(e)?;
404                let v = row.get(&ident).ok_or_else(|| {
405                    SelectError::RowEval(format!("column not found: {ident}"))
406                })?;
407                out.push(v.to_owned());
408            }
409        }
410    }
411    Ok(Some(out))
412}
413
414fn expr_as_column(expr: &Expr) -> Result<String, SelectError> {
415    match expr {
416        Expr::Identifier(i) => Ok(i.value.clone()),
417        Expr::CompoundIdentifier(parts) => parts
418            .last()
419            .map(|p| p.value.clone())
420            .ok_or_else(|| SelectError::RowEval("empty compound identifier".into())),
421        other => Err(SelectError::UnsupportedFeature(format!(
422            "non-column projection: {other}"
423        ))),
424    }
425}
426
427fn eval_expr<'a>(expr: &Expr, row: &'a CsvRow<'a>) -> Result<Lit<'a>, SelectError> {
428    match expr {
429        Expr::Nested(inner) => eval_expr(inner, row),
430        Expr::Identifier(i) => Ok(row
431            .get(&i.value)
432            .map_or(Lit::Null, Lit::from_str_value)),
433        Expr::CompoundIdentifier(parts) => {
434            let last = parts
435                .last()
436                .ok_or_else(|| SelectError::RowEval("empty compound identifier".into()))?;
437            Ok(row
438                .get(&last.value)
439                .map_or(Lit::Null, Lit::from_str_value))
440        }
441        Expr::Value(v) => value_to_lit(v),
442        Expr::UnaryOp { op, expr } => {
443            let v = eval_expr(expr, row)?;
444            match op {
445                UnaryOperator::Not => Ok(Lit::Bool(!v.truthy())),
446                UnaryOperator::Minus => match v {
447                    Lit::Int(n) => Ok(Lit::Int(-n)),
448                    Lit::Float(f) => Ok(Lit::Float(-f)),
449                    other => Err(SelectError::RowEval(format!(
450                        "cannot negate non-numeric value: {other:?}"
451                    ))),
452                },
453                UnaryOperator::Plus => Ok(v),
454                other => Err(SelectError::UnsupportedFeature(format!(
455                    "unsupported unary op: {other:?}"
456                ))),
457            }
458        }
459        Expr::BinaryOp { op, left, right } => {
460            let l = eval_expr(left, row)?;
461            let r = eval_expr(right, row)?;
462            eval_binary(op, &l, &r)
463        }
464        Expr::Like { negated, expr, pattern, escape_char } => {
465            if escape_char.is_some() {
466                return Err(SelectError::UnsupportedFeature(
467                    "LIKE ESCAPE clause".into(),
468                ));
469            }
470            let s_val = eval_expr(expr, row)?;
471            let p_val = eval_expr(pattern, row)?;
472            let s = lit_as_str(&s_val);
473            let p = lit_as_str(&p_val);
474            let m = like_match(s.as_ref(), p.as_ref());
475            Ok(Lit::Bool(if *negated { !m } else { m }))
476        }
477        Expr::IsNull(e) => Ok(Lit::Bool(matches!(eval_expr(e, row)?, Lit::Null))),
478        Expr::IsNotNull(e) => Ok(Lit::Bool(!matches!(eval_expr(e, row)?, Lit::Null))),
479        other => Err(SelectError::UnsupportedFeature(format!(
480            "unsupported expression in WHERE: {other}"
481        ))),
482    }
483}
484
485fn value_to_lit<'a>(v: &Value) -> Result<Lit<'a>, SelectError> {
486    match v {
487        Value::Number(s, _) => {
488            if let Ok(n) = s.parse::<i64>() {
489                Ok(Lit::Int(n))
490            } else if let Ok(f) = s.parse::<f64>() {
491                Ok(Lit::Float(f))
492            } else {
493                Err(SelectError::RowEval(format!("invalid number literal: {s}")))
494            }
495        }
496        Value::SingleQuotedString(s) | Value::DoubleQuotedString(s) => {
497            Ok(Lit::Str(std::borrow::Cow::Owned(s.clone())))
498        }
499        Value::Boolean(b) => Ok(Lit::Bool(*b)),
500        Value::Null => Ok(Lit::Null),
501        other => Err(SelectError::UnsupportedFeature(format!(
502            "literal kind not supported: {other:?}"
503        ))),
504    }
505}
506
507fn lit_as_str<'a>(v: &Lit<'a>) -> std::borrow::Cow<'a, str> {
508    match v {
509        Lit::Null => std::borrow::Cow::Borrowed(""),
510        Lit::Bool(b) => std::borrow::Cow::Owned(if *b { "true" } else { "false" }.into()),
511        Lit::Int(n) => std::borrow::Cow::Owned(n.to_string()),
512        Lit::Float(f) => std::borrow::Cow::Owned(f.to_string()),
513        Lit::Str(s) => s.clone(),
514    }
515}
516
517fn lit_as_f64(v: &Lit<'_>) -> Option<f64> {
518    match v {
519        Lit::Int(n) => Some(*n as f64),
520        Lit::Float(f) => Some(*f),
521        Lit::Str(s) => s.parse::<f64>().ok(),
522        Lit::Bool(_) | Lit::Null => None,
523    }
524}
525
526fn eval_binary<'a>(
527    op: &BinaryOperator,
528    l: &Lit<'a>,
529    r: &Lit<'a>,
530) -> Result<Lit<'a>, SelectError> {
531    use BinaryOperator::*;
532    match op {
533        And => Ok(Lit::Bool(l.truthy() && r.truthy())),
534        Or => Ok(Lit::Bool(l.truthy() || r.truthy())),
535        Eq | NotEq | Lt | LtEq | Gt | GtEq => {
536            // NULLs propagate to NULL → not-truthy. AWS S3 Select uses the
537            // SQL NULL semantics; we collapse to a Bool(false) so they
538            // simply don't match.
539            if matches!(l, Lit::Null) || matches!(r, Lit::Null) {
540                return Ok(Lit::Bool(false));
541            }
542            // Try numeric comparison first when both sides parse as
543            // numbers — covers `col > 100` against CSV string fields.
544            let cmp = if let (Some(a), Some(b)) = (lit_as_f64(l), lit_as_f64(r)) {
545                a.partial_cmp(&b)
546            } else {
547                let a = lit_as_str(l);
548                let b = lit_as_str(r);
549                Some(a.as_ref().cmp(b.as_ref()))
550            };
551            let ord =
552                cmp.ok_or_else(|| SelectError::RowEval("incomparable values (NaN?)".into()))?;
553            let res = match op {
554                Eq => ord == std::cmp::Ordering::Equal,
555                NotEq => ord != std::cmp::Ordering::Equal,
556                Lt => ord == std::cmp::Ordering::Less,
557                LtEq => ord != std::cmp::Ordering::Greater,
558                Gt => ord == std::cmp::Ordering::Greater,
559                GtEq => ord != std::cmp::Ordering::Less,
560                _ => unreachable!("guarded by outer match"),
561            };
562            Ok(Lit::Bool(res))
563        }
564        other => Err(SelectError::UnsupportedFeature(format!(
565            "unsupported binary operator: {other:?}"
566        ))),
567    }
568}
569
/// SQL `LIKE` matcher.
///
/// `%` in `pattern` matches any (possibly empty) sequence of characters, and
/// `_` matches exactly one character; every other pattern character matches
/// itself. The match is anchored at both ends: `'foo%'` matches `"foobar"`
/// but not `"xfoobar"`. Matching is per Unicode scalar value, not per byte.
///
/// Greedy wildcard matching with single-point backtracking: on a mismatch we
/// return to the most recent `%` and let it absorb one more subject
/// character. This runs in O(|s| * |pattern|) worst case.
///
/// The `%` check must come before the literal-character check. Otherwise a
/// `%` in the pattern facing a literal `%` in the subject would be consumed
/// as an ordinary character and lose its wildcard meaning
/// (e.g. `"a%bc" LIKE 'a%c'` would fail).
fn like_match(s: &str, pattern: &str) -> bool {
    let text: Vec<char> = s.chars().collect();
    let pat: Vec<char> = pattern.chars().collect();
    let (mut ti, mut pi) = (0usize, 0usize);
    // (index of the last `%` seen in `pat`, text index it currently resumes from)
    let mut backtrack: Option<(usize, usize)> = None;

    while ti < text.len() {
        match pat.get(pi).copied() {
            Some('%') => {
                backtrack = Some((pi, ti));
                pi += 1;
            }
            Some(c) if c == '_' || c == text[ti] => {
                ti += 1;
                pi += 1;
            }
            _ => match backtrack {
                Some((star_pi, star_ti)) => {
                    let resume = star_ti + 1;
                    backtrack = Some((star_pi, resume));
                    pi = star_pi + 1;
                    ti = resume;
                }
                None => return false,
            },
        }
    }

    // Subject exhausted: only trailing `%`s may remain in the pattern.
    pat[pi..].iter().all(|&c| c == '%')
}
599
600// =====================================================================
601// CSV / JSON Lines runners
602// =====================================================================
603
604/// Run a Select against a CSV-bytes body in-memory. Returns the
605/// concatenated output bytes in `output` format (CSV: rfc4180 single CRLF
606/// rows / JSON: one JSON-object-per-line).
607pub fn run_select_csv(
608    sql: &str,
609    body: &[u8],
610    input: SelectInputFormat,
611    output: SelectOutputFormat,
612) -> Result<Vec<u8>, SelectError> {
613    let (has_header, delim) = match input {
614        SelectInputFormat::Csv { has_header, delimiter } => (has_header, delimiter),
615        SelectInputFormat::JsonLines => {
616            return Err(SelectError::InputFormat(
617                "run_select_csv called with JsonLines input — use run_select_jsonlines".into(),
618            ));
619        }
620    };
621    let query = parse_select(sql)?;
622
623    let mut rdr = csv::ReaderBuilder::new()
624        .has_headers(has_header)
625        .delimiter(delim as u8)
626        .flexible(true)
627        .from_reader(body);
628
629    let headers_owned: Option<Vec<String>> = if has_header {
630        let h = rdr
631            .headers()
632            .map_err(|e| SelectError::InputFormat(format!("CSV headers: {e}")))?
633            .iter()
634            .map(|s| s.to_owned())
635            .collect();
636        Some(h)
637    } else {
638        None
639    };
640    let header_slice: Option<&[String]> = headers_owned.as_deref();
641
642    let mut out = Vec::with_capacity(body.len() / 2);
643    for record in rdr.records() {
644        let record = record
645            .map_err(|e| SelectError::InputFormat(format!("CSV record: {e}")))?;
646        let fields: Vec<&str> = record.iter().collect();
647        let row = CsvRow {
648            fields,
649            headers: header_slice,
650        };
651        if let Some(values) = evaluate_row(&query, &row)? {
652            write_output_row(&query, &values, &output, &mut out)?;
653        }
654    }
655    Ok(out)
656}
657
658/// Run a Select against a JSON-Lines body (`{...}\n{...}\n...`). One row
659/// per top-level JSON object. Nested values are stringified for CSV
660/// output; for JSON output, the projected fields are re-emitted with
661/// their original JSON literal.
662pub fn run_select_jsonlines(
663    sql: &str,
664    body: &[u8],
665    output: SelectOutputFormat,
666) -> Result<Vec<u8>, SelectError> {
667    let query = parse_select(sql)?;
668    let text = std::str::from_utf8(body)
669        .map_err(|e| SelectError::InputFormat(format!("body is not valid UTF-8: {e}")))?;
670    let mut out = Vec::with_capacity(body.len() / 2);
671    for (lineno, line) in text.lines().enumerate() {
672        let line = line.trim();
673        if line.is_empty() {
674            continue;
675        }
676        let v: serde_json::Value = serde_json::from_str(line).map_err(|e| {
677            SelectError::InputFormat(format!("JSON parse on line {}: {e}", lineno + 1))
678        })?;
679        let obj = v.as_object().ok_or_else(|| {
680            SelectError::InputFormat(format!(
681                "JSON Lines requires top-level object, line {} was not an object",
682                lineno + 1
683            ))
684        })?;
685        // Reify the object as ordered (header_name, value_str) pairs so
686        // the existing CsvRow evaluator works against it.
687        let headers: Vec<String> = obj.keys().cloned().collect();
688        let raw_strs: Vec<String> = obj
689            .values()
690            .map(|jv| match jv {
691                serde_json::Value::String(s) => s.clone(),
692                other => other.to_string(),
693            })
694            .collect();
695        let fields: Vec<&str> = raw_strs.iter().map(|s| s.as_str()).collect();
696        let row = CsvRow {
697            fields,
698            headers: Some(headers.as_slice()),
699        };
700        if let Some(values) = evaluate_row(&query, &row)? {
701            write_jsonlines_row(&query, &headers, &values, &output, &mut out)?;
702        }
703    }
704    Ok(out)
705}
706
707fn write_output_row(
708    query: &SelectQuery,
709    values: &[String],
710    output: &SelectOutputFormat,
711    out: &mut Vec<u8>,
712) -> Result<(), SelectError> {
713    match output {
714        SelectOutputFormat::Csv => {
715            let mut wtr = csv::WriterBuilder::new()
716                .terminator(csv::Terminator::CRLF)
717                .from_writer(Vec::new());
718            wtr.write_record(values.iter().map(String::as_str))
719                .map_err(|e| SelectError::InputFormat(format!("CSV write: {e}")))?;
720            wtr.flush()
721                .map_err(|e| SelectError::InputFormat(format!("CSV flush: {e}")))?;
722            let inner = wtr
723                .into_inner()
724                .map_err(|e| SelectError::InputFormat(format!("CSV finish: {e}")))?;
725            out.extend_from_slice(&inner);
726        }
727        SelectOutputFormat::Json => {
728            let names = projection_names(query, values.len());
729            let mut map = serde_json::Map::with_capacity(values.len());
730            for (n, v) in names.iter().zip(values.iter()) {
731                map.insert(n.clone(), serde_json::Value::String(v.clone()));
732            }
733            let line = serde_json::to_string(&serde_json::Value::Object(map))
734                .map_err(|e| SelectError::InputFormat(format!("JSON serialize: {e}")))?;
735            out.extend_from_slice(line.as_bytes());
736            out.push(b'\n');
737        }
738    }
739    Ok(())
740}
741
742fn write_jsonlines_row(
743    query: &SelectQuery,
744    headers: &[String],
745    values: &[String],
746    output: &SelectOutputFormat,
747    out: &mut Vec<u8>,
748) -> Result<(), SelectError> {
749    match output {
750        SelectOutputFormat::Csv => write_output_row(query, values, output, out)?,
751        SelectOutputFormat::Json => {
752            let names = projection_names_with_headers(query, headers, values.len());
753            let mut map = serde_json::Map::with_capacity(values.len());
754            for (n, v) in names.iter().zip(values.iter()) {
755                map.insert(n.clone(), serde_json::Value::String(v.clone()));
756            }
757            let line = serde_json::to_string(&serde_json::Value::Object(map))
758                .map_err(|e| SelectError::InputFormat(format!("JSON serialize: {e}")))?;
759            out.extend_from_slice(line.as_bytes());
760            out.push(b'\n');
761        }
762    }
763    Ok(())
764}
765
766fn projection_names(query: &SelectQuery, fallback_len: usize) -> Vec<String> {
767    let mut names = Vec::with_capacity(fallback_len);
768    for (i, item) in query.projection.iter().enumerate() {
769        match item {
770            SelectItem::ExprWithAlias { alias, .. } => names.push(alias.value.clone()),
771            SelectItem::UnnamedExpr(e) => match expr_as_column(e) {
772                Ok(s) => names.push(s),
773                Err(_) => names.push(format!("_{}", i + 1)),
774            },
775            SelectItem::Wildcard(_) | SelectItem::QualifiedWildcard(_, _) => {
776                for j in names.len()..fallback_len {
777                    names.push(format!("_{}", j + 1));
778                }
779                return names;
780            }
781        }
782    }
783    while names.len() < fallback_len {
784        let n = names.len();
785        names.push(format!("_{}", n + 1));
786    }
787    names
788}
789
790fn projection_names_with_headers(
791    query: &SelectQuery,
792    headers: &[String],
793    fallback_len: usize,
794) -> Vec<String> {
795    let mut names = Vec::with_capacity(fallback_len);
796    for (i, item) in query.projection.iter().enumerate() {
797        match item {
798            SelectItem::Wildcard(_) | SelectItem::QualifiedWildcard(_, _) => {
799                for h in headers {
800                    names.push(h.clone());
801                }
802                while names.len() < fallback_len {
803                    let n = names.len();
804                    names.push(format!("_{}", n + 1));
805                }
806                return names;
807            }
808            SelectItem::ExprWithAlias { alias, .. } => names.push(alias.value.clone()),
809            SelectItem::UnnamedExpr(e) => match expr_as_column(e) {
810                Ok(s) => names.push(s),
811                Err(_) => names.push(format!("_{}", i + 1)),
812            },
813        }
814    }
815    while names.len() < fallback_len {
816        let n = names.len();
817        names.push(format!("_{}", n + 1));
818    }
819    names
820}
821
822// =====================================================================
823// AWS event-stream framing
824// =====================================================================
825
826/// Emits AWS event-stream binary frames for a Select response. Each frame
827/// is `[total_len BE u32][headers_len BE u32][prelude CRC32][headers...][payload][message CRC32]`.
828///
829/// Header value type is fixed at `7` (UTF-8 string). Headers always
830/// emitted: `:event-type`, `:message-type`, plus `:content-type` for
831/// payload-bearing frames (Records / Stats).
832#[derive(Debug, Default)]
833pub struct EventStreamWriter {}
834
835impl EventStreamWriter {
836    #[must_use]
837    pub fn new() -> Self {
838        Self {}
839    }
840
841    /// Build a `Records` frame. `payload` is the (optionally empty) body
842    /// chunk — typically a CSV / JSON-Lines slab of one or more output
843    /// rows. AWS allows splitting a logical record across frames.
844    pub fn records(&mut self, payload: &[u8]) -> Vec<u8> {
845        build_frame(
846            &[
847                (":event-type", "Records"),
848                (":content-type", "application/octet-stream"),
849                (":message-type", "event"),
850            ],
851            Some(payload),
852        )
853    }
854
855    /// Build a `Stats` frame containing the standard
856    /// `BytesScanned` / `BytesProcessed` / `BytesReturned` XML payload.
857    pub fn stats(&mut self, scanned: u64, processed: u64, returned: u64) -> Vec<u8> {
858        let xml = format!(
859            "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\
860<Stats xmlns=\"\">\
861<BytesScanned>{scanned}</BytesScanned>\
862<BytesProcessed>{processed}</BytesProcessed>\
863<BytesReturned>{returned}</BytesReturned>\
864</Stats>"
865        );
866        build_frame(
867            &[
868                (":event-type", "Stats"),
869                (":content-type", "text/xml"),
870                (":message-type", "event"),
871            ],
872            Some(xml.as_bytes()),
873        )
874    }
875
876    /// Build the terminating `End` frame. Clients must wait for this
877    /// before assuming the response stream is complete.
878    pub fn end(&mut self) -> Vec<u8> {
879        build_frame(
880            &[
881                (":event-type", "End"),
882                (":message-type", "event"),
883            ],
884            None,
885        )
886    }
887}
888
889fn build_frame(headers: &[(&str, &str)], payload: Option<&[u8]>) -> Vec<u8> {
890    let mut header_buf: Vec<u8> = Vec::new();
891    for (name, value) in headers {
892        let name_bytes = name.as_bytes();
893        let value_bytes = value.as_bytes();
894        debug_assert!(name_bytes.len() <= u8::MAX as usize, "header name too long");
895        debug_assert!(value_bytes.len() <= u16::MAX as usize, "header value too long");
896        header_buf.push(name_bytes.len() as u8);
897        header_buf.extend_from_slice(name_bytes);
898        header_buf.push(7); // value type 7 == UTF-8 string
899        header_buf.extend_from_slice(&(value_bytes.len() as u16).to_be_bytes());
900        header_buf.extend_from_slice(value_bytes);
901    }
902    let payload_bytes = payload.unwrap_or(&[]);
903    let headers_len: u32 = header_buf.len() as u32;
904    let total_len: u32 = 12 + headers_len + payload_bytes.len() as u32 + 4;
905
906    let mut buf: Vec<u8> = Vec::with_capacity(total_len as usize);
907    buf.extend_from_slice(&total_len.to_be_bytes());
908    buf.extend_from_slice(&headers_len.to_be_bytes());
909    let prelude_crc = crc32fast::hash(&buf[..8]);
910    buf.extend_from_slice(&prelude_crc.to_be_bytes());
911    buf.extend_from_slice(&header_buf);
912    buf.extend_from_slice(payload_bytes);
913    let message_crc = crc32fast::hash(&buf[..buf.len()]);
914    buf.extend_from_slice(&message_crc.to_be_bytes());
915    buf
916}
917
918// =====================================================================
919// GPU stub (v0.7+ scope marker)
920// =====================================================================
921
922/// GPU acceleration stub — always returns `None` today. The integration
923/// test verifies it's wired but inactive; v0.7 will swap in an actual
924/// CUDA WHERE-evaluator.
925#[must_use]
926pub fn select_gpu(
927    _sql: &str,
928    _body: &[u8],
929    _input: &SelectInputFormat,
930) -> Option<Vec<u8>> {
931    None
932}
933
934// =====================================================================
935// Unit tests
936// =====================================================================
937
#[cfg(test)]
mod tests {
    use super::*;

    fn csv_input() -> SelectInputFormat {
        SelectInputFormat::Csv {
            has_header: true,
            delimiter: ',',
        }
    }

    /// Decode the header section of an event-stream frame into
    /// `(name, value)` pairs. Asserts that every value is typed as a UTF-8
    /// string (type 7) and that the section is consumed exactly.
    fn decode_headers(frame: &[u8]) -> Vec<(String, String)> {
        let headers_len =
            u32::from_be_bytes([frame[4], frame[5], frame[6], frame[7]]) as usize;
        let mut rest = &frame[12..12 + headers_len];
        let mut out = Vec::new();
        while !rest.is_empty() {
            let name_len = usize::from(rest[0]);
            let name = std::str::from_utf8(&rest[1..1 + name_len])
                .unwrap()
                .to_owned();
            rest = &rest[1 + name_len..];
            assert_eq!(rest[0], 7, "header {name} must be a UTF-8 string");
            let value_len = usize::from(u16::from_be_bytes([rest[1], rest[2]]));
            let value = std::str::from_utf8(&rest[3..3 + value_len])
                .unwrap()
                .to_owned();
            rest = &rest[3 + value_len..];
            out.push((name, value));
        }
        out
    }

    /// Owned copy of a `(name, value)` header list, for comparisons.
    fn owned(pairs: &[(&str, &str)]) -> Vec<(String, String)> {
        pairs
            .iter()
            .map(|(n, v)| ((*n).to_owned(), (*v).to_owned()))
            .collect()
    }

    #[test]
    fn parse_select_happy_path() {
        let q = parse_select("SELECT name, age FROM s3object WHERE age > 30").unwrap();
        assert_eq!(q.projection.len(), 2);
        assert!(q.where_clause.is_some());
        assert_eq!(q.from_alias.to_lowercase(), "s3object");
    }

    #[test]
    fn parse_select_rejects_group_by() {
        let err =
            parse_select("SELECT name, COUNT(*) FROM s3object GROUP BY name").unwrap_err();
        match err {
            SelectError::UnsupportedFeature(_) => {}
            other => panic!("expected UnsupportedFeature, got {other:?}"),
        }
    }

    #[test]
    fn parse_select_rejects_join() {
        let err = parse_select("SELECT a.x FROM s3object a JOIN other b ON a.id = b.id")
            .unwrap_err();
        assert!(matches!(err, SelectError::UnsupportedFeature(_)));
    }

    #[test]
    fn parse_select_rejects_order_by() {
        let err = parse_select("SELECT name FROM s3object ORDER BY name").unwrap_err();
        assert!(matches!(err, SelectError::UnsupportedFeature(_)));
    }

    #[test]
    fn evaluate_row_eq_match() {
        let q = parse_select("SELECT name FROM s3object WHERE name = 'alice'").unwrap();
        let headers = vec!["name".to_owned(), "age".to_owned()];
        let row = CsvRow {
            fields: vec!["alice", "30"],
            headers: Some(&headers),
        };
        let r = evaluate_row(&q, &row).unwrap();
        assert_eq!(r, Some(vec!["alice".to_owned()]));

        let row2 = CsvRow {
            fields: vec!["bob", "30"],
            headers: Some(&headers),
        };
        assert_eq!(evaluate_row(&q, &row2).unwrap(), None);
    }

    #[test]
    fn evaluate_row_int_compare() {
        let q = parse_select("SELECT age FROM s3object WHERE age > 100").unwrap();
        let headers = vec!["name".to_owned(), "age".to_owned()];
        let big = CsvRow {
            fields: vec!["x", "200"],
            headers: Some(&headers),
        };
        let small = CsvRow {
            fields: vec!["x", "50"],
            headers: Some(&headers),
        };
        assert!(evaluate_row(&q, &big).unwrap().is_some());
        assert!(evaluate_row(&q, &small).unwrap().is_none());
    }

    #[test]
    fn evaluate_row_like_pattern() {
        let q = parse_select("SELECT name FROM s3object WHERE name LIKE 'foo%'").unwrap();
        let headers = vec!["name".to_owned()];
        let yes = CsvRow {
            fields: vec!["foobar"],
            headers: Some(&headers),
        };
        let no = CsvRow {
            fields: vec!["xfoobar"],
            headers: Some(&headers),
        };
        assert!(evaluate_row(&q, &yes).unwrap().is_some());
        assert!(evaluate_row(&q, &no).unwrap().is_none());
    }

    #[test]
    fn run_select_csv_end_to_end_filters_rows() {
        let body = b"name,age\nalice,30\nbob,40\ncarol,50\n";
        let out = run_select_csv(
            "SELECT name FROM s3object WHERE age > 35",
            body,
            csv_input(),
            SelectOutputFormat::Csv,
        )
        .unwrap();
        let s = std::str::from_utf8(&out).unwrap();
        let lines: Vec<&str> = s.split("\r\n").filter(|l| !l.is_empty()).collect();
        assert_eq!(lines, vec!["bob", "carol"]);
    }

    #[test]
    fn run_select_jsonlines_filter() {
        let body = b"{\"name\":\"alice\",\"age\":\"30\"}\n\
                     {\"name\":\"bob\",\"age\":\"40\"}\n\
                     {\"name\":\"carol\",\"age\":\"50\"}\n";
        let out = run_select_jsonlines(
            "SELECT name FROM s3object WHERE age > 35",
            body,
            SelectOutputFormat::Json,
        )
        .unwrap();
        let s = std::str::from_utf8(&out).unwrap();
        let lines: Vec<&str> = s.lines().filter(|l| !l.is_empty()).collect();
        assert_eq!(lines.len(), 2);
        assert!(lines[0].contains("bob"));
        assert!(lines[1].contains("carol"));
    }

    #[test]
    fn positional_column_ref() {
        let body = b"alice,30\nbob,40\n";
        let out = run_select_csv(
            "SELECT _1 FROM s3object WHERE _2 > 35",
            body,
            SelectInputFormat::Csv {
                has_header: false,
                delimiter: ',',
            },
            SelectOutputFormat::Csv,
        )
        .unwrap();
        let s = std::str::from_utf8(&out).unwrap();
        let lines: Vec<&str> = s.split("\r\n").filter(|l| !l.is_empty()).collect();
        assert_eq!(lines, vec!["bob"]);
    }

    #[test]
    fn and_or_combination() {
        let body = b"name,age,city\n\
                     alice,30,nyc\n\
                     bob,40,nyc\n\
                     carol,50,sf\n\
                     dan,25,sf\n";
        let out = run_select_csv(
            "SELECT name FROM s3object WHERE (city = 'nyc' AND age > 35) OR name = 'dan'",
            body,
            csv_input(),
            SelectOutputFormat::Csv,
        )
        .unwrap();
        let s = std::str::from_utf8(&out).unwrap();
        let mut lines: Vec<&str> = s.split("\r\n").filter(|l| !l.is_empty()).collect();
        lines.sort_unstable();
        assert_eq!(lines, vec!["bob", "dan"]);
    }

    #[test]
    fn event_stream_records_frame_format() {
        let mut w = EventStreamWriter::new();
        let frame = w.records(b"hello,world\r\n");
        let total =
            u32::from_be_bytes([frame[0], frame[1], frame[2], frame[3]]) as usize;
        assert_eq!(total, frame.len());
        let headers_len =
            u32::from_be_bytes([frame[4], frame[5], frame[6], frame[7]]) as usize;
        let prelude_crc =
            u32::from_be_bytes([frame[8], frame[9], frame[10], frame[11]]);
        assert_eq!(prelude_crc, crc32fast::hash(&frame[..8]));
        let msg_crc = u32::from_be_bytes([
            frame[total - 4],
            frame[total - 3],
            frame[total - 2],
            frame[total - 1],
        ]);
        assert_eq!(msg_crc, crc32fast::hash(&frame[..total - 4]));
        let hdr_region = &frame[12..12 + headers_len];
        let s = String::from_utf8_lossy(hdr_region);
        assert!(s.contains(":event-type"));
        assert!(s.contains("Records"));
        let payload = &frame[12 + headers_len..total - 4];
        assert_eq!(payload, b"hello,world\r\n");
    }

    #[test]
    fn event_stream_end_frame_no_payload() {
        let mut w = EventStreamWriter::new();
        let frame = w.end();
        let total =
            u32::from_be_bytes([frame[0], frame[1], frame[2], frame[3]]) as usize;
        let headers_len =
            u32::from_be_bytes([frame[4], frame[5], frame[6], frame[7]]) as usize;
        assert_eq!(total - 4 - 12 - headers_len, 0);
        let s = String::from_utf8_lossy(&frame[12..12 + headers_len]);
        assert!(s.contains("End"));
    }

    #[test]
    fn event_stream_stats_xml_payload() {
        let mut w = EventStreamWriter::new();
        let frame = w.stats(1024, 800, 64);
        let total =
            u32::from_be_bytes([frame[0], frame[1], frame[2], frame[3]]) as usize;
        let headers_len =
            u32::from_be_bytes([frame[4], frame[5], frame[6], frame[7]]) as usize;
        let payload = &frame[12 + headers_len..total - 4];
        let xml = std::str::from_utf8(payload).unwrap();
        assert!(xml.contains("<BytesScanned>1024</BytesScanned>"));
        assert!(xml.contains("<BytesProcessed>800</BytesProcessed>"));
        assert!(xml.contains("<BytesReturned>64</BytesReturned>"));
    }

    /// Pins the exact wire encoding (order, type byte, lengths) of every
    /// header on all three frame kinds, which substring checks cannot catch.
    #[test]
    fn event_stream_headers_encode_exactly() {
        let mut w = EventStreamWriter::new();
        assert_eq!(
            decode_headers(&w.records(b"a,b\r\n")),
            owned(&[
                (":event-type", "Records"),
                (":content-type", "application/octet-stream"),
                (":message-type", "event"),
            ])
        );
        assert_eq!(
            decode_headers(&w.stats(1, 2, 3)),
            owned(&[
                (":event-type", "Stats"),
                (":content-type", "text/xml"),
                (":message-type", "event"),
            ])
        );
        assert_eq!(
            decode_headers(&w.end()),
            owned(&[(":event-type", "End"), (":message-type", "event")])
        );
    }

    /// Every frame kind, including an empty `Records` frame, must carry a
    /// correct length prefix and valid prelude / message CRCs.
    #[test]
    fn event_stream_every_frame_has_valid_crcs() {
        let mut w = EventStreamWriter::new();
        for frame in [w.records(b""), w.stats(0, 0, 0), w.end()] {
            let total =
                u32::from_be_bytes([frame[0], frame[1], frame[2], frame[3]]) as usize;
            assert_eq!(total, frame.len());
            let prelude_crc =
                u32::from_be_bytes([frame[8], frame[9], frame[10], frame[11]]);
            assert_eq!(prelude_crc, crc32fast::hash(&frame[..8]));
            let msg_crc = u32::from_be_bytes([
                frame[total - 4],
                frame[total - 3],
                frame[total - 2],
                frame[total - 1],
            ]);
            assert_eq!(msg_crc, crc32fast::hash(&frame[..total - 4]));
        }
    }

    #[test]
    fn gpu_stub_returns_none() {
        let v = select_gpu(
            "SELECT * FROM s3object",
            b"name,age\nalice,30\n",
            &csv_input(),
        );
        assert!(v.is_none(), "GPU stub must always return None for v0.6");
    }

    #[test]
    fn like_match_basics() {
        assert!(like_match("foobar", "foo%"));
        assert!(!like_match("xfoobar", "foo%"));
        assert!(like_match("abc", "_b_"));
        assert!(like_match("anything", "%"));
        assert!(like_match("", ""));
        assert!(!like_match("a", ""));
    }
}