1use sqlparser::ast::{
48 BinaryOperator, Expr, GroupByExpr, ObjectName, Query, Select, SelectItem, SetExpr,
49 Statement, TableFactor, UnaryOperator, Value,
50};
51use sqlparser::dialect::GenericDialect;
52use sqlparser::parser::Parser;
53
54#[derive(Debug, thiserror::Error)]
59pub enum SelectError {
60 #[error("SQL parse error: {0}")]
61 Parse(String),
62 #[error("unsupported SQL feature: {0}")]
63 UnsupportedFeature(String),
64 #[error("input format error: {0}")]
65 InputFormat(String),
66 #[error("row evaluation error: {0}")]
67 RowEval(String),
68}
69
/// How the object body should be decoded before the query runs over it.
#[derive(Debug, Clone)]
pub enum SelectInputFormat {
    /// Delimited text.
    ///
    /// When `has_header` is true, the first record supplies column names. Columns
    /// are always addressable positionally as `_1`, `_2`, ….
    Csv { has_header: bool, delimiter: char },
    /// One JSON object per line; blank lines are skipped.
    JsonLines,
}
79
/// Serialization used for each matching row in the result.
#[derive(Debug, Clone)]
pub enum SelectOutputFormat {
    /// One CRLF-terminated CSV record per row.
    Csv,
    /// One JSON object per row, each followed by `\n`.
    Json,
}
85
86#[derive(Debug, Clone)]
91pub struct SelectQuery {
92 pub projection: Vec<SelectItem>,
95 pub where_clause: Option<Expr>,
96 pub from_alias: String,
100}
101
102pub fn parse_select(sql: &str) -> Result<SelectQuery, SelectError> {
107 let dialect = GenericDialect {};
108 let mut statements = Parser::parse_sql(&dialect, sql)
109 .map_err(|e| SelectError::Parse(e.to_string()))?;
110 if statements.len() != 1 {
111 return Err(SelectError::Parse(format!(
112 "expected exactly one statement, got {}",
113 statements.len()
114 )));
115 }
116 let stmt = statements.pop().expect("len == 1");
117 let query = match stmt {
118 Statement::Query(q) => *q,
119 other => {
120 return Err(SelectError::UnsupportedFeature(format!(
121 "only SELECT statements are supported, got: {other:?}"
122 )));
123 }
124 };
125 let Query {
126 body, order_by, limit, offset, fetch, locks, with, ..
127 } = query;
128 if with.is_some() {
129 return Err(SelectError::UnsupportedFeature("CTE / WITH".into()));
130 }
131 if order_by.is_some() {
132 return Err(SelectError::UnsupportedFeature("ORDER BY".into()));
133 }
134 if limit.is_some() {
135 return Err(SelectError::UnsupportedFeature("LIMIT".into()));
136 }
137 if offset.is_some() {
138 return Err(SelectError::UnsupportedFeature("OFFSET".into()));
139 }
140 if fetch.is_some() {
141 return Err(SelectError::UnsupportedFeature("FETCH".into()));
142 }
143 if !locks.is_empty() {
144 return Err(SelectError::UnsupportedFeature("FOR UPDATE / lock clauses".into()));
145 }
146
147 let select = match *body {
148 SetExpr::Select(s) => *s,
149 SetExpr::Query(_) => {
150 return Err(SelectError::UnsupportedFeature("nested query".into()));
151 }
152 SetExpr::SetOperation { .. } => {
153 return Err(SelectError::UnsupportedFeature("set operation (UNION/INTERSECT/EXCEPT)".into()));
154 }
155 other => {
156 return Err(SelectError::UnsupportedFeature(format!("unsupported SetExpr: {other:?}")));
157 }
158 };
159
160 let Select {
161 distinct,
162 top,
163 projection,
164 from,
165 selection,
166 group_by,
167 having,
168 named_window,
169 qualify,
170 cluster_by,
171 distribute_by,
172 sort_by,
173 prewhere,
174 connect_by,
175 ..
176 } = select;
177 if distinct.is_some() {
178 return Err(SelectError::UnsupportedFeature("DISTINCT".into()));
179 }
180 if top.is_some() {
181 return Err(SelectError::UnsupportedFeature("TOP".into()));
182 }
183 if having.is_some() {
184 return Err(SelectError::UnsupportedFeature("HAVING".into()));
185 }
186 if !named_window.is_empty() {
187 return Err(SelectError::UnsupportedFeature("WINDOW".into()));
188 }
189 if qualify.is_some() {
190 return Err(SelectError::UnsupportedFeature("QUALIFY".into()));
191 }
192 if !cluster_by.is_empty() || !distribute_by.is_empty() || !sort_by.is_empty() {
193 return Err(SelectError::UnsupportedFeature(
194 "CLUSTER BY / DISTRIBUTE BY / SORT BY".into(),
195 ));
196 }
197 if prewhere.is_some() {
198 return Err(SelectError::UnsupportedFeature("PREWHERE".into()));
199 }
200 if connect_by.is_some() {
201 return Err(SelectError::UnsupportedFeature("CONNECT BY".into()));
202 }
203 match group_by {
204 GroupByExpr::Expressions(ref exprs, ref mods) if exprs.is_empty() && mods.is_empty() => {}
205 _ => return Err(SelectError::UnsupportedFeature("GROUP BY".into())),
206 }
207
208 for item in &projection {
211 validate_projection_item(item)?;
212 }
213 if let Some(ref where_expr) = selection {
214 validate_where_expr(where_expr)?;
215 }
216
217 let from_alias = match from.as_slice() {
219 [twj] if twj.joins.is_empty() => match &twj.relation {
220 TableFactor::Table { name, alias, .. } => alias
221 .as_ref()
222 .map(|a| a.name.value.clone())
223 .unwrap_or_else(|| object_name_to_string(name)),
224 _ => {
225 return Err(SelectError::UnsupportedFeature(
226 "only `FROM s3object` (or aliased single table) is supported".into(),
227 ));
228 }
229 },
230 [] => "s3object".to_owned(),
231 _ => return Err(SelectError::UnsupportedFeature("JOIN / multiple FROM tables".into())),
232 };
233
234 Ok(SelectQuery {
235 projection,
236 where_clause: selection,
237 from_alias,
238 })
239}
240
241fn object_name_to_string(name: &ObjectName) -> String {
242 name.0
243 .iter()
244 .map(|i| i.value.as_str())
245 .collect::<Vec<_>>()
246 .join(".")
247}
248
249fn validate_projection_item(item: &SelectItem) -> Result<(), SelectError> {
250 match item {
251 SelectItem::Wildcard(_) => Ok(()),
252 SelectItem::QualifiedWildcard(_, _) => Ok(()),
253 SelectItem::UnnamedExpr(e) | SelectItem::ExprWithAlias { expr: e, .. } => {
254 validate_simple_column_expr(e)
255 }
256 }
257}
258
259fn validate_simple_column_expr(expr: &Expr) -> Result<(), SelectError> {
260 match expr {
261 Expr::Identifier(_) | Expr::CompoundIdentifier(_) => Ok(()),
262 Expr::Function(_) => Err(SelectError::UnsupportedFeature(
263 "aggregate / scalar function in projection (only bare column references supported)".into(),
264 )),
265 Expr::Subquery(_) | Expr::Exists { .. } => {
266 Err(SelectError::UnsupportedFeature("subquery in projection".into()))
267 }
268 _ => Err(SelectError::UnsupportedFeature(format!(
269 "unsupported projection expression: {expr}"
270 ))),
271 }
272}
273
274fn validate_where_expr(expr: &Expr) -> Result<(), SelectError> {
275 match expr {
276 Expr::Identifier(_) | Expr::CompoundIdentifier(_) | Expr::Value(_) => Ok(()),
277 Expr::Nested(inner) => validate_where_expr(inner),
278 Expr::UnaryOp { op, expr } => match op {
279 UnaryOperator::Not | UnaryOperator::Minus | UnaryOperator::Plus => {
280 validate_where_expr(expr)
281 }
282 other => Err(SelectError::UnsupportedFeature(format!(
283 "unsupported unary operator in WHERE: {other:?}"
284 ))),
285 },
286 Expr::BinaryOp { op, left, right } => match op {
287 BinaryOperator::Eq
288 | BinaryOperator::NotEq
289 | BinaryOperator::Lt
290 | BinaryOperator::LtEq
291 | BinaryOperator::Gt
292 | BinaryOperator::GtEq
293 | BinaryOperator::And
294 | BinaryOperator::Or => {
295 validate_where_expr(left)?;
296 validate_where_expr(right)
297 }
298 other => Err(SelectError::UnsupportedFeature(format!(
299 "unsupported binary operator in WHERE: {other:?}"
300 ))),
301 },
302 Expr::Like { expr, pattern, .. } => {
303 validate_where_expr(expr)?;
304 validate_where_expr(pattern)
305 }
306 Expr::IsNull(e) | Expr::IsNotNull(e) => validate_where_expr(e),
307 Expr::Function(_) => Err(SelectError::UnsupportedFeature(
308 "function call in WHERE".into(),
309 )),
310 Expr::Subquery(_) | Expr::Exists { .. } | Expr::InSubquery { .. } => {
311 Err(SelectError::UnsupportedFeature("subquery in WHERE".into()))
312 }
313 other => Err(SelectError::UnsupportedFeature(format!(
314 "unsupported WHERE expression: {other}"
315 ))),
316 }
317}
318
/// One input record as borrowed string fields, plus optional column names.
///
/// JSON Lines input is adapted into this same shape (object keys as headers).
pub struct CsvRow<'a> {
    /// Field values in column order.
    pub fields: Vec<&'a str>,
    /// Column names aligned with `fields`, when the input provides them.
    pub headers: Option<&'a [String]>,
}

