use crate::lexer::{tokenize, Token};
use crate::state::QueryState;
/// Error returned when a KQL query cannot be translated to SQL.
///
/// Wraps a human-readable message; `Display` prints the message verbatim.
#[derive(Clone, Debug)]
pub struct ParseError(pub String);

impl std::fmt::Display for ParseError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(&self.0)
    }
}

impl std::error::Error for ParseError {}
impl From<crate::lexer::LexError> for ParseError {
fn from(e: crate::lexer::LexError) -> Self {
ParseError(e.0)
}
}
/// Translates a KQL pipeline (`table | operator | operator ...`) into one SQL statement.
///
/// # Errors
/// Returns a [`ParseError`] when tokenizing fails, when an operator or function is not
/// supported, or when the tokens do not form a complete query.
pub fn kql_to_sql(src: &str) -> Result<String, ParseError> {
    let mut parser = Parser::new(tokenize(src)?);
    parser.parse_query()?;
    Ok(parser.state.to_sql())
}
/// Recursive-descent parser: a cursor over the token stream plus the SQL being assembled.
struct Parser {
    /// Query pieces collected so far; rendered by `QueryState::to_sql`.
    state: QueryState,
    /// Full token stream for the query.
    toks: Vec<Token>,
    /// Index into `toks` of the next unconsumed token.
    pos: usize,
}
impl Parser {
    /// Creates a parser positioned at the first token with a default query state.
    fn new(toks: Vec<Token>) -> Self {
        Self {
            toks,
            pos: 0,
            state: QueryState::default(),
        }
    }

    /// Returns the current token without consuming it, or `None` at end of input.
    fn peek(&self) -> Option<&Token> {
        self.toks.get(self.pos)
    }

    /// Consumes and returns (a clone of) the current token.
    ///
    /// # Errors
    /// `unexpected end of query` when the token stream is exhausted.
    fn bump(&mut self) -> Result<Token, ParseError> {
        let t = self
            .toks
            .get(self.pos)
            .cloned()
            .ok_or_else(|| ParseError("unexpected end of query".into()))?;
        self.pos += 1;
        Ok(t)
    }

    /// Consumes the current token if it equals `expected`; reports whether it did.
    fn eat(&mut self, expected: &Token) -> bool {
        if self.peek() == Some(expected) {
            self.pos += 1;
            true
        } else {
            false
        }
    }

    /// Like [`Parser::eat`], but a mismatch is an error that names `what`.
    fn expect(&mut self, expected: &Token, what: &str) -> Result<(), ParseError> {
        if self.eat(expected) {
            Ok(())
        } else {
            Err(ParseError(format!(
                "expected {what}, got {:?}",
                self.peek()
            )))
        }
    }

    /// Consumes the identifier `lit` (a keyword such as `by`), or errors.
    fn expect_ident_eq(&mut self, lit: &str) -> Result<(), ParseError> {
        match self.peek() {
            Some(Token::Ident(s)) if s == lit => {
                self.pos += 1;
                Ok(())
            }
            other => Err(ParseError(format!("expected `{lit}`, got {other:?}"))),
        }
    }

    /// Consumes an identifier token and returns its text.
    fn expect_ident(&mut self) -> Result<String, ParseError> {
        match self.bump()? {
            Token::Ident(s) => Ok(s),
            other => Err(ParseError(format!("expected identifier, got {other:?}"))),
        }
    }

    /// Consumes the identifier `kw` if it is next; reports whether it did.
    fn eat_ident_eq(&mut self, kw: &str) -> bool {
        match self.peek() {
            Some(Token::Ident(s)) if s == kw => {
                self.pos += 1;
                true
            }
            _ => false,
        }
    }

    /// Consumes an optional `asc`/`desc` after a sort key; returns `true` for descending.
    ///
    /// A missing keyword means descending, KQL's default for both `sort` and `top`.
    /// Any other identifier is left unconsumed.
    fn parse_sort_direction(&mut self) -> bool {
        if self.eat_ident_eq("asc") {
            return false;
        }
        // Consume an explicit `desc` if present; absent means the same thing.
        self.eat_ident_eq("desc");
        true
    }

    /// Parses `table (| operator)*` and rejects any tokens left over afterwards.
    fn parse_query(&mut self) -> Result<(), ParseError> {
        let table = match self.bump()? {
            Token::Ident(s) => s,
            other => return Err(ParseError(format!("expected table name, got {other:?}"))),
        };
        self.state = QueryState::new(table);
        while self.eat(&Token::Pipe) {
            self.parse_operator()?;
        }
        if self.pos < self.toks.len() {
            return Err(ParseError(format!(
                "unexpected trailing tokens: {:?}",
                &self.toks[self.pos..]
            )));
        }
        Ok(())
    }

    /// Reads one operator name after a `|` and dispatches to its handler.
    fn parse_operator(&mut self) -> Result<(), ParseError> {
        let op = match self.bump()? {
            Token::Ident(s) => s,
            other => return Err(ParseError(format!("expected operator, got {other:?}"))),
        };
        match op.as_str() {
            "where" => self.op_where(),
            "project" => self.op_project(false),
            "project-away" => self.op_project(true),
            "extend" => self.op_extend(),
            "summarize" => self.op_summarize(),
            "take" | "limit" => self.op_take(),
            "sort" | "order" => self.op_sort(),
            "top" => self.op_top(),
            "count" => self.op_count(),
            "distinct" => self.op_distinct(),
            "graph-traverse" => self.op_graph_traverse(),
            "graph-shortest-path" => self.op_graph_shortest_path(),
            "make-graph" => self.op_make_graph(),
            "graph-match" => self.op_graph_match(),
            other => Err(ParseError(format!("unsupported operator: `{other}`"))),
        }
    }

    /// `where <expr>`: appends one predicate to the WHERE clauses.
    fn op_where(&mut self) -> Result<(), ParseError> {
        let e = self.parse_expr()?;
        self.state.where_clauses.push(e);
        Ok(())
    }

    /// `project a, b` (replaces the select list) or `project-away a, b` (adds exclusions).
    fn op_project(&mut self, away: bool) -> Result<(), ParseError> {
        let mut cols = Vec::new();
        loop {
            match self.bump()? {
                Token::Ident(s) => cols.push(quote_ident(&s)),
                other => {
                    return Err(ParseError(format!(
                        "expected column name after project, got {other:?}"
                    )))
                }
            }
            if !self.eat(&Token::Comma) {
                break;
            }
        }
        if away {
            self.state.exclude.extend(cols);
        } else {
            self.state.select = cols;
        }
        Ok(())
    }