impl CsvRow<'_> {
    /// Looks up a column by reference.
    ///
    /// `_N` (with `N >= 1`) selects the N-th field positionally. Any other identifier is
    /// matched ASCII case-insensitively against the headers, and the first match wins.
    /// Returns `None` if nothing matches or the matched index has no field.
    #[must_use]
    pub fn get(&self, ident: &str) -> Option<&str> {
        let positional = ident
            .strip_prefix('_')
            .and_then(|digits| digits.parse::<usize>().ok())
            .filter(|&n| n >= 1);
        if let Some(n) = positional {
            return self.fields.get(n - 1).copied();
        }
        let headers = self.headers?;
        let column = headers.iter().position(|h| h.eq_ignore_ascii_case(ident))?;
        self.fields.get(column).copied()
    }
}
354
/// A scalar produced while evaluating a WHERE expression.
///
/// Column values enter as [`Lit::Str`] borrowed from the row. A column the row does not
/// have evaluates to [`Lit::Null`].
#[derive(Debug, Clone)]
enum Lit<'a> {
    Null,
    Bool(bool),
    Int(i64),
    Float(f64),
    Str(std::borrow::Cow<'a, str>),
}

impl<'a> Lit<'a> {
    /// Wraps a row value without copying it.
    fn from_str_value(s: &'a str) -> Lit<'a> {
        Self::Str(std::borrow::Cow::from(s))
    }

    /// Filter truthiness: only a boolean `true` passes. Strings such as `"true"` do not.
    fn truthy(&self) -> bool {
        match self {
            Self::Bool(flag) => *flag,
            _ => false,
        }
    }
}
379
380pub fn evaluate_row(
385 query: &SelectQuery,
386 row: &CsvRow<'_>,
387) -> Result<Option<Vec<String>>, SelectError> {
388 if let Some(ref w) = query.where_clause {
389 let v = eval_expr(w, row)?;
390 if !v.truthy() {
391 return Ok(None);
392 }
393 }
394 let mut out = Vec::with_capacity(query.projection.len());
395 for item in &query.projection {
396 match item {
397 SelectItem::Wildcard(_) | SelectItem::QualifiedWildcard(_, _) => {
398 for f in &row.fields {
399 out.push((*f).to_owned());
400 }
401 }
402 SelectItem::UnnamedExpr(e) | SelectItem::ExprWithAlias { expr: e, .. } => {
403 let ident = expr_as_column(e)?;
404 let v = row.get(&ident).ok_or_else(|| {
405 SelectError::RowEval(format!("column not found: {ident}"))
406 })?;
407 out.push(v.to_owned());
408 }
409 }
410 }
411 Ok(Some(out))
412}
413
414fn expr_as_column(expr: &Expr) -> Result<String, SelectError> {
415 match expr {
416 Expr::Identifier(i) => Ok(i.value.clone()),
417 Expr::CompoundIdentifier(parts) => parts
418 .last()
419 .map(|p| p.value.clone())
420 .ok_or_else(|| SelectError::RowEval("empty compound identifier".into())),
421 other => Err(SelectError::UnsupportedFeature(format!(
422 "non-column projection: {other}"
423 ))),
424 }
425}
426
427fn eval_expr<'a>(expr: &Expr, row: &'a CsvRow<'a>) -> Result<Lit<'a>, SelectError> {
428 match expr {
429 Expr::Nested(inner) => eval_expr(inner, row),
430 Expr::Identifier(i) => Ok(row
431 .get(&i.value)
432 .map_or(Lit::Null, Lit::from_str_value)),
433 Expr::CompoundIdentifier(parts) => {
434 let last = parts
435 .last()
436 .ok_or_else(|| SelectError::RowEval("empty compound identifier".into()))?;
437 Ok(row
438 .get(&last.value)
439 .map_or(Lit::Null, Lit::from_str_value))
440 }
441 Expr::Value(v) => value_to_lit(v),
442 Expr::UnaryOp { op, expr } => {
443 let v = eval_expr(expr, row)?;
444 match op {
445 UnaryOperator::Not => Ok(Lit::Bool(!v.truthy())),
446 UnaryOperator::Minus => match v {
447 Lit::Int(n) => Ok(Lit::Int(-n)),
448 Lit::Float(f) => Ok(Lit::Float(-f)),
449 other => Err(SelectError::RowEval(format!(
450 "cannot negate non-numeric value: {other:?}"
451 ))),
452 },
453 UnaryOperator::Plus => Ok(v),
454 other => Err(SelectError::UnsupportedFeature(format!(
455 "unsupported unary op: {other:?}"
456 ))),
457 }
458 }
459 Expr::BinaryOp { op, left, right } => {
460 let l = eval_expr(left, row)?;
461 let r = eval_expr(right, row)?;
462 eval_binary(op, &l, &r)
463 }
464 Expr::Like { negated, expr, pattern, escape_char } => {
465 if escape_char.is_some() {
466 return Err(SelectError::UnsupportedFeature(
467 "LIKE ESCAPE clause".into(),
468 ));
469 }
470 let s_val = eval_expr(expr, row)?;
471 let p_val = eval_expr(pattern, row)?;
472 let s = lit_as_str(&s_val);
473 let p = lit_as_str(&p_val);
474 let m = like_match(s.as_ref(), p.as_ref());
475 Ok(Lit::Bool(if *negated { !m } else { m }))
476 }
477 Expr::IsNull(e) => Ok(Lit::Bool(matches!(eval_expr(e, row)?, Lit::Null))),
478 Expr::IsNotNull(e) => Ok(Lit::Bool(!matches!(eval_expr(e, row)?, Lit::Null))),
479 other => Err(SelectError::UnsupportedFeature(format!(
480 "unsupported expression in WHERE: {other}"
481 ))),
482 }
483}
484
485fn value_to_lit<'a>(v: &Value) -> Result<Lit<'a>, SelectError> {
486 match v {
487 Value::Number(s, _) => {
488 if let Ok(n) = s.parse::<i64>() {
489 Ok(Lit::Int(n))
490 } else if let Ok(f) = s.parse::<f64>() {
491 Ok(Lit::Float(f))
492 } else {
493 Err(SelectError::RowEval(format!("invalid number literal: {s}")))
494 }
495 }
496 Value::SingleQuotedString(s) | Value::DoubleQuotedString(s) => {
497 Ok(Lit::Str(std::borrow::Cow::Owned(s.clone())))
498 }
499 Value::Boolean(b) => Ok(Lit::Bool(*b)),
500 Value::Null => Ok(Lit::Null),
501 other => Err(SelectError::UnsupportedFeature(format!(
502 "literal kind not supported: {other:?}"
503 ))),
504 }
505}
506
507fn lit_as_str<'a>(v: &Lit<'a>) -> std::borrow::Cow<'a, str> {
508 match v {
509 Lit::Null => std::borrow::Cow::Borrowed(""),
510 Lit::Bool(b) => std::borrow::Cow::Owned(if *b { "true" } else { "false" }.into()),
511 Lit::Int(n) => std::borrow::Cow::Owned(n.to_string()),
512 Lit::Float(f) => std::borrow::Cow::Owned(f.to_string()),
513 Lit::Str(s) => s.clone(),
514 }
515}
516
517fn lit_as_f64(v: &Lit<'_>) -> Option<f64> {
518 match v {
519 Lit::Int(n) => Some(*n as f64),
520 Lit::Float(f) => Some(*f),
521 Lit::Str(s) => s.parse::<f64>().ok(),
522 Lit::Bool(_) | Lit::Null => None,
523 }
524}
525
526fn eval_binary<'a>(
527 op: &BinaryOperator,
528 l: &Lit<'a>,
529 r: &Lit<'a>,
530) -> Result<Lit<'a>, SelectError> {
531 use BinaryOperator::*;
532 match op {
533 And => Ok(Lit::Bool(l.truthy() && r.truthy())),
534 Or => Ok(Lit::Bool(l.truthy() || r.truthy())),
535 Eq | NotEq | Lt | LtEq | Gt | GtEq => {
536 if matches!(l, Lit::Null) || matches!(r, Lit::Null) {
540 return Ok(Lit::Bool(false));
541 }
542 let cmp = if let (Some(a), Some(b)) = (lit_as_f64(l), lit_as_f64(r)) {
545 a.partial_cmp(&b)
546 } else {
547 let a = lit_as_str(l);
548 let b = lit_as_str(r);
549 Some(a.as_ref().cmp(b.as_ref()))
550 };
551 let ord =
552 cmp.ok_or_else(|| SelectError::RowEval("incomparable values (NaN?)".into()))?;
553 let res = match op {
554 Eq => ord == std::cmp::Ordering::Equal,
555 NotEq => ord != std::cmp::Ordering::Equal,
556 Lt => ord == std::cmp::Ordering::Less,
557 LtEq => ord != std::cmp::Ordering::Greater,
558 Gt => ord == std::cmp::Ordering::Greater,
559 GtEq => ord != std::cmp::Ordering::Less,
560 _ => unreachable!("guarded by outer match"),
561 };
562 Ok(Lit::Bool(res))
563 }
564 other => Err(SelectError::UnsupportedFeature(format!(
565 "unsupported binary operator: {other:?}"
566 ))),
567 }
568}
569
/// SQL `LIKE` matching without ESCAPE support.
///
/// In the pattern, `%` matches any run of characters (including none) and `_` matches
/// exactly one character. Matching is per Unicode scalar value and case-sensitive.
/// Uses greedy matching with single-point backtracking, so it runs in O(|s|·|pattern|)
/// worst case without recursion.
///
/// `%` is checked before literal equality. Otherwise a `%` in the pattern would be consumed
/// as a literal whenever the subject also contains `%` (e.g. `"%a" LIKE "%"` was false).
fn like_match(s: &str, pattern: &str) -> bool {
    let text: Vec<char> = s.chars().collect();
    let pat: Vec<char> = pattern.chars().collect();
    let (mut ti, mut pi) = (0usize, 0usize);
    // (index of the most recent `%` in `pat`, text index that `%` currently extends to).
    let mut backtrack: Option<(usize, usize)> = None;

    while ti < text.len() {
        match pat.get(pi) {
            Some(&'%') => {
                backtrack = Some((pi, ti));
                pi += 1;
            }
            Some(&c) if c == '_' || c == text[ti] => {
                ti += 1;
                pi += 1;
            }
            _ => match backtrack {
                // Let the last `%` absorb one more character and retry after it.
                Some((star_pi, star_ti)) => {
                    pi = star_pi + 1;
                    ti = star_ti + 1;
                    backtrack = Some((star_pi, ti));
                }
                None => return false,
            },
        }
    }
    // Text exhausted: only trailing `%`s may remain in the pattern.
    pat[pi..].iter().all(|&c| c == '%')
}
599
600pub fn run_select_csv(
608 sql: &str,
609 body: &[u8],
610 input: SelectInputFormat,
611 output: SelectOutputFormat,
612) -> Result<Vec<u8>, SelectError> {
613 let (has_header, delim) = match input {
614 SelectInputFormat::Csv { has_header, delimiter } => (has_header, delimiter),
615 SelectInputFormat::JsonLines => {
616 return Err(SelectError::InputFormat(
617 "run_select_csv called with JsonLines input — use run_select_jsonlines".into(),
618 ));
619 }
620 };
621 let query = parse_select(sql)?;
622
623 let mut rdr = csv::ReaderBuilder::new()
624 .has_headers(has_header)
625 .delimiter(delim as u8)
626 .flexible(true)
627 .from_reader(body);
628
629 let headers_owned: Option<Vec<String>> = if has_header {
630 let h = rdr
631 .headers()
632 .map_err(|e| SelectError::InputFormat(format!("CSV headers: {e}")))?
633 .iter()
634 .map(|s| s.to_owned())
635 .collect();
636 Some(h)
637 } else {
638 None
639 };
640 let header_slice: Option<&[String]> = headers_owned.as_deref();
641
642 let mut out = Vec::with_capacity(body.len() / 2);
643 for record in rdr.records() {
644 let record = record
645 .map_err(|e| SelectError::InputFormat(format!("CSV record: {e}")))?;
646 let fields: Vec<&str> = record.iter().collect();
647 let row = CsvRow {
648 fields,
649 headers: header_slice,
650 };
651 if let Some(values) = evaluate_row(&query, &row)? {
652 write_output_row(&query, &values, &output, &mut out)?;
653 }
654 }
655 Ok(out)
656}
657
658pub fn run_select_jsonlines(
663 sql: &str,
664 body: &[u8],
665 output: SelectOutputFormat,
666) -> Result<Vec<u8>, SelectError> {
667 let query = parse_select(sql)?;
668 let text = std::str::from_utf8(body)
669 .map_err(|e| SelectError::InputFormat(format!("body is not valid UTF-8: {e}")))?;
670 let mut out = Vec::with_capacity(body.len() / 2);
671 for (lineno, line) in text.lines().enumerate() {
672 let line = line.trim();
673 if line.is_empty() {
674 continue;
675 }
676 let v: serde_json::Value = serde_json::from_str(line).map_err(|e| {
677 SelectError::InputFormat(format!("JSON parse on line {}: {e}", lineno + 1))
678 })?;
679 let obj = v.as_object().ok_or_else(|| {
680 SelectError::InputFormat(format!(
681 "JSON Lines requires top-level object, line {} was not an object",
682 lineno + 1
683 ))
684 })?;
685 let headers: Vec<String> = obj.keys().cloned().collect();
688 let raw_strs: Vec<String> = obj
689 .values()
690 .map(|jv| match jv {
691 serde_json::Value::String(s) => s.clone(),
692 other => other.to_string(),
693 })
694 .collect();
695 let fields: Vec<&str> = raw_strs.iter().map(|s| s.as_str()).collect();
696 let row = CsvRow {
697 fields,
698 headers: Some(headers.as_slice()),
699 };
700 if let Some(values) = evaluate_row(&query, &row)? {
701 write_jsonlines_row(&query, &headers, &values, &output, &mut out)?;
702 }
703 }
704 Ok(out)
705}
706
707fn write_output_row(
708 query: &SelectQuery,
709 values: &[String],
710 output: &SelectOutputFormat,
711 out: &mut Vec<u8>,
712) -> Result<(), SelectError> {
713 match output {
714 SelectOutputFormat::Csv => {
715 let mut wtr = csv::WriterBuilder::new()
716 .terminator(csv::Terminator::CRLF)
717 .from_writer(Vec::new());
718 wtr.write_record(values.iter().map(String::as_str))
719 .map_err(|e| SelectError::InputFormat(format!("CSV write: {e}")))?;
720 wtr.flush()
721 .map_err(|e| SelectError::InputFormat(format!("CSV flush: {e}")))?;
722 let inner = wtr
723 .into_inner()
724 .map_err(|e| SelectError::InputFormat(format!("CSV finish: {e}")))?;
725 out.extend_from_slice(&inner);
726 }
727 SelectOutputFormat::Json => {
728 let names = projection_names(query, values.len());
729 let mut map = serde_json::Map::with_capacity(values.len());
730 for (n, v) in names.iter().zip(values.iter()) {
731 map.insert(n.clone(), serde_json::Value::String(v.clone()));
732 }
733 let line = serde_json::to_string(&serde_json::Value::Object(map))
734 .map_err(|e| SelectError::InputFormat(format!("JSON serialize: {e}")))?;
735 out.extend_from_slice(line.as_bytes());
736 out.push(b'\n');
737 }
738 }
739 Ok(())
740}
741
742fn write_jsonlines_row(
743 query: &SelectQuery,
744 headers: &[String],
745 values: &[String],
746 output: &SelectOutputFormat,
747 out: &mut Vec<u8>,
748) -> Result<(), SelectError> {
749 match output {
750 SelectOutputFormat::Csv => write_output_row(query, values, output, out)?,
751 SelectOutputFormat::Json => {
752 let names = projection_names_with_headers(query, headers, values.len());
753 let mut map = serde_json::Map::with_capacity(values.len());
754 for (n, v) in names.iter().zip(values.iter()) {
755 map.insert(n.clone(), serde_json::Value::String(v.clone()));
756 }
757 let line = serde_json::to_string(&serde_json::Value::Object(map))
758 .map_err(|e| SelectError::InputFormat(format!("JSON serialize: {e}")))?;
759 out.extend_from_slice(line.as_bytes());
760 out.push(b'\n');
761 }
762 }
763 Ok(())
764}
765
766fn projection_names(query: &SelectQuery, fallback_len: usize) -> Vec<String> {
767 let mut names = Vec::with_capacity(fallback_len);
768 for (i, item) in query.projection.iter().enumerate() {
769 match item {
770 SelectItem::ExprWithAlias { alias, .. } => names.push(alias.value.clone()),
771 SelectItem::UnnamedExpr(e) => match expr_as_column(e) {
772 Ok(s) => names.push(s),
773 Err(_) => names.push(format!("_{}", i + 1)),
774 },
775 SelectItem::Wildcard(_) | SelectItem::QualifiedWildcard(_, _) => {
776 for j in names.len()..fallback_len {
777 names.push(format!("_{}", j + 1));
778 }
779 return names;
780 }
781 }
782 }
783 while names.len() < fallback_len {
784 let n = names.len();
785 names.push(format!("_{}", n + 1));
786 }
787 names
788}
789
790fn projection_names_with_headers(
791 query: &SelectQuery,
792 headers: &[String],
793 fallback_len: usize,
794) -> Vec<String> {
795 let mut names = Vec::with_capacity(fallback_len);
796 for (i, item) in query.projection.iter().enumerate() {
797 match item {
798 SelectItem::Wildcard(_) | SelectItem::QualifiedWildcard(_, _) => {
799 for h in headers {
800 names.push(h.clone());
801 }
802 while names.len() < fallback_len {
803 let n = names.len();
804 names.push(format!("_{}", n + 1));
805 }
806 return names;
807 }
808 SelectItem::ExprWithAlias { alias, .. } => names.push(alias.value.clone()),
809 SelectItem::UnnamedExpr(e) => match expr_as_column(e) {
810 Ok(s) => names.push(s),
811 Err(_) => names.push(format!("_{}", i + 1)),
812 },
813 }
814 }
815 while names.len() < fallback_len {
816 let n = names.len();
817 names.push(format!("_{}", n + 1));
818 }
819 names
820}
821
822#[derive(Debug, Default)]
833pub struct EventStreamWriter {}
834
835impl EventStreamWriter {
836 #[must_use]
837 pub fn new() -> Self {
838 Self {}
839 }
840
841 pub fn records(&mut self, payload: &[u8]) -> Vec<u8> {
845 build_frame(
846 &[
847 (":event-type", "Records"),
848 (":content-type", "application/octet-stream"),
849 (":message-type", "event"),
850 ],
851 Some(payload),
852 )
853 }
854
855 pub fn stats(&mut self, scanned: u64, processed: u64, returned: u64) -> Vec<u8> {
858 let xml = format!(
859 "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\
860<Stats xmlns=\"\">\
861<BytesScanned>{scanned}</BytesScanned>\
862<BytesProcessed>{processed}</BytesProcessed>\
863<BytesReturned>{returned}</BytesReturned>\
864</Stats>"
865 );
866 build_frame(
867 &[
868 (":event-type", "Stats"),
869 (":content-type", "text/xml"),
870 (":message-type", "event"),
871 ],
872 Some(xml.as_bytes()),
873 )
874 }
875
876 pub fn end(&mut self) -> Vec<u8> {
879 build_frame(
880 &[
881 (":event-type", "End"),
882 (":message-type", "event"),
883 ],
884 None,
885 )
886 }
887}
888
889fn build_frame(headers: &[(&str, &str)], payload: Option<&[u8]>) -> Vec<u8> {
890 let mut header_buf: Vec<u8> = Vec::new();
891 for (name, value) in headers {
892 let name_bytes = name.as_bytes();
893 let value_bytes = value.as_bytes();
894 debug_assert!(name_bytes.len() <= u8::MAX as usize, "header name too long");
895 debug_assert!(value_bytes.len() <= u16::MAX as usize, "header value too long");
896 header_buf.push(name_bytes.len() as u8);
897 header_buf.extend_from_slice(name_bytes);
898 header_buf.push(7); header_buf.extend_from_slice(&(value_bytes.len() as u16).to_be_bytes());
900 header_buf.extend_from_slice(value_bytes);
901 }
902 let payload_bytes = payload.unwrap_or(&[]);
903 let headers_len: u32 = header_buf.len() as u32;
904 let total_len: u32 = 12 + headers_len + payload_bytes.len() as u32 + 4;
905
906 let mut buf: Vec<u8> = Vec::with_capacity(total_len as usize);
907 buf.extend_from_slice(&total_len.to_be_bytes());
908 buf.extend_from_slice(&headers_len.to_be_bytes());
909 let prelude_crc = crc32fast::hash(&buf[..8]);
910 buf.extend_from_slice(&prelude_crc.to_be_bytes());
911 buf.extend_from_slice(&header_buf);
912 buf.extend_from_slice(payload_bytes);
913 let message_crc = crc32fast::hash(&buf[..buf.len()]);
914 buf.extend_from_slice(&message_crc.to_be_bytes());
915 buf
916}
917
918#[must_use]
926pub fn select_gpu(
927 _sql: &str,
928 _body: &[u8],
929 _input: &SelectInputFormat,
930) -> Option<Vec<u8>> {
931 None
932}
933
#[cfg(test)]
mod tests {
    use super::*;

    /// CSV input with a header row and comma delimiter.
    fn csv_input() -> SelectInputFormat {
        SelectInputFormat::Csv {
            has_header: true,
            delimiter: ',',
        }
    }

    /// Builds a row that borrows its fields and header names.
    fn row<'a>(fields: Vec<&'a str>, headers: &'a [String]) -> CsvRow<'a> {
        CsvRow {
            fields,
            headers: Some(headers),
        }
    }

    /// Owned header names from string literals.
    fn owned(names: &[&str]) -> Vec<String> {
        names.iter().map(|s| (*s).to_owned()).collect()
    }

    /// Non-empty lines of CRLF-terminated CSV output.
    fn crlf_lines(out: &[u8]) -> Vec<&str> {
        std::str::from_utf8(out)
            .unwrap()
            .split("\r\n")
            .filter(|l| !l.is_empty())
            .collect()
    }

    /// Big-endian u32 starting at byte `at`.
    fn be_u32(frame: &[u8], at: usize) -> u32 {
        u32::from_be_bytes([frame[at], frame[at + 1], frame[at + 2], frame[at + 3]])
    }

    #[test]
    fn parse_select_happy_path() {
        let q = parse_select("SELECT name, age FROM s3object WHERE age > 30").unwrap();
        assert_eq!(q.projection.len(), 2);
        assert!(q.where_clause.is_some());
        assert_eq!(q.from_alias.to_lowercase(), "s3object");
    }