    /// `extend name = expr, ...`: records each computed column.
    fn op_extend(&mut self) -> Result<(), ParseError> {
        loop {
            let name = match self.bump()? {
                Token::Ident(s) => s,
                other => {
                    return Err(ParseError(format!(
                        "expected name in extend, got {other:?}"
                    )))
                }
            };
            self.expect(&Token::Assign, "`=`")?;
            let expr = self.parse_expr()?;
            self.state.extend.push((quote_ident(&name), expr));
            if !self.eat(&Token::Comma) {
                break;
            }
        }
        Ok(())
    }

    /// `summarize [alias =] agg, ... [by key, ...]`.
    ///
    /// Unnamed aggregates get an alias derived by `implicit_agg_alias`. That alias is passed
    /// through `quote_ident` like explicit aliases, because it can start with a digit
    /// (e.g. `summarize 2 * x` gives `2_x`), which is not a valid bare SQL identifier.
    fn op_summarize(&mut self) -> Result<(), ParseError> {
        loop {
            let alias_opt = self.try_alias()?;
            let agg_expr = self.parse_expr()?;
            let alias =
                alias_opt.unwrap_or_else(|| quote_ident(&implicit_agg_alias(&agg_expr)));
            self.state
                .aggregates
                .push(format!("({agg_expr}) AS {alias}"));
            if !self.eat(&Token::Comma) {
                break;
            }
        }
        if self.eat_ident_eq("by") {
            loop {
                let key = self.parse_expr()?;
                self.state.group_by.push(key);
                if !self.eat(&Token::Comma) {
                    break;
                }
            }
        }
        Ok(())
    }

    /// `take n` / `limit n`: sets the row limit (non-negative integers only).
    fn op_take(&mut self) -> Result<(), ParseError> {
        match self.bump()? {
            Token::Int(n) if n >= 0 => {
                self.state.limit = Some(n as u64);
                Ok(())
            }
            other => Err(ParseError(format!("`take` expects integer, got {other:?}"))),
        }
    }

    /// `sort by expr [asc|desc], ...` (also spelled `order by`).
    fn op_sort(&mut self) -> Result<(), ParseError> {
        self.expect_ident_eq("by")?;
        loop {
            let col_expr = self.parse_expr()?;
            let desc = self.parse_sort_direction();
            self.state.order_by.push((col_expr, desc));
            if !self.eat(&Token::Comma) {
                break;
            }
        }
        Ok(())
    }

    /// `top n by expr [asc|desc]`: one sort key plus a row limit.
    fn op_top(&mut self) -> Result<(), ParseError> {
        let n = match self.bump()? {
            Token::Int(n) if n >= 0 => n as u64,
            other => return Err(ParseError(format!("`top` expects integer, got {other:?}"))),
        };
        self.expect_ident_eq("by")?;
        let col_expr = self.parse_expr()?;
        let desc = self.parse_sort_direction();
        self.state.order_by.push((col_expr, desc));
        self.state.limit = Some(n);
        Ok(())
    }