    #[test]
    fn parse_select_rejects_group_by() {
        let err =
            parse_select("SELECT name, COUNT(*) FROM s3object GROUP BY name").unwrap_err();
        assert!(
            matches!(err, SelectError::UnsupportedFeature(_)),
            "expected UnsupportedFeature, got {err:?}"
        );
    }

    #[test]
    fn parse_select_rejects_join() {
        let err = parse_select("SELECT a.x FROM s3object a JOIN other b ON a.id = b.id")
            .unwrap_err();
        assert!(matches!(err, SelectError::UnsupportedFeature(_)));
    }

    #[test]
    fn parse_select_rejects_order_by() {
        let err = parse_select("SELECT name FROM s3object ORDER BY name").unwrap_err();
        assert!(matches!(err, SelectError::UnsupportedFeature(_)));
    }

    #[test]
    fn evaluate_row_eq_match() {
        let q = parse_select("SELECT name FROM s3object WHERE name = 'alice'").unwrap();
        let headers = owned(&["name", "age"]);
        assert_eq!(
            evaluate_row(&q, &row(vec!["alice", "30"], &headers)).unwrap(),
            Some(vec!["alice".to_owned()])
        );
        assert_eq!(
            evaluate_row(&q, &row(vec!["bob", "30"], &headers)).unwrap(),
            None
        );
    }

    #[test]
    fn evaluate_row_int_compare() {
        let q = parse_select("SELECT age FROM s3object WHERE age > 100").unwrap();
        let headers = owned(&["name", "age"]);
        assert!(evaluate_row(&q, &row(vec!["x", "200"], &headers)).unwrap().is_some());
        assert!(evaluate_row(&q, &row(vec!["x", "50"], &headers)).unwrap().is_none());
    }

    #[test]
    fn evaluate_row_like_pattern() {
        let q = parse_select("SELECT name FROM s3object WHERE name LIKE 'foo%'").unwrap();
        let headers = owned(&["name"]);
        assert!(evaluate_row(&q, &row(vec!["foobar"], &headers)).unwrap().is_some());
        assert!(evaluate_row(&q, &row(vec!["xfoobar"], &headers)).unwrap().is_none());
    }

    #[test]
    fn run_select_csv_end_to_end_filters_rows() {
        let body = b"name,age\nalice,30\nbob,40\ncarol,50\n";
        let out = run_select_csv(
            "SELECT name FROM s3object WHERE age > 35",
            body,
            csv_input(),
            SelectOutputFormat::Csv,
        )
        .unwrap();
        assert_eq!(crlf_lines(&out), vec!["bob", "carol"]);
    }

    #[test]
    fn run_select_jsonlines_filter() {
        let body = b"{\"name\":\"alice\",\"age\":\"30\"}\n\
                     {\"name\":\"bob\",\"age\":\"40\"}\n\
                     {\"name\":\"carol\",\"age\":\"50\"}\n";
        let out = run_select_jsonlines(
            "SELECT name FROM s3object WHERE age > 35",
            body,
            SelectOutputFormat::Json,
        )
        .unwrap();
        let text = std::str::from_utf8(&out).unwrap();
        let lines: Vec<&str> = text.lines().filter(|l| !l.is_empty()).collect();
        assert_eq!(lines.len(), 2);
        assert!(lines[0].contains("bob"));
        assert!(lines[1].contains("carol"));
    }