    /// `count`: adds a `count(*)` aggregate named `Count`.
    fn op_count(&mut self) -> Result<(), ParseError> {
        self.state
            .aggregates
            .push(r#"count(*) AS "Count""#.to_string());
        Ok(())
    }

    /// `distinct [*|col, ...]`: turns on DISTINCT and, if columns are listed, projects them.
    ///
    /// `distinct *` (KQL's "all columns") keeps the current projection.
    fn op_distinct(&mut self) -> Result<(), ParseError> {
        self.state.distinct = true;
        if self.eat(&Token::Star) {
            return Ok(());
        }
        let mut cols = Vec::new();
        while let Some(Token::Ident(s)) = self.peek() {
            cols.push(quote_ident(s));
            self.pos += 1;
            if !self.eat(&Token::Comma) {
                break;
            }
        }
        if !cols.is_empty() {
            self.state.select = cols;
        }
        Ok(())
    }

    /// Parses `from <src> to <dst> max-hops <n> [direction forward|backward|both]`.
    ///
    /// Returns the quoted source/destination column names, the hop limit, and the
    /// direction (default `Forward`). `max-hops 0` is accepted.
    fn parse_graph_preamble(
        &mut self,
    ) -> Result<(String, String, u64, GraphDirection), ParseError> {
        self.expect_ident_eq("from")?;
        let src_col = match self.bump()? {
            Token::Ident(s) => quote_ident(&s),
            other => {
                return Err(ParseError(format!(
                    "expected `from` column name, got {other:?}"
                )))
            }
        };
        self.expect_ident_eq("to")?;
        let dst_col = match self.bump()? {
            Token::Ident(s) => quote_ident(&s),
            other => {
                return Err(ParseError(format!(
                    "expected `to` column name, got {other:?}"
                )))
            }
        };
        self.expect_ident_eq("max-hops")?;
        let max_hops = match self.bump()? {
            Token::Int(n) if n >= 0 => n as u64,
            other => {
                return Err(ParseError(format!(
                    "expected non-negative integer after `max-hops`, got {other:?}"
                )))
            }
        };
        let direction = if self.eat_ident_eq("direction") {
            match self.bump()? {
                Token::Ident(s) => match s.as_str() {
                    "forward" => GraphDirection::Forward,
                    "backward" => GraphDirection::Backward,
                    "both" => GraphDirection::Both,
                    other => return Err(ParseError(format!("unknown direction: {other}"))),
                },
                other => {
                    return Err(ParseError(format!(
                        "expected direction keyword, got {other:?}"
                    )))
                }
            }
        } else {
            GraphDirection::Forward
        };
        Ok((src_col, dst_col, max_hops, direction))
    }

    /// ON-clause for one recursive hop: the edge endpoint that must equal frontier `t.node`.
    fn hop_join_cond(src_col: &str, dst_col: &str, direction: GraphDirection) -> String {
        match direction {
            GraphDirection::Forward => format!("e.{src_col} = t.node"),
            GraphDirection::Backward => format!("e.{dst_col} = t.node"),
            GraphDirection::Both => format!("(e.{src_col} = t.node OR e.{dst_col} = t.node)"),
        }
    }

    /// ON-clause that recovers the edge row behind a `(path_src, node)` pair of `_gt_min`.
    ///
    /// The recursive step stores the hop's starting node in `path_src` and the node reached
    /// in `node`. Which edge column holds which therefore depends on the direction the
    /// edge was followed.
    fn traversed_edge_cond(src_col: &str, dst_col: &str, direction: GraphDirection) -> String {
        let forward = format!("e.{src_col} = m.path_src AND e.{dst_col} = m.node");
        let backward = format!("e.{dst_col} = m.path_src AND e.{src_col} = m.node");
        match direction {
            GraphDirection::Forward => forward,
            GraphDirection::Backward => backward,
            GraphDirection::Both => format!("(({forward}) OR ({backward}))"),
        }
    }

    /// `graph-traverse source <seed|(seeds)> <preamble> [edge-type <literal>]`.
    ///
    /// Builds three CTEs:
    /// - `_gt`: a recursive walk that records each hop's start node in `path_src`.
    /// - `_gt_min`: the shallowest depth for each traversed `(path_src, node)` pair.
    /// - `_gt_result`: the edge rows behind those pairs, plus `depth`.
    ///
    /// It then makes `_gt_result` the current table.
    fn op_graph_traverse(&mut self) -> Result<(), ParseError> {
        self.expect_ident_eq("source")?;
        let seeds = self.parse_source_seeds()?;
        let (src_col, dst_col, max_hops, direction) = self.parse_graph_preamble()?;
        let edge_type = if self.eat_ident_eq("edge-type") {
            Some(self.parse_scalar_literal()?)
        } else {
            None
        };
        // Applied both per hop and when recovering edge rows. Otherwise parallel edges of
        // other types between the same node pair would leak into the result.
        let type_filter = match &edge_type {
            Some(v) => format!(r#" AND e."type" = {v}"#),
            None => String::new(),
        };
        let edge_table = self.state.table.clone();
        let step_sql = build_recursive_step_with_src(&edge_table, &src_col, &dst_col, direction);
        let anchor = if let [s] = seeds.as_slice() {
            format!(
                "SELECT CAST({s} AS VARCHAR) AS node, 0 AS depth, \
                 CAST({s} AS VARCHAR) AS path_src"
            )
        } else {
            let values = seeds
                .iter()
                .map(|s| format!("(CAST({s} AS VARCHAR))"))
                .collect::<Vec<_>>()
                .join(", ");
            format!(
                "SELECT node, 0 AS depth, node AS path_src \
                 FROM (VALUES {values}) AS _seeds(node)"
            )
        };
        let body = format!(
            "{anchor} \
             UNION ALL \
             SELECT {step_sql} \
             FROM _gt t \
             JOIN {edge_table} e ON {join_cond} \
             WHERE t.depth < {max_hops}{type_filter}",
            join_cond = Self::hop_join_cond(&src_col, &dst_col, direction),
        );
        self.state
            .ctes
            .push(("_gt(node, depth, path_src)".to_string(), body, true));
        // Depth-0 rows are the seeds themselves, not traversed edges. Without the filter, a
        // self-loop on a seed would be reported at depth 0 (and even with `max-hops 0`).
        let min_body = "SELECT node, path_src, MIN(depth) AS depth FROM _gt \
                        WHERE depth > 0 GROUP BY node, path_src";
        self.state
            .ctes
            .push(("_gt_min".to_string(), min_body.to_string(), false));
        let result_body = format!(
            "SELECT e.*, m.depth \
             FROM _gt_min m \
             JOIN {edge_table} e ON {edge_cond}{type_filter}",
            edge_cond = Self::traversed_edge_cond(&src_col, &dst_col, direction),
        );
        self.state
            .ctes
            .push(("_gt_result".to_string(), result_body, false));
        self.state.table = "_gt_result".to_string();
        Ok(())
    }

    /// `graph-shortest-path source <lit> target <lit> <preamble>`.
    ///
    /// Walks from the source up to `max-hops`. The single-row `_sp_result` table exposes
    /// the minimum depth at which the target was reached and whether it was reached.
    fn op_graph_shortest_path(&mut self) -> Result<(), ParseError> {
        self.expect_ident_eq("source")?;
        let source_sql = self.parse_scalar_literal()?;
        self.expect_ident_eq("target")?;
        let target_sql = self.parse_scalar_literal()?;
        let (src_col, dst_col, max_hops, direction) = self.parse_graph_preamble()?;
        let edge_table = self.state.table.clone();
        let body = format!(
            "SELECT CAST({source_sql} AS VARCHAR) AS node, 0 AS depth \
             UNION ALL \
             SELECT {step_sql} \
             FROM _sp t \
             JOIN {edge_table} e ON {join_cond} \
             WHERE t.depth < {max_hops}",
            step_sql = build_recursive_step(&edge_table, &src_col, &dst_col, direction),
            join_cond = Self::hop_join_cond(&src_col, &dst_col, direction),
        );
        self.state
            .ctes
            .push(("_sp(node, depth)".to_string(), body, true));
        let hits_body = format!("SELECT depth FROM _sp WHERE node = CAST({target_sql} AS VARCHAR)");
        self.state
            .ctes
            .push(("_sp_hits".to_string(), hits_body, false));
        let result_body =
            "SELECT MIN(depth) AS depth, COUNT(*) > 0 AS found FROM _sp_hits".to_string();
        self.state
            .ctes
            .push(("_sp_result".to_string(), result_body, false));
        self.state.table = "_sp_result".to_string();
        Ok(())
    }

    /// `make-graph <src> --> <dst> with <node_table> on <id_col>`.
    ///
    /// Only records a graph definition over the current table for a later `graph-match`;
    /// it emits no SQL itself.
    fn op_make_graph(&mut self) -> Result<(), ParseError> {
        let src_col = self.expect_ident()?;
        if !self.eat(&Token::Arrow) {
            return Err(ParseError(
                "make-graph: expected `-->` between src and dst".into(),
            ));
        }
        let dst_col = self.expect_ident()?;
        self.expect_ident_eq("with")?;
        let node_table = self.expect_ident()?;
        self.expect_ident_eq("on")?;
        let id_col = self.expect_ident()?;
        self.state.graph_def = Some(crate::state::GraphDef {
            edge_table: self.state.table.clone(),
            src_col,
            dst_col,
            node_table,
            id_col,
        });
        Ok(())
    }

    /// `graph-match (a)-[e[:Type]]->(b) project var.col [as alias], ...`.
    ///
    /// Requires a preceding `make-graph`. The output joins edges to the node table once
    /// per endpoint (aliases `a`/`b`); pattern variables map to `a`, `e`, `b` by position.
    fn op_graph_match(&mut self) -> Result<(), ParseError> {
        let def = self
            .state
            .graph_def
            .clone()
            .ok_or_else(|| ParseError("graph-match requires a preceding make-graph".into()))?;
        self.expect(&Token::LParen, "(")?;
        let a = self.expect_ident()?;
        self.expect(&Token::RParen, ")")?;
        self.expect(&Token::Minus, "-")?;
        self.expect(&Token::LBracket, "[")?;
        let e = self.expect_ident()?;
        let edge_type = if self.eat(&Token::Colon) {
            Some(self.expect_ident()?)
        } else {
            None
        };
        self.expect(&Token::RBracket, "]")?;
        self.expect(&Token::RightArrow, "->")?;
        self.expect(&Token::LParen, "(")?;
        let b = self.expect_ident()?;
        self.expect(&Token::RParen, ")")?;
        self.expect_ident_eq("project")?;
        // Each iteration either pushes an item or returns an error, so the list is never
        // empty once the loop ends.
        let mut select_items: Vec<String> = Vec::new();
        loop {
            let var = self.expect_ident()?;
            self.expect(&Token::Dot, ".")?;
            let col = self.expect_ident()?;
            let alias_tbl = if var == a {
                "a"
            } else if var == e {
                "e"
            } else if var == b {
                "b"
            } else {
                return Err(ParseError(format!(
                    "graph-match project: unknown variable `{var}` (expected {a}, {e}, or {b})"
                )));
            };
            let out_alias = if self.eat_ident_eq("as") {
                self.expect_ident()?
            } else {
                format!("{var}_{col}")
            };
            select_items.push(format!(
                "{alias_tbl}.{} AS {}",
                quote_ident(&col),
                quote_ident(&out_alias)
            ));
            if !self.eat(&Token::Comma) {
                break;
            }
        }
        let where_clause = match &edge_type {
            Some(t) => format!(r#" WHERE e."type" = '{}'"#, t.replace('\'', "''")),
            None => String::new(),
        };
        let body = format!(
            "SELECT {sel} FROM {edges} e \
             JOIN {nodes} a ON e.{src} = a.{id} \
             JOIN {nodes} b ON e.{dst} = b.{id}{where_clause}",
            sel = select_items.join(", "),
            edges = def.edge_table,
            // The node table comes straight from a user identifier (which may contain `-`),
            // so quote it like the column names.
            nodes = quote_ident(&def.node_table),
            src = quote_ident(&def.src_col),
            dst = quote_ident(&def.dst_col),
            id = quote_ident(&def.id_col),
        );
        self.state
            .ctes
            .push(("_gm_result".to_string(), body, false));
        self.state.table = "_gm_result".to_string();
        Ok(())
    }

    /// Parses a single seed literal or a parenthesised, comma-separated list of them.
    ///
    /// # Errors
    /// An empty `()` list, an unterminated list, or a non-literal seed.
    fn parse_source_seeds(&mut self) -> Result<Vec<String>, ParseError> {
        if !self.eat(&Token::LParen) {
            return Ok(vec![self.parse_scalar_literal()?]);
        }
        if self.peek() == Some(&Token::RParen) {
            return Err(ParseError("source list must have at least one value".into()));
        }
        let mut seeds = vec![self.parse_scalar_literal()?];
        while self.eat(&Token::Comma) {
            seeds.push(self.parse_scalar_literal()?);
        }
        if !self.eat(&Token::RParen) {
            return Err(ParseError("expected ')' to close source list".into()));
        }
        Ok(seeds)
    }

    /// Parses one string/int/float literal and renders it as SQL (strings single-quoted
    /// with `'` doubled).
    fn parse_scalar_literal(&mut self) -> Result<String, ParseError> {
        match self.bump()? {
            Token::Str(s) => Ok(format!("'{}'", s.replace('\'', "''"))),
            Token::Int(n) => Ok(n.to_string()),
            Token::Float(f) => Ok(Self::float_literal(f)),
            other => Err(ParseError(format!("expected literal, got {other:?}"))),
        }
    }

    /// Renders a float token as an SQL numeric literal that stays fractional.
    ///
    /// Rust's float `Display` prints `2.0` as `2`. That would turn `x / 2.0` into integer
    /// division on engines that type `2` as an integer (e.g. PostgreSQL), so `.0` is
    /// appended whenever the rendered text is digits only.
    fn float_literal(value: impl std::fmt::Display) -> String {
        let text = value.to_string();
        if text.bytes().all(|b| b.is_ascii_digit() || b == b'-') {
            format!("{text}.0")
        } else {
            text
        }
    }

    /// Entry point of the expression grammar (lowest precedence: `or`).
    fn parse_expr(&mut self) -> Result<String, ParseError> {
        self.parse_or()
    }

    /// `and_expr (or and_expr)*`, left-associative, each step parenthesised.
    fn parse_or(&mut self) -> Result<String, ParseError> {
        let mut lhs = self.parse_and()?;
        while self.eat_ident_eq("or") {
            let rhs = self.parse_and()?;
            lhs = format!("({lhs} OR {rhs})");
        }
        Ok(lhs)
    }

    /// `not_expr (and not_expr)*`, left-associative, each step parenthesised.
    fn parse_and(&mut self) -> Result<String, ParseError> {
        let mut lhs = self.parse_not()?;
        while self.eat_ident_eq("and") {
            let rhs = self.parse_not()?;
            lhs = format!("({lhs} AND {rhs})");
        }
        Ok(lhs)
    }

    /// Prefix `not` (repeatable) over a comparison.
    fn parse_not(&mut self) -> Result<String, ParseError> {
        if self.eat_ident_eq("not") {
            let inner = self.parse_not()?;
            return Ok(format!("(NOT {inner})"));
        }
        self.parse_comparison()
    }

    /// An additive expression optionally followed by `between (lo .. hi)`, `in (...)`,
    /// or one binary comparison / string operator.
    ///
    /// KQL's `contains`, `has`, `startswith` and `endswith` are case-insensitive, so they
    /// lower to `ILIKE`. Their `_cs` variants lower to case-sensitive `LIKE`. `has` (a
    /// whole-term match in KQL) is approximated as a substring match.
    fn parse_comparison(&mut self) -> Result<String, ParseError> {
        // How a trailing binary operator is lowered to SQL. The bool on the LIKE-based
        // variants is `true` for the case-sensitive `_cs` spellings.
        enum Cmp {
            Sql(&'static str),
            Contains(bool),
            StartsWith(bool),
            EndsWith(bool),
        }

        let lhs = self.parse_additive()?;
        if self.eat_ident_eq("between") {
            self.expect(&Token::LParen, "`(`")?;
            let low = self.parse_additive()?;
            self.expect(&Token::DotDot, "`..`")?;
            let high = self.parse_additive()?;
            self.expect(&Token::RParen, "`)`")?;
            return Ok(format!("(({lhs} >= {low}) AND ({lhs} <= {high}))"));
        }
        if matches!(self.peek(), Some(Token::Ident(s)) if s.eq_ignore_ascii_case("in")) {
            self.pos += 1;
            self.expect(&Token::LParen, "`(` after `in`")?;
            let mut values: Vec<String> = Vec::new();
            loop {
                values.push(self.parse_additive()?);
                if !self.eat(&Token::Comma) {
                    break;
                }
            }
            self.expect(&Token::RParen, "`)`")?;
            return Ok(format!("({lhs} IN ({}))", values.join(", ")));
        }
        let cmp = match self.peek() {
            Some(Token::Eq) => Cmp::Sql("="),
            Some(Token::Ne) => Cmp::Sql("<>"),
            Some(Token::Lt) => Cmp::Sql("<"),
            Some(Token::Le) => Cmp::Sql("<="),
            Some(Token::Gt) => Cmp::Sql(">"),
            Some(Token::Ge) => Cmp::Sql(">="),
            Some(Token::Ident(s)) => match s.as_str() {
                "contains" | "has" => Cmp::Contains(false),
                "contains_cs" | "has_cs" => Cmp::Contains(true),
                "startswith" => Cmp::StartsWith(false),
                "startswith_cs" => Cmp::StartsWith(true),
                "endswith" => Cmp::EndsWith(false),
                "endswith_cs" => Cmp::EndsWith(true),
                _ => return Ok(lhs),
            },
            _ => return Ok(lhs),
        };
        self.pos += 1;
        let rhs = self.parse_additive()?;
        let like = |case_sensitive: bool| if case_sensitive { "LIKE" } else { "ILIKE" };
        Ok(match cmp {
            Cmp::Sql(op) => format!("({lhs} {op} {rhs})"),
            Cmp::Contains(cs) => format!("({lhs} {} {})", like(cs), add_like_both(&rhs)),
            Cmp::StartsWith(cs) => format!("({lhs} {} {})", like(cs), add_like_right(&rhs)),
            Cmp::EndsWith(cs) => format!("({lhs} {} {})", like(cs), add_like_left(&rhs)),
        })
    }

    /// `term ((+|-) term)*`, left-associative, each step parenthesised.
    fn parse_additive(&mut self) -> Result<String, ParseError> {
        let mut lhs = self.parse_multiplicative()?;
        loop {
            let op = match self.peek() {
                Some(Token::Plus) => "+",
                Some(Token::Minus) => "-",
                _ => break,
            };
            self.pos += 1;
            let rhs = self.parse_multiplicative()?;
            lhs = format!("({lhs} {op} {rhs})");
        }
        Ok(lhs)
    }

    /// `unary ((*|/|%) unary)*`, left-associative, each step parenthesised.
    fn parse_multiplicative(&mut self) -> Result<String, ParseError> {
        let mut lhs = self.parse_unary()?;
        loop {
            let op = match self.peek() {
                Some(Token::Star) => "*",
                Some(Token::Slash) => "/",
                Some(Token::Percent) => "%",
                _ => break,
            };
            self.pos += 1;
            let rhs = self.parse_unary()?;
            lhs = format!("({lhs} {op} {rhs})");
        }
        Ok(lhs)
    }

    /// Prefix negation (repeatable) over an atom.
    fn parse_unary(&mut self) -> Result<String, ParseError> {
        if self.eat(&Token::Minus) {
            let inner = self.parse_unary()?;
            return Ok(format!("(-{inner})"));
        }
        self.parse_atom()
    }

    /// Literals, durations, parenthesised expressions, `*`, function calls, and bare
    /// identifiers (`true`/`false`/`null` become SQL keywords; others are quoted columns).
    fn parse_atom(&mut self) -> Result<String, ParseError> {
        match self.bump()? {
            Token::Int(n) => Ok(n.to_string()),
            Token::Float(f) => Ok(Self::float_literal(f)),
            Token::Str(s) => Ok(format!("'{}'", s.replace('\'', "''"))),
            Token::Duration(d) => duration_to_sql_interval(&d),
            Token::LParen => {
                let inner = self.parse_expr()?;
                self.expect(&Token::RParen, "`)`")?;
                Ok(format!("({inner})"))
            }
            Token::Star => Ok("*".to_string()),
            Token::Ident(name) => {
                if self.eat(&Token::LParen) {
                    self.parse_func_call(name)
                } else if matches!(self.peek(), Some(Token::Dot)) {
                    Err(ParseError(format!(
                        "dotted path `{name}...` not yet supported (dynamic columns land with format-v1)"
                    )))
                } else {
                    match name.as_str() {
                        "true" => Ok("TRUE".into()),
                        "false" => Ok("FALSE".into()),
                        "null" => Ok("NULL".into()),
                        _ => Ok(quote_ident(&name)),
                    }
                }
            }
            other => Err(ParseError(format!(
                "unexpected token in expression: {other:?}"
            ))),
        }
    }

    /// Parses the arguments of `name(` ... `)` (opening paren already consumed) and maps
    /// the KQL function, selected by name and arity, onto its SQL equivalent.
    fn parse_func_call(&mut self, name: String) -> Result<String, ParseError> {
        let mut args = Vec::new();
        if !self.eat(&Token::RParen) {
            loop {
                args.push(self.parse_expr()?);
                if !self.eat(&Token::Comma) {
                    break;
                }
            }
            self.expect(&Token::RParen, "`)`")?;
        }
        match (name.as_str(), args.as_slice()) {
            ("now", []) => Ok("now()".into()),
            ("ago", [d]) => Ok(format!("(now() - {d})")),
            ("bin", [col, bucket]) => Ok(format!(
                "date_bin({bucket}, {col}, TIMESTAMP '1970-01-01 00:00:00')"
            )),
            ("startofhour", [col]) => Ok(format!("date_trunc('hour', {col})")),
            ("startofday", [col]) => Ok(format!("date_trunc('day', {col})")),
            ("startofmonth", [col]) => Ok(format!("date_trunc('month', {col})")),
            ("datetime", [x]) => Ok(format!("CAST({x} AS TIMESTAMP)")),
            ("strcat", _) => Ok(format!("concat({})", args.join(", "))),
            ("tolower", [s]) => Ok(format!("lower({s})")),
            ("toupper", [s]) => Ok(format!("upper({s})")),
            ("strlen", [s]) => Ok(format!("char_length({s})")),
            ("isnull", [x]) => Ok(format!("({x} IS NULL)")),
            ("isnotnull", [x]) => Ok(format!("({x} IS NOT NULL)")),
            ("iff", [c, a, b]) => Ok(format!("(CASE WHEN {c} THEN {a} ELSE {b} END)")),
            ("count", []) => Ok("count(*)".into()),
            ("count", [x]) => Ok(format!("count({x})")),
            ("sum", [x]) => Ok(format!("sum({x})")),
            ("avg", [x]) => Ok(format!("avg({x})")),
            ("min", [x]) => Ok(format!("min({x})")),
            ("max", [x]) => Ok(format!("max({x})")),
            ("dcount", [x]) => Ok(format!("count(DISTINCT {x})")),
            ("dcountif", [x, c]) => Ok(format!(
                "count(DISTINCT CASE WHEN {c} THEN {x} ELSE NULL END)"
            )),
            (other, _) => Err(ParseError(format!(
                "unsupported function: {other}/{}",
                args.len()
            ))),
        }
    }

    /// If the next two tokens are `ident =`, consumes them and returns the quoted alias.
    fn try_alias(&mut self) -> Result<Option<String>, ParseError> {
        if let (Some(Token::Ident(a)), Some(Token::Assign)) =
            (self.toks.get(self.pos), self.toks.get(self.pos + 1))
        {
            let alias = a.clone();
            self.pos += 2;
            return Ok(Some(quote_ident(&alias)));
        }
        Ok(None)
    }
}
/// Which way graph operators follow edges from the current frontier node.
#[derive(Clone, Copy, Debug)]
enum GraphDirection {
    /// From the edge's source column to its destination column.
    Forward,
    /// Against the edge's orientation (destination to source).
    Backward,
    /// In either orientation.
    Both,
}
/// SELECT-list for one hop of a `(node, depth)` recursive CTE.
///
/// `e` is the edge row joined to frontier row `t`. The first column is the endpoint
/// reached by following the edge in `dir`; the second is `t.depth + 1`. `_table` is
/// accepted for signature symmetry and is not used.
fn build_recursive_step(_table: &str, src_col: &str, dst_col: &str, dir: GraphDirection) -> String {
    let next_node = match dir {
        GraphDirection::Forward => format!("e.{dst_col}"),
        GraphDirection::Backward => format!("e.{src_col}"),
        GraphDirection::Both => {
            format!("CASE WHEN e.{src_col} = t.node THEN e.{dst_col} ELSE e.{src_col} END")
        }
    };
    format!("{next_node}, t.depth + 1")
}
/// SELECT-list for one hop of a `(node, depth, path_src)` recursive CTE.
///
/// Same as the two-column step, plus `t.node` as the third column, i.e. the node this
/// hop started from. `_table` is accepted for signature symmetry and is not used.
fn build_recursive_step_with_src(
    _table: &str,
    src_col: &str,
    dst_col: &str,
    dir: GraphDirection,
) -> String {
    let reached = match dir {
        GraphDirection::Forward => format!("e.{dst_col}"),
        GraphDirection::Backward => format!("e.{src_col}"),
        GraphDirection::Both => {
            format!("CASE WHEN e.{src_col} = t.node THEN e.{dst_col} ELSE e.{src_col} END")
        }
    };
    [reached.as_str(), "t.depth + 1", "t.node"].join(", ")
}
/// Renders `name` as a SQL identifier.
///
/// Names made only of ASCII letters, digits and `_` that do not start with a digit are
/// emitted bare. Anything else, including the empty string, is wrapped in double quotes
/// with embedded `"` doubled.
fn quote_ident(name: &str) -> String {
    let mut chars = name.chars();
    let is_plain = match chars.next() {
        None => false,
        Some(first) if first.is_ascii_digit() => false,
        Some(first) => {
            let word_char = |c: char| c == '_' || c.is_ascii_alphanumeric();
            word_char(first) && chars.all(word_char)
        }
    };
    if !is_plain {
        return format!("\"{}\"", name.replace('"', "\"\""));
    }
    name.to_owned()
}
/// Converts a KQL duration literal (`30s`, `5m`, `1h`, `7d`, `250ms`) into a SQL
/// `INTERVAL '<n> <unit>'` expression.
///
/// # Errors
/// - `bad duration literal: <d>` when the input is empty or the count is not an integer.
/// - `unknown duration unit: <u>` when the final character is not a supported unit.
fn duration_to_sql_interval(d: &str) -> Result<String, ParseError> {
    let bad_literal = || ParseError(format!("bad duration literal: {d}"));
    // The two-letter unit must be tried first so `250ms` is not read as `250m` + `s`.
    if let Some(count) = d.strip_suffix("ms") {
        if let Ok(n) = count.parse::<i64>() {
            return Ok(format!("INTERVAL '{n} millisecond'"));
        }
    }
    // Split on a char boundary: an empty input or a non-ASCII final character must not
    // panic the way a byte-offset `split_at(len - 1)` does.
    let unit = d.chars().last().ok_or_else(bad_literal)?;
    let n: i64 = d[..d.len() - unit.len_utf8()]
        .parse()
        .map_err(|_| bad_literal())?;
    // The count is passed through verbatim. No seconds conversion is done, so a large
    // count cannot overflow.
    let unit_name = match unit {
        's' => "second",
        'm' => "minute",
        'h' => "hour",
        'd' => "day",
        other => return Err(ParseError(format!("unknown duration unit: {other}"))),
    };
    Ok(format!("INTERVAL '{n} {unit_name}'"))
}
/// LIKE pattern operand for KQL `startswith`: the operand followed by `%`.
fn add_like_right(sql_string_literal: &str) -> String {
    like_pattern(sql_string_literal, false, true)
}

/// LIKE pattern operand for KQL `endswith`: `%` followed by the operand.
fn add_like_left(sql_string_literal: &str) -> String {
    like_pattern(sql_string_literal, true, false)
}

/// LIKE pattern operand for KQL `contains`/`has`: the operand wrapped in `%`.
fn add_like_both(sql_string_literal: &str) -> String {
    like_pattern(sql_string_literal, true, true)
}

/// Builds the right-hand side of a `LIKE`/`ILIKE` from an already-rendered SQL operand.
///
/// Two cases:
/// - A single-quoted string literal or a plain unsigned number is matched as literal
///   text. `%`, `_` and `\` are backslash-escaped so they are not treated as wildcards.
///   `ESCAPE '\'` is appended only when something was escaped, so ordinary patterns
///   render exactly as before.
/// - Any other operand (a column, call or expression) is concatenated with the wildcards
///   at query time using `||`. Wildcard characters inside such a runtime value are not
///   escaped.
fn like_pattern(operand: &str, leading: bool, trailing: bool) -> String {
    let is_string = operand.len() >= 2 && operand.starts_with('\'') && operand.ends_with('\'');
    let is_number =
        !operand.is_empty() && operand.bytes().all(|b| b.is_ascii_digit() || b == b'.');
    if !is_string && !is_number {
        let parts: Vec<&str> = [
            leading.then_some("'%'"),
            Some(operand),
            trailing.then_some("'%'"),
        ]
        .iter()
        .flatten()
        .copied()
        .collect();
        return format!("({})", parts.join(" || "));
    }
    let text = strip_outer_quotes(operand);
    let mut escaped = String::with_capacity(text.len() + 2);
    let mut used_escape = false;
    for c in text.chars() {
        match c {
            '%' | '_' | '\\' => {
                escaped.push('\\');
                escaped.push(c);
                used_escape = true;
            }
            // Re-escape quotes for the new SQL string literal.
            '\'' => escaped.push_str("''"),
            _ => escaped.push(c),
        }
    }
    let lead = if leading { "%" } else { "" };
    let trail = if trailing { "%" } else { "" };
    let pattern = format!("'{lead}{escaped}{trail}'");
    if used_escape {
        format!("{pattern} ESCAPE '\\'")
    } else {
        pattern
    }
}

/// If `s` is a single-quoted SQL string literal, returns its text with `''` un-doubled;
/// otherwise returns `s` unchanged.
fn strip_outer_quotes(s: &str) -> String {
    if s.starts_with('\'') && s.ends_with('\'') && s.len() >= 2 {
        s[1..s.len() - 1].replace("''", "'")
    } else {
        s.to_string()
    }
}
/// Derives a column alias for an unnamed `summarize` aggregate from its SQL text.
///
/// Every character other than an ASCII letter or digit becomes `_`. Leading and trailing
/// underscores are trimmed and the result is capped at 40 characters. `"expr"` is used
/// when nothing remains. The result may start with a digit, so callers should quote it.
fn implicit_agg_alias(expr: &str) -> String {
    let mut sanitized = String::with_capacity(expr.len());
    for ch in expr.chars() {
        sanitized.push(if ch.is_ascii_alphanumeric() { ch } else { '_' });
    }
    match sanitized.trim_matches('_') {
        "" => String::from("expr"),
        core => core.chars().take(40).collect(),
    }
}
// Unit tests for the KQL -> SQL lowering. Most assertions look for substrings of the
// generated SQL rather than comparing whole statements. They pin the shape of individual
// clauses without depending on how `QueryState::to_sql` assembles the full statement.
#[cfg(test)]
mod tests {
    use super::*;

    /// Translates `src`, panicking with "parse" if translation fails.
    fn must(src: &str) -> String {
        kql_to_sql(src).expect("parse")
    }

    // A bare table name becomes a plain full-table SELECT.
    #[test]
    fn simple_scan() {
        assert_eq!(must("t"), "SELECT * FROM t");
    }

    // `where` lowers to a parenthesised predicate; `project` sets the select list.
    #[test]
    fn where_and_project() {
        let sql = must("t | where status >= 500 | project timestamp, status");
        assert!(sql.contains("SELECT timestamp, status FROM t"));
        assert!(sql.contains("(status >= 500)"));
    }

    #[test]
    fn take_n() {
        assert_eq!(must("t | take 5"), "SELECT * FROM t LIMIT 5");
    }

    // The `count` operator yields a single aggregate named "Count".
    #[test]
    fn count_aggregate() {
        let sql = must("t | count");
        assert!(
            sql.starts_with("SELECT count(*) AS \"Count\" FROM t"),
            "got: {sql}"
        );
    }

    #[test]
    fn summarize_by() {
        let sql = must("t | summarize count() by status");
        assert!(sql.contains("GROUP BY status"));
    }

    #[test]
    fn sort_desc() {
        let sql = must("t | sort by timestamp desc");
        assert!(sql.contains("ORDER BY timestamp DESC"));
    }

    #[test]
    fn extend_arith() {
        let sql = must("t | extend class = status / 100");
        assert!(sql.contains("AS class"));
    }

    // `ago(1h)` subtracts an INTERVAL from now().
    #[test]
    fn ago_literal() {
        let sql = must("t | where timestamp > ago(1h)");
        assert!(sql.contains("now() - INTERVAL '1 hour'"));
    }

    // Accepts either a position()-based or LIKE-based lowering of `contains`.
    #[test]
    fn contains_op() {
        let sql = must(r#"t | where body contains "oom""#);
        assert!(sql.contains("position") || sql.contains("LIKE"));
    }

    #[test]
    fn startswith_op() {
        let sql = must(r#"t | where path startswith "/api""#);
        assert!(sql.contains("LIKE '/api%'"));
    }

    // `top n by x desc` combines ORDER BY with LIMIT.
    #[test]
    fn top_operator() {
        let sql = must("t | top 5 by latency desc");
        assert!(sql.contains("ORDER BY latency DESC"));
        assert!(sql.contains("LIMIT 5"));
    }

    // `between (lo .. hi)` expands to an inclusive >= / <= pair.
    #[test]
    fn between_with_ago_and_now() {
        let sql = must("t | where timestamp between (ago(1h) .. now())");
        assert!(sql.contains(">= (now() - INTERVAL '1 hour')"));
        assert!(sql.contains("<= now()"));
        assert!(sql.contains("AND"));
    }

    #[test]
    fn between_with_datetime_literals() {
        let sql =
            must("t | where timestamp between (datetime(2026-01-01) .. datetime(2026-02-01))");
        assert!(sql.contains("CAST("));
        assert!(sql.contains("AND"));
    }

    #[test]
    fn between_combined_with_and() {
        let sql = must(r#"t | where n between (1 .. 10) and label == "x""#);
        assert!(sql.contains("(n >= 1)"));
        assert!(sql.contains("(n <= 10)"));
        assert!(sql.contains("(label = 'x')"));
        assert!(sql.contains("AND"));
    }

    // Pins the exact parenthesisation of the `between` expansion.
    #[test]
    fn between_lowering_shape() {
        let sql = must("t | where n between (1 .. 10)");
        assert!(
            sql.contains("((n >= 1) AND (n <= 10))"),
            "unexpected shape: {sql}"
        );
    }

    #[test]
    fn parses_in_with_string_list() {
        let kql = r#"context_nodes | where label in ("a", "b", "c") | take 10"#;
        let sql = kql_to_sql(kql).expect("parse");
        assert!(sql.contains("IN"), "expected SQL to contain 'IN', got: {sql}");
        assert!(sql.contains("'a'"), "expected 'a' in SQL, got: {sql}");
        assert!(sql.contains("'c'"), "expected 'c' in SQL, got: {sql}");
    }

    #[test]
    fn parses_in_with_int_list() {
        let kql = "logs | where status in (200, 201, 204)";
        let sql = kql_to_sql(kql).expect("parse");
        assert!(sql.contains("IN"), "expected SQL IN, got: {sql}");
        assert!(sql.contains("200"), "got: {sql}");
        assert!(sql.contains("204"), "got: {sql}");
    }

    // Several seeds are supplied through a VALUES list in the recursive CTE anchor.
    #[test]
    fn graph_traverse_multi_source_uses_values_anchor() {
        let sql = kql_to_sql(r#"e | graph-traverse source ("a","b") from src to dst max-hops 2"#).expect("parse");
        assert!(sql.to_uppercase().contains("VALUES"), "expected VALUES anchor, got: {sql}");
        assert!(sql.contains("'a'") && sql.contains("'b'"), "expected both seeds, got: {sql}");
        assert!(sql.contains("path_src"), "still a traverse, got: {sql}");
    }

    // A single seed is anchored with a direct CAST instead of VALUES.
    #[test]
    fn graph_traverse_single_source_unchanged() {
        let sql = kql_to_sql(r#"e | graph-traverse source "a" from src to dst max-hops 2"#).expect("parse");
        assert!(sql.contains("CAST('a' AS VARCHAR) AS node") || sql.contains("CAST('a' AS VARCHAR) AS NODE")
            || sql.contains("CAST('a' AS VARCHAR)"), "expected single-source CAST anchor, got: {sql}");
    }

    #[test]
    fn graph_traverse_edge_type_filters_per_hop() {
        let sql = kql_to_sql(r#"e | graph-traverse source "a" from src to dst max-hops 3 edge-type "CALLS""#).expect("parse");
        assert!(sql.contains(r#"e."type" = 'CALLS'"#), "expected per-hop edge-type filter, got: {sql}");
    }

    #[test]
    fn graph_traverse_no_edge_type_has_no_type_filter() {
        let sql = kql_to_sql(r#"e | graph-traverse source "a" from src to dst max-hops 3"#).expect("parse");
        assert!(!sql.contains(r#""type" ="#), "no edge-type clause expected, got: {sql}");
    }

    #[test]
    fn graph_traverse_multi_source_and_edge_type_compose() {
        let sql = kql_to_sql(r#"e | graph-traverse source ("a","b") from src to dst max-hops 2 edge-type "X""#).expect("parse");
        assert!(sql.to_uppercase().contains("VALUES") && sql.contains(r#"e."type" = 'X'"#), "got: {sql}");
    }

    // make-graph + graph-match: node table joined once per endpoint, edge-type filter,
    // and default `<var>_<col>` output aliases.
    #[test]
    fn make_graph_then_match_projects_join() {
        let sql = kql_to_sql(
            r#"edges | make-graph caller --> callee with services on id | graph-match (a)-[e:CALLS]->(b) project a.name, e.latency, b.name"#
        ).expect("parse");
        let u = sql.to_uppercase();
        assert!(
            u.contains("JOIN SERVICES A ON"),
            "expected node join for a, got: {sql}"
        );
        assert!(
            u.contains("JOIN SERVICES B ON"),
            "expected node join for b, got: {sql}"
        );
        assert!(
            sql.contains("e.caller = a.id"),
            "a joins on src=id, got: {sql}"
        );
        assert!(
            sql.contains("e.callee = b.id"),
            "b joins on dst=id, got: {sql}"
        );
        assert!(
            sql.contains(r#"e."type" = 'CALLS'"#),
            "edge-type filter, got: {sql}"
        );
        assert!(
            sql.contains("a.name AS a_name"),
            "expected a.name AS a_name, got: {sql}"
        );
        assert!(
            sql.contains("e.latency AS e_latency"),
            "expected e.latency AS e_latency, got: {sql}"
        );
        assert!(
            sql.contains("b.name AS b_name"),
            "expected b.name AS b_name, got: {sql}"
        );
    }

    #[test]
    fn graph_match_without_make_graph_errors() {
        let err = kql_to_sql(r#"edges | graph-match (a)-[e]->(b) project a.x"#);
        assert!(
            err.is_err(),
            "graph-match requires a preceding make-graph"
        );
    }

    // The traverse result exposes the full edge rows (`e.*`) plus a depth column.
    #[test]
    fn graph_traverse_projects_full_edge_columns() {
        let kql = r#"context_edges | graph-traverse source "a" from src to dst max-hops 2"#;
        let sql = kql_to_sql(kql).expect("parse");
        assert!(
            sql.contains("e.*"),
            "expected edge-row projection (e.*), got: {sql}"
        );
        assert!(sql.contains("depth"), "expected depth column, got: {sql}");
        assert!(sql.contains("dst"), "expected dst column, got: {sql}");
        assert!(
            sql.contains("path_src"),
            "expected path_src in CTE, got: {sql}"
        );
    }
}