    #[test]
    fn positional_column_ref() {
        let out = run_select_csv(
            "SELECT _1 FROM s3object WHERE _2 > 35",
            b"alice,30\nbob,40\n",
            SelectInputFormat::Csv {
                has_header: false,
                delimiter: ',',
            },
            SelectOutputFormat::Csv,
        )
        .unwrap();
        assert_eq!(crlf_lines(&out), vec!["bob"]);
    }

    #[test]
    fn and_or_combination() {
        let body = b"name,age,city\n\
                     alice,30,nyc\n\
                     bob,40,nyc\n\
                     carol,50,sf\n\
                     dan,25,sf\n";
        let out = run_select_csv(
            "SELECT name FROM s3object WHERE (city = 'nyc' AND age > 35) OR name = 'dan'",
            body,
            csv_input(),
            SelectOutputFormat::Csv,
        )
        .unwrap();
        let mut lines = crlf_lines(&out);
        lines.sort_unstable();
        assert_eq!(lines, vec!["bob", "dan"]);
    }

    #[test]
    fn event_stream_records_frame_format() {
        let frame = EventStreamWriter::new().records(b"hello,world\r\n");
        let total = be_u32(&frame, 0) as usize;
        assert_eq!(total, frame.len());
        let headers_len = be_u32(&frame, 4) as usize;
        assert_eq!(be_u32(&frame, 8), crc32fast::hash(&frame[..8]));
        assert_eq!(be_u32(&frame, total - 4), crc32fast::hash(&frame[..total - 4]));
        let header_text = String::from_utf8_lossy(&frame[12..12 + headers_len]);
        assert!(header_text.contains(":event-type"));
        assert!(header_text.contains("Records"));
        assert_eq!(&frame[12 + headers_len..total - 4], b"hello,world\r\n");
    }

    #[test]
    fn event_stream_end_frame_no_payload() {
        let frame = EventStreamWriter::new().end();
        let total = be_u32(&frame, 0) as usize;
        let headers_len = be_u32(&frame, 4) as usize;
        assert_eq!(total - 4 - 12 - headers_len, 0);
        let header_text = String::from_utf8_lossy(&frame[12..12 + headers_len]);
        assert!(header_text.contains("End"));
    }

    #[test]
    fn event_stream_stats_xml_payload() {
        let frame = EventStreamWriter::new().stats(1024, 800, 64);
        let total = be_u32(&frame, 0) as usize;
        let headers_len = be_u32(&frame, 4) as usize;
        let xml = std::str::from_utf8(&frame[12 + headers_len..total - 4]).unwrap();
        for expected in [
            "<BytesScanned>1024</BytesScanned>",
            "<BytesProcessed>800</BytesProcessed>",
            "<BytesReturned>64</BytesReturned>",
        ] {
            assert!(xml.contains(expected));
        }
    }

    #[test]
    fn gpu_stub_returns_none() {
        let result = select_gpu(
            "SELECT * FROM s3object",
            b"name,age\nalice,30\n",
            &csv_input(),
        );
        assert!(result.is_none(), "GPU stub must always return None for v0.6");
    }

    #[test]
    fn like_match_basics() {
        for (text, pattern) in [("foobar", "foo%"), ("abc", "_b_"), ("anything", "%"), ("", "")] {
            assert!(like_match(text, pattern));
        }
        for (text, pattern) in [("xfoobar", "foo%"), ("a", "")] {
            assert!(!like_match(text, pattern));
        }
    }
}