1mod expr;
6mod logical;
7
8use crate::dataframe::{DataFrame, JoinType, disambiguate_agg_output_names};
9use crate::functions::{
10 SortOrder, asc_nulls_first, asc_nulls_last, col, desc_nulls_first, desc_nulls_last,
11};
12use crate::plan::expr::{expr_from_value, try_column_from_udf_value};
13use crate::session::{SparkSession, set_thread_udf_session};
14pub use expr::PlanExprError;
15pub use logical::LogicalPlan;
16use polars::prelude::PolarsError;
17use robin_sparkless_core::engine::{DataFrameBackend, PlanExecutor as CorePlanExecutor};
18use robin_sparkless_core::error::EngineError;
19use serde_json::Value;
20
21pub fn execute_plan(
29 session: &SparkSession,
30 data: Vec<Vec<Value>>,
31 schema: Vec<(String, String)>,
32 plan: &[Value],
33) -> Result<DataFrame, PlanError> {
34 set_thread_udf_session(session.clone());
35 let mut df = session
36 .create_dataframe_from_rows(data, schema, false, false)
37 .map_err(PlanError::Session)?
38 .with_case_insensitive_column_resolution();
39
40 for op_value in plan {
41 let op_obj = op_value
42 .as_object()
43 .ok_or_else(|| PlanError::InvalidPlan("each plan step must be a JSON object".into()))?;
44 let op_name = op_obj
45 .get("op")
46 .and_then(Value::as_str)
47 .ok_or_else(|| PlanError::InvalidPlan("each plan step must have 'op' string".into()))?;
48 let mut payload = op_obj.get("payload").cloned().unwrap_or(Value::Null);
49 if matches!(op_name, "join" | "union" | "unionByName" | "crossJoin") {
52 payload = merge_other_into_payload(payload, op_obj);
53 }
54
55 df = apply_op(session, df, op_name, payload)?;
56 }
57
58 Ok(df)
59}
60
61pub struct PolarsPlanExecutor;
67
68impl CorePlanExecutor<SparkSession> for PolarsPlanExecutor {
69 fn execute_plan(
70 session: &SparkSession,
71 data: Vec<Vec<Value>>,
72 schema: Vec<(String, String)>,
73 plan: &[Value],
74 ) -> Result<Box<dyn DataFrameBackend>, EngineError> {
75 let df = execute_plan(session, data, schema, plan)
77 .map_err(|e| EngineError::Internal(e.to_string()))?;
78 Ok(Box::new(df))
79 }
80}
81
82#[derive(Debug)]
84pub enum PlanError {
85 Session(PolarsError),
86 Expr(PlanExprError),
87 InvalidPlan(String),
88 UnsupportedOp(String),
89}
90
91impl std::fmt::Display for PlanError {
92 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
93 match self {
94 PlanError::Session(e) => write!(f, "session/df: {e}"),
95 PlanError::Expr(e) => write!(f, "expression: {e}"),
96 PlanError::InvalidPlan(s) => write!(f, "invalid plan: {s}"),
97 PlanError::UnsupportedOp(s) => write!(f, "unsupported op: {s}"),
98 }
99 }
100}
101
102impl std::error::Error for PlanError {}
103
104fn merge_other_into_payload(payload: Value, op: &serde_json::Map<String, Value>) -> Value {
106 fn get(obj: &serde_json::Map<String, Value>, snake: &str, camel: &str) -> Option<Value> {
107 obj.get(snake).or_else(|| obj.get(camel)).cloned()
108 }
109 let mut p = match payload {
110 Value::Object(m) => m,
111 _ => return payload,
112 };
113 if p.get("other_data").or_else(|| p.get("otherData")).is_none() {
114 if let Some(v) = get(op, "other_data", "otherData") {
115 p.insert("other_data".into(), v);
116 }
117 }
118 if p.get("other_schema")
119 .or_else(|| p.get("otherSchema"))
120 .is_none()
121 {
122 if let Some(v) = get(op, "other_schema", "otherSchema") {
123 p.insert("other_schema".into(), v);
124 }
125 }
126 if p.get("on").is_none() {
127 if let Some(v) = get(op, "on", "on") {
128 p.insert("on".into(), v);
129 }
130 }
131 Value::Object(p)
132}
133
134fn get_other_data(payload: &Value) -> Option<&Vec<Value>> {
136 payload
137 .get("other_data")
138 .or_else(|| payload.get("otherData"))
139 .and_then(Value::as_array)
140}
141
142fn get_other_schema(payload: &Value) -> Option<&Vec<Value>> {
144 payload
145 .get("other_schema")
146 .or_else(|| payload.get("otherSchema"))
147 .and_then(Value::as_array)
148}
149
150fn parse_order_by_element(v: &Value) -> Option<Vec<(String, bool)>> {
153 if let Some(s) = v.as_str() {
154 let s = s.trim();
155 if s.eq_ignore_ascii_case("desc") || s.eq_ignore_ascii_case("asc") {
156 return None;
157 }
158 if s.to_uppercase().ends_with(" DESC") {
159 let name = s[..s.len().saturating_sub(5)].trim().to_string();
160 return if name.is_empty() {
161 None
162 } else {
163 Some(vec![(name, false)])
164 };
165 }
166 if s.to_uppercase().ends_with(" ASC") {
167 let name = s[..s.len().saturating_sub(4)].trim().to_string();
168 return if name.is_empty() {
169 None
170 } else {
171 Some(vec![(name, true)])
172 };
173 }
174 if s.starts_with('[') && s.ends_with(']') {
175 let inner = s[1..s.len() - 1].trim();
176 if inner.is_empty() {
177 return Some(vec![]);
178 }
179 let names: Vec<(String, bool)> = inner
180 .split(',')
181 .map(|p| {
182 (
183 p.trim().trim_matches('\'').trim_matches('"').to_string(),
184 true,
185 )
186 })
187 .filter(|(n, _)| !n.is_empty())
188 .collect();
189 return Some(names);
190 }
191 return Some(vec![(s.to_string(), true)]);
192 }
193 let obj = v.as_object()?;
194 let name = obj
195 .get("col")
196 .or_else(|| obj.get("name"))
197 .and_then(Value::as_str)
198 .map(|s| s.to_string())?;
199 Some(vec![(name, true)])
200}
201
202fn expr_to_col_name(v: &Value) -> Option<String> {
204 let obj = v.as_object()?;
205 obj.get("col")
206 .or_else(|| obj.get("column"))
207 .and_then(Value::as_str)
208 .map(|s| s.to_string())
209}
210
211fn parse_join_on(on: &Value, df: &DataFrame) -> Result<Vec<String>, PlanError> {
216 if let Some(s) = on.as_str() {
217 if s.contains('(') {
218 return Err(PlanError::InvalidPlan(
219 "join on expression (e.g. array_contains(...) or column expr) is not supported; use column names only".into(),
220 ));
221 }
222 let resolved = df.resolve_column_name(s).map_err(PlanError::Session)?;
223 return Ok(vec![resolved]);
224 }
225 let arr = on.as_array().ok_or_else(|| {
226 PlanError::InvalidPlan(
227 "join 'on' must be string, array of strings, or array of column refs / eq expressions"
228 .into(),
229 )
230 })?;
231 let mut keys = Vec::with_capacity(arr.len());
232 for v in arr {
233 if let Some(s) = v.as_str() {
234 let resolved = df.resolve_column_name(s).map_err(PlanError::Session)?;
235 keys.push(resolved);
236 continue;
237 }
238 if let Some(obj) = v.as_object() {
239 if let Some(name) = expr_to_col_name(v) {
241 let resolved = df.resolve_column_name(&name).map_err(PlanError::Session)?;
242 keys.push(resolved);
243 continue;
244 }
245 let op = obj
247 .get("op")
248 .or_else(|| obj.get("operator"))
249 .and_then(Value::as_str);
250 if op.map(|o| o == "eq" || o == "==").unwrap_or(false) {
251 let left = obj.get("left").and_then(expr_to_col_name);
252 let right = obj.get("right").and_then(expr_to_col_name);
253 if let (Some(l), Some(r)) = (left, right) {
254 if l == r {
255 let resolved = df.resolve_column_name(&l).map_err(PlanError::Session)?;
256 keys.push(resolved);
257 continue;
258 }
259 }
260 }
261 }
262 return Err(PlanError::InvalidPlan(
263 "join 'on' element must be string, {\"col\": \"name\"}, or {\"op\": \"eq\", \"left\": {\"col\": \"x\"}, \"right\": {\"col\": \"x\"}}".into(),
264 ));
265 }
266 Ok(keys)
267}
268
269fn other_data_to_rows(other_data: &[Value], schema_names: &[String]) -> Vec<Vec<Value>> {
271 other_data
272 .iter()
273 .filter_map(|v| {
274 if let Some(arr) = v.as_array() {
275 return Some(arr.clone());
276 }
277 if let Some(obj) = v.as_object() {
278 let row: Vec<Value> = schema_names
279 .iter()
280 .map(|n| obj.get(n).cloned().unwrap_or(Value::Null))
281 .collect();
282 return Some(row);
283 }
284 None
285 })
286 .collect()
287}
288
289fn schema_field_to_pair(v: &Value) -> Option<(String, String)> {
291 let obj = v.as_object()?;
292 let name = obj
293 .get("name")
294 .or_else(|| obj.get("fieldName"))
295 .and_then(Value::as_str)?
296 .to_string();
297 let ty = obj
298 .get("type")
299 .or_else(|| obj.get("dataType"))
300 .and_then(Value::as_str)
301 .or_else(|| {
302 obj.get("dataType")?.get("typeName").and_then(Value::as_str)
304 })?
305 .to_string();
306 Some((name, ty))
307}
308
309fn apply_op(
310 session: &SparkSession,
311 df: DataFrame,
312 op_name: &str,
313 payload: Value,
314) -> Result<DataFrame, PlanError> {
315 match op_name {
316 "stop" => {
317 let _ = payload;
318 session.stop();
319 Ok(df)
320 }
321 "filter" => {
322 let expr = expr_from_value(&payload).map_err(PlanError::Expr)?;
323 df.filter(expr).map_err(PlanError::Session)
324 }
325 "select" => {
326 let arr = payload
329 .as_array()
330 .or_else(|| payload.get("columns").and_then(Value::as_array));
331 if let Some(arr) = arr {
332 if arr.is_empty() {
333 return Err(PlanError::InvalidPlan(
334 "select payload must be non-empty array".into(),
335 ));
336 }
337 let mut exprs = Vec::with_capacity(arr.len());
338 for (idx, v) in arr.iter().enumerate() {
339 if let Some(obj) = v.as_object() {
340 if let Some(expr_val) = obj.get("expr") {
341 let name = obj
343 .get("name")
344 .and_then(Value::as_str)
345 .unwrap_or("_c"); let expr = expr_from_value(expr_val).map_err(PlanError::Expr)?;
347 let resolved = df
348 .resolve_expr_column_names(expr)
349 .map_err(PlanError::Session)?;
350 exprs.push(resolved.alias(name));
351 continue;
352 }
353 if obj.contains_key("fn") || obj.contains_key("op") {
355 if let Ok(expr) = expr_from_value(v) {
356 let resolved = df
357 .resolve_expr_column_names(expr)
358 .map_err(PlanError::Session)?;
359 let alias = obj
360 .get("fn")
361 .and_then(Value::as_str)
362 .filter(|s| *s == "alias")
363 .and_then(|_| obj.get("args").and_then(Value::as_array))
364 .and_then(|a| a.last())
365 .and_then(Value::as_str)
366 .map(String::from)
367 .unwrap_or_else(|| format!("_c{idx}"));
368 exprs.push(resolved.alias(alias));
369 continue;
370 }
371 }
372 }
373 let name_str: String = if let Some(s) = v.as_str() {
375 s.to_string()
376 } else if let Some(obj) = v.as_object() {
377 expr_to_col_name(v)
378 .or_else(|| obj.get("name").and_then(Value::as_str).map(String::from))
379 .ok_or_else(|| {
380 PlanError::InvalidPlan(
381 "select item must be string, {col/column/name}, or {name, expr}".into(),
382 )
383 })?
384 } else {
385 return Err(PlanError::InvalidPlan(
386 "select payload must be list of column name strings or {name, expr} or {col/column/name} objects".into(),
387 ));
388 };
389 let s = name_str.trim();
391 let (expr_str, alias_override) = if let Some(ix) = s.rfind(" as ") {
392 (s[..ix].trim(), Some(s[ix + 4..].trim())) } else {
394 (s, None)
395 };
396 if let Some(expr) =
397 crate::plan::expr::try_parse_concat_expr_from_string(expr_str)
398 {
399 let resolved = df
400 .resolve_expr_column_names(expr)
401 .map_err(PlanError::Session)?;
402 let alias = alias_override.unwrap_or(s);
403 exprs.push(resolved.alias(alias));
404 } else {
405 let col_expr = polars::prelude::col::<&str>(expr_str);
407 let resolved = df
408 .resolve_expr_column_names(col_expr)
409 .map_err(PlanError::Session)?;
410 exprs.push(resolved.alias(name_str.as_str()));
412 }
413 }
414 df.select_exprs(exprs).map_err(PlanError::Session)
415 } else {
416 Err(PlanError::InvalidPlan(
417 "select payload must be array of column names or {name, expr} objects, or object with 'columns' array".into(),
418 ))
419 }
420 }
421 "limit" => {
422 let n = payload.get("n").and_then(Value::as_u64).ok_or_else(|| {
423 PlanError::InvalidPlan("limit payload must have 'n' number".into())
424 })?;
425 df.limit(n as usize).map_err(PlanError::Session)
426 }
427 "offset" => {
428 let n = payload.get("n").and_then(Value::as_u64).unwrap_or(0);
429 df.offset(n as usize).map_err(PlanError::Session)
430 }
431 "orderBy" => {
432 let columns = payload
433 .get("columns")
434 .and_then(Value::as_array)
435 .ok_or_else(|| {
436 PlanError::InvalidPlan("orderBy payload must have 'columns' array".into())
437 })?;
438 let mut pairs: Vec<(String, bool)> = Vec::new();
440 for v in columns.iter() {
441 if let Some(parsed) = parse_order_by_element(v) {
442 pairs.extend(parsed);
443 }
444 }
445 if pairs.is_empty() {
446 return Err(PlanError::InvalidPlan(
447 "orderBy columns could not be parsed (expect column names, 'col ASC'/'col DESC', or ['a','b'])".into(),
448 ));
449 }
450 let col_names: Vec<String> = pairs
451 .iter()
452 .map(|(s, _)| df.resolve_column_name(s.as_str()))
453 .collect::<Result<Vec<_>, _>>()
454 .map_err(PlanError::Session)?;
455 let ascending: Vec<bool> = pairs.iter().map(|(_, asc)| *asc).collect();
456 let nulls_last = payload
457 .get("nulls_last")
458 .and_then(Value::as_array)
459 .map(|a| a.iter().filter_map(|v| v.as_bool()).collect::<Vec<_>>());
460 if let Some(nl) = nulls_last {
461 let mut sort_orders: Vec<SortOrder> = Vec::with_capacity(col_names.len());
462 for (i, name) in col_names.iter().enumerate() {
463 let asc = ascending.get(i).copied().unwrap_or(true);
464 let nlast = nl.get(i).copied().unwrap_or(asc);
465 let column = col(name.as_str());
466 let so = if asc {
467 if nlast {
468 asc_nulls_last(&column)
469 } else {
470 asc_nulls_first(&column)
471 }
472 } else if nlast {
473 desc_nulls_last(&column)
474 } else {
475 desc_nulls_first(&column)
476 };
477 sort_orders.push(so);
478 }
479 df.order_by_exprs(sort_orders).map_err(PlanError::Session)
480 } else {
481 let refs: Vec<&str> = col_names.iter().map(|s| s.as_str()).collect();
482 df.order_by(refs, ascending).map_err(PlanError::Session)
483 }
484 }
485 "distinct" => df.distinct(None).map_err(PlanError::Session),
486 "drop" => {
487 let columns = payload
488 .get("columns")
489 .and_then(Value::as_array)
490 .ok_or_else(|| {
491 PlanError::InvalidPlan("drop payload must have 'columns' array".into())
492 })?;
493 let names: Vec<String> = columns
494 .iter()
495 .filter_map(|v| {
496 v.as_str().map(String::from).or_else(|| expr_to_col_name(v))
497 })
498 .map(|s| df.resolve_column_name(s.as_str()))
499 .collect::<Result<Vec<_>, _>>()
500 .map_err(PlanError::Session)?;
501 let refs: Vec<&str> = names.iter().map(|s| s.as_str()).collect();
502 df.drop(refs).map_err(PlanError::Session)
503 }
504 "withColumnRenamed" => {
505 let old_name = payload.get("old").and_then(Value::as_str).ok_or_else(|| {
506 PlanError::InvalidPlan("withColumnRenamed must have 'old'".into())
507 })?;
508 let new_name = payload.get("new").and_then(Value::as_str).ok_or_else(|| {
509 PlanError::InvalidPlan("withColumnRenamed must have 'new'".into())
510 })?;
511 let resolved_old = df
512 .resolve_column_name(old_name)
513 .map_err(PlanError::Session)?;
514 df.with_column_renamed(&resolved_old, new_name)
515 .map_err(PlanError::Session)
516 }
517 "withColumn" => {
518 let name = payload
520 .get("name")
521 .or_else(|| payload.get("alias"))
522 .and_then(Value::as_str)
523 .ok_or_else(|| PlanError::InvalidPlan("withColumn must have 'name' or 'alias'".into()))?;
524 let expr_val = payload
525 .get("expr")
526 .ok_or_else(|| PlanError::InvalidPlan("withColumn must have 'expr'".into()))?;
527 if let Some(res) = try_column_from_udf_value(expr_val) {
528 let col = res.map_err(PlanError::Expr)?;
529 df.with_column(name, &col).map_err(PlanError::Session)
530 } else {
531 let expr = expr_from_value(expr_val).map_err(PlanError::Expr)?;
532 df.with_column_expr(name, expr).map_err(PlanError::Session)
533 }
534 }
535 "groupBy" => {
536 let group_by = payload
537 .get("group_by")
538 .and_then(Value::as_array)
539 .ok_or_else(|| {
540 PlanError::InvalidPlan("groupBy must have 'group_by' array".into())
541 })?;
542 let cols: Vec<String> = group_by
544 .iter()
545 .filter_map(|v| {
546 v.as_str()
547 .map(|s| s.to_string())
548 .or_else(|| {
549 v.get("col")
550 .and_then(Value::as_str)
551 .map(|s| s.to_string())
552 .or_else(|| {
553 v.get("name").and_then(Value::as_str).map(|s| s.to_string())
554 })
555 })
556 .or_else(|| {
557 v.get("expr")
559 .and_then(|e| expr_from_value(e).ok())
560 .and_then(|expr| {
561 polars_plan::utils::expr_output_name(&expr)
562 .ok()
563 .map(|s| s.as_str().to_string())
564 })
565 })
566 })
567 .map(|s| df.resolve_column_name(s.as_str()))
568 .collect::<Result<Vec<_>, _>>()
569 .map_err(PlanError::Session)?;
570 let refs: Vec<&str> = cols.iter().map(|s| s.as_str()).collect();
571 let grouped = df.group_by(refs).map_err(PlanError::Session)?;
572 let aggs = payload
574 .get("aggs")
575 .or_else(|| payload.get("aggregations"))
576 .and_then(Value::as_array);
577 match aggs {
578 Some(aggs_arr) => {
579 let agg_exprs = parse_aggs(aggs_arr, &df)?;
580 let disambiguated = disambiguate_agg_output_names(agg_exprs);
581 grouped.agg(disambiguated).map_err(PlanError::Session)
582 }
583 None => Err(PlanError::InvalidPlan(
584 "groupBy payload must include 'aggs' array (e.g. [{\"agg\": \"sum\", \"column\": \"b\"}])".into(),
585 )),
586 }
587 }
588 "join" => handle_join_op(session, df, payload),
589 "union" => handle_union_op(session, df, payload),
590 "unionByName" => handle_union_by_name_op(session, df, payload),
591 "crossJoin" | "cross_join" => handle_cross_join_op(session, df, payload),
592 "rollup" => Err(PlanError::UnsupportedOp(
593 "Plan op 'rollup' is not yet supported. Use groupBy for now. See docs for supported operations.".into(),
594 )),
595 "cube" => Err(PlanError::UnsupportedOp(
596 "Plan op 'cube' is not yet supported. Use groupBy for now. See docs for supported operations.".into(),
597 )),
598 _ => Err(PlanError::UnsupportedOp(format!(
599 "Plan op '{op_name}' is not supported. See docs for supported operations (e.g. select, filter, groupBy, join, orderBy, limit)."
600 ))),
601 }
602}
603
604fn handle_join_op(
605 session: &SparkSession,
606 df: DataFrame,
607 payload: Value,
608) -> Result<DataFrame, PlanError> {
609 let other_data = get_other_data(&payload)
610 .ok_or_else(|| PlanError::InvalidPlan("join must have 'other_data'".into()))?;
611 let other_schema = get_other_schema(&payload)
612 .ok_or_else(|| PlanError::InvalidPlan("join must have 'other_schema'".into()))?;
613 let on = payload
614 .get("on")
615 .ok_or_else(|| PlanError::InvalidPlan("join must have 'on' array or string".into()))?;
616 let how = payload
617 .get("how")
618 .and_then(Value::as_str)
619 .unwrap_or("inner");
620
621 let schema_vec: Vec<(String, String)> = other_schema
622 .iter()
623 .filter_map(schema_field_to_pair)
624 .collect();
625 let schema_names: Vec<String> = schema_vec.iter().map(|(n, _)| n.clone()).collect();
626 let rows = other_data_to_rows(other_data, &schema_names);
627 let mut other_df = session
628 .create_dataframe_from_rows(rows, schema_vec, false, false)
629 .map_err(PlanError::Session)?;
630
631 let on_keys_left = parse_join_on(on, &df)?;
632 let on_keys_right = parse_join_on(on, &other_df)?;
634 for (i, left_name) in on_keys_left.iter().enumerate() {
635 if let Some(right_name) = on_keys_right.get(i) {
636 if left_name != right_name {
637 other_df = other_df
638 .with_column_renamed(right_name, left_name)
639 .map_err(PlanError::Session)?;
640 }
641 }
642 }
643 let left_refs: Vec<&str> = on_keys_left.iter().map(|s| s.as_str()).collect();
647 let right_refs: Vec<&str> = on_keys_left.iter().map(|s| s.as_str()).collect();
648 let join_type = match how {
649 "left" => JoinType::Left,
650 "right" => JoinType::Right,
651 "outer" => JoinType::Outer,
652 "left_semi" | "leftsemi" | "semi" => JoinType::LeftSemi,
653 "left_anti" | "leftanti" | "anti" => JoinType::LeftAnti,
654 _ => JoinType::Inner,
655 };
656 df.join_with_keys(&other_df, left_refs, right_refs, join_type, false)
657 .map_err(PlanError::Session)
658}
659
660fn handle_union_op(
661 session: &SparkSession,
662 df: DataFrame,
663 payload: Value,
664) -> Result<DataFrame, PlanError> {
665 let other_data = get_other_data(&payload)
666 .ok_or_else(|| PlanError::InvalidPlan("union must have 'other_data'".into()))?;
667 let other_schema = get_other_schema(&payload)
668 .ok_or_else(|| PlanError::InvalidPlan("union must have 'other_schema'".into()))?;
669 let schema_vec: Vec<(String, String)> = other_schema
670 .iter()
671 .filter_map(schema_field_to_pair)
672 .collect();
673 let schema_names: Vec<String> = schema_vec.iter().map(|(n, _)| n.clone()).collect();
674 let rows = other_data_to_rows(other_data, &schema_names);
675 let other_df = session
676 .create_dataframe_from_rows(rows, schema_vec, false, false)
677 .map_err(PlanError::Session)?;
678 df.union(&other_df).map_err(PlanError::Session)
679}
680
681fn handle_union_by_name_op(
682 session: &SparkSession,
683 df: DataFrame,
684 payload: Value,
685) -> Result<DataFrame, PlanError> {
686 let other_data = get_other_data(&payload)
687 .ok_or_else(|| PlanError::InvalidPlan("unionByName must have 'other_data'".into()))?;
688 let other_schema = get_other_schema(&payload)
689 .ok_or_else(|| PlanError::InvalidPlan("unionByName must have 'other_schema'".into()))?;
690 let schema_vec: Vec<(String, String)> = other_schema
691 .iter()
692 .filter_map(schema_field_to_pair)
693 .collect();
694 let schema_names: Vec<String> = schema_vec.iter().map(|(n, _)| n.clone()).collect();
695 let rows = other_data_to_rows(other_data, &schema_names);
696 let other_df = session
697 .create_dataframe_from_rows(rows, schema_vec, false, false)
698 .map_err(PlanError::Session)?;
699 df.union_by_name(&other_df, true)
700 .map_err(PlanError::Session)
701}
702
703fn handle_cross_join_op(
704 session: &SparkSession,
705 df: DataFrame,
706 payload: Value,
707) -> Result<DataFrame, PlanError> {
708 let other_data = get_other_data(&payload)
709 .ok_or_else(|| PlanError::InvalidPlan("crossJoin must have 'other_data'".into()))?;
710 let other_schema = get_other_schema(&payload)
711 .ok_or_else(|| PlanError::InvalidPlan("crossJoin must have 'other_schema'".into()))?;
712 let schema_vec: Vec<(String, String)> = other_schema
713 .iter()
714 .filter_map(schema_field_to_pair)
715 .collect();
716 let schema_names: Vec<String> = schema_vec.iter().map(|(n, _)| n.clone()).collect();
717 let rows = other_data_to_rows(other_data, &schema_names);
718 let other_df = session
719 .create_dataframe_from_rows(rows, schema_vec, false, false)
720 .map_err(PlanError::Session)?;
721 df.cross_join(&other_df).map_err(PlanError::Session)
722}
723
724fn parse_aggs(aggs: &[Value], df: &DataFrame) -> Result<Vec<polars::prelude::Expr>, PlanError> {
725 use crate::Column;
726 use crate::functions::{avg, count, first as rs_first, max, min, sum as rs_sum};
727 use polars::prelude::len;
728 use std::collections::HashMap;
729
730 let mut out = Vec::with_capacity(aggs.len());
731 let mut alias_count: HashMap<String, u32> = HashMap::new();
732 for a in aggs {
733 let obj = a
734 .as_object()
735 .ok_or_else(|| PlanError::InvalidPlan("each agg must be an object".into()))?;
736 if let Some(expr_val) = obj.get("expr") {
738 let expr = expr_from_value(expr_val).map_err(PlanError::Expr)?;
739 let resolved = df
740 .resolve_expr_column_names(expr)
741 .map_err(PlanError::Session)?;
742 out.push(resolved);
743 continue;
744 }
745 let agg = obj
747 .get("agg")
748 .or_else(|| obj.get("func"))
749 .and_then(Value::as_str)
750 .ok_or_else(|| PlanError::InvalidPlan("agg must have 'agg' or 'func' string".into()))?;
751
752 if agg == "python_grouped_udf" {
753 return Err(PlanError::InvalidPlan(
756 "python_grouped_udf aggregations are not yet supported in execute_plan; use built-in aggregations in plans for now".into(),
757 ));
758 }
759
760 let col_name = obj.get("column").and_then(Value::as_str);
761 let c = match col_name {
762 Some(name) => {
763 let resolved = df.resolve_column_name(name).map_err(PlanError::Session)?;
764 Column::new(resolved)
765 }
766 None => {
767 if agg == "count" {
768 Column::new("".to_string()) } else {
770 return Err(PlanError::InvalidPlan(format!(
771 "agg '{agg}' requires 'column'"
772 )));
773 }
774 }
775 };
776 let col_expr = match agg {
778 "count" if col_name.map(|s| s.is_empty()).unwrap_or(true) => Column::from_expr(
779 len().cast(polars::prelude::DataType::Int64),
780 Some("count".to_string()),
781 ),
782 "count" => count(&c),
783 "sum" => {
784 let keep_int = col_name
786 .and_then(|n| df.get_column_data_type(n))
787 .map(|dt| {
788 matches!(
789 dt,
790 crate::schema::DataType::Long | crate::schema::DataType::Integer
791 )
792 })
793 .unwrap_or(false);
794 if keep_int {
795 let name = c.name().to_string();
796 Column::from_expr(c.into_expr().sum(), Some(format!("sum({})", name)))
797 } else {
798 rs_sum(&c)
799 }
800 }
801 "avg" => avg(&c),
802 "min" => min(&c),
803 "max" => max(&c),
804 "first" => {
805 let ignorenulls = obj
806 .get("ignorenulls")
807 .and_then(Value::as_bool)
808 .unwrap_or(false);
809 rs_first(&c, ignorenulls)
810 }
811 "last" => Column::from_expr(c.into_expr().last(), None),
812 _ => return Err(PlanError::InvalidPlan(format!("unsupported agg: {agg}"))),
813 };
814 let mut expr = col_expr.into_expr();
815 if let Some(cast_type) = obj
817 .get("cast")
818 .or_else(|| obj.get("cast_type"))
819 .and_then(Value::as_str)
820 {
821 let dtype =
822 crate::functions::parse_type_name(cast_type).map_err(PlanError::InvalidPlan)?;
823 expr = expr.strict_cast(dtype);
824 }
825 let base_alias = obj
829 .get("alias")
830 .or_else(|| obj.get("name"))
831 .and_then(Value::as_str)
832 .map(String::from)
833 .unwrap_or_else(|| match (agg, col_name) {
834 ("count", None) => "count".to_string(),
835 (a, Some(col)) => format!("{}({})", a, col),
836 (a, None) => format!("{}({})", a, ""),
837 });
838 let count = alias_count.entry(base_alias.clone()).or_insert(0);
839 *count += 1;
840 let alias = if *count == 1 {
841 base_alias
842 } else {
843 format!("{}_{}", base_alias, *count - 1)
844 };
845 expr = expr.alias(&alias);
846 out.push(expr);
847 }
848 Ok(out)
849}
850
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Creates (or fetches) a session with the given application name.
    fn session_named(app: &str) -> crate::session::SparkSession {
        crate::session::SparkSession::builder()
            .app_name(app)
            .get_or_create()
    }

    /// Builds an owned `(name, type)` schema from string-literal pairs.
    fn schema_of(fields: &[(&str, &str)]) -> Vec<(String, String)> {
        fields
            .iter()
            .map(|(name, ty)| (name.to_string(), ty.to_string()))
            .collect()
    }

    /// A default-named `avg` aggregation keeps the requested column spelling.
    #[test]
    fn test_groupby_agg_column_name_avg_value() {
        let session = session_named("plan_agg_alias");
        let data = vec![
            vec![json!("Alice"), json!(5.0)],
            vec![json!("Alice"), json!(6.0)],
            vec![json!("Bob"), json!(5.0)],
        ];
        let schema = schema_of(&[("Name", "string"), ("Value", "double")]);
        let plan = vec![json!({
            "op": "groupBy",
            "payload": {
                "group_by": ["Name"],
                "aggs": [{"agg": "avg", "column": "Value"}]
            }
        })];
        let df = execute_plan(&session, data, schema, &plan).unwrap();
        let out = df.collect_inner().unwrap();
        let names = out.get_column_names();
        assert!(
            names.iter().any(|s| s.as_str() == "avg(Value)"),
            "expected column 'avg(Value)' in {:?}",
            names
        );
    }

    /// `crossJoin` yields the cartesian product with both sides' columns.
    #[test]
    fn test_cross_join_plan() {
        let session = session_named("plan_cross_join");
        let data = vec![vec![json!(1)], vec![json!(2)]];
        let schema = schema_of(&[("a", "bigint")]);
        let plan = vec![json!({
            "op": "crossJoin",
            "payload": {
                "other_data": [[3], [4]],
                "other_schema": [{"name": "b", "type": "bigint"}]
            }
        })];
        let df = execute_plan(&session, data, schema, &plan).unwrap();
        let out = df.collect_inner().unwrap();
        assert_eq!(out.height(), 4, "cross join 2x2 = 4 rows");
        assert_eq!(out.get_column_names(), &["a", "b"]);
    }

    /// A join whose `on` is an expression string is rejected with an explanatory error.
    #[test]
    fn test_join_on_expression_returns_clear_error() {
        let session = session_named("plan_join_on_expr");
        let data = vec![vec![json!(1), json!("a")]];
        let schema = schema_of(&[("id", "bigint"), ("x", "string")]);
        let plan = vec![json!({
            "op": "join",
            "payload": {
                "on": "array_contains(col, x)",
                "how": "inner",
                "other_data": [[1, "b"]],
                "other_schema": [{"name": "id", "type": "bigint"}, {"name": "x", "type": "string"}]
            }
        })];
        let err = match execute_plan(&session, data, schema, &plan) {
            Ok(_) => panic!("join on expression should fail"),
            Err(e) => e,
        };
        let msg = err.to_string();
        assert!(
            msg.contains("join on expression") || msg.contains("use column names only"),
            "error should explain join-on expression not supported: {}",
            msg
        );
    }

    /// `orderBy` accepts both `"col DESC"` and the stringified-list form.
    #[test]
    fn test_order_by_col_desc_and_list_format() {
        let session = session_named("plan_orderby_desc");
        let data = vec![
            vec![json!(1), json!("z")],
            vec![json!(2), json!("a")],
            vec![json!(3), json!("m")],
        ];
        let schema = schema_of(&[("id", "bigint"), ("name", "string")]);

        let plan = vec![json!({
            "op": "orderBy",
            "payload": { "columns": ["name DESC"] }
        })];
        let df = execute_plan(&session, data.clone(), schema.clone(), &plan).unwrap();
        assert_eq!(df.count().unwrap(), 3);
        let rows = df.collect_as_json_rows().unwrap();
        assert_eq!(rows[0].get("name").and_then(|v| v.as_str()), Some("z"));

        let plan2 = vec![json!({
            "op": "orderBy",
            "payload": { "columns": ["['id','name']"] }
        })];
        let df2 = execute_plan(&session, data, schema, &plan2).unwrap();
        assert_eq!(df2.collect_inner().unwrap().height(), 3);
    }

    /// `select` with `{name, expr}` items evaluates trig functions.
    #[test]
    fn test_plan_select_sin_cos_tan() {
        let session = session_named("plan_trig");
        let pi_2 = std::f64::consts::FRAC_PI_2;
        let data = vec![vec![json!(0.0)], vec![json!(pi_2)]];
        let schema = schema_of(&[("x", "double")]);
        let plan = vec![json!({
            "op": "select",
            "payload": [
                {"name": "x", "expr": {"col": "x"}},
                {"name": "s", "expr": {"fn": "sin", "args": [{"col": "x"}]}},
                {"name": "c", "expr": {"fn": "cos", "args": [{"col": "x"}]}},
                {"name": "t", "expr": {"fn": "tan", "args": [{"col": "x"}]}}
            ]
        })];
        let df = execute_plan(&session, data, schema, &plan).unwrap();
        assert_eq!(df.count().unwrap(), 2);
        let out = df.collect_inner().unwrap();
        assert_eq!(out.height(), 2);
        let s_col = out.column("s").unwrap();
        let c_col = out.column("c").unwrap();
        assert_eq!(s_col.f64().unwrap().get(0), Some(0.0));
        assert!((c_col.f64().unwrap().get(0).unwrap() - 1.0).abs() < 1e-10);
        assert!((s_col.f64().unwrap().get(1).unwrap() - 1.0).abs() < 1e-10);
    }

    /// `select` and `drop` accept `{"col": ...}` reference objects.
    #[test]
    fn test_plan_select_drop_column_ref_objects() {
        let session = session_named("plan_select_col_ref");
        let data = vec![
            vec![json!(1), json!("x"), json!(10)],
            vec![json!(2), json!("y"), json!(20)],
        ];
        let schema = schema_of(&[("a", "bigint"), ("b", "string"), ("c", "bigint")]);

        let plan_select = vec![json!({
            "op": "select",
            "payload": [{"col": "a"}, {"col": "b"}]
        })];
        let df = execute_plan(&session, data.clone(), schema.clone(), &plan_select).unwrap();
        let out = df.collect_inner().unwrap();
        assert_eq!(out.get_column_names(), &["a", "b"]);
        assert_eq!(out.height(), 2);

        let plan_drop = vec![
            json!({"op": "select", "payload": [{"col": "a"}, {"col": "b"}, {"col": "c"}]}),
            json!({"op": "drop", "payload": {"columns": [{"col": "b"}]}}),
        ];
        let df2 = execute_plan(&session, data, schema, &plan_drop).unwrap();
        let out2 = df2.collect_inner().unwrap();
        assert_eq!(out2.get_column_names(), &["a", "c"]);
    }

    /// JSON nulls in the input rows survive a filter + select round trip.
    #[test]
    fn test_plan_create_dataframe_from_rows_with_nulls() {
        let session = session_named("plan_nulls");
        let data = vec![
            vec![json!(1), json!("a"), serde_json::Value::Null],
            vec![json!(2), serde_json::Value::Null, json!(20)],
        ];
        let schema = schema_of(&[("id", "bigint"), ("name", "string"), ("value", "bigint")]);
        let plan = vec![
            json!({"op": "filter", "payload": {"op": "gt", "left": {"col": "id"}, "right": {"lit": 0}}}),
            json!({"op": "select", "payload": ["id", "name", "value"]}),
        ];
        let df = execute_plan(&session, data, schema, &plan).unwrap();
        assert_eq!(df.count().unwrap(), 2);
        let rows = df.collect_as_json_rows().unwrap();
        assert_eq!(rows[0].get("value"), Some(&serde_json::Value::Null));
        assert_eq!(rows[1].get("name"), Some(&serde_json::Value::Null));
    }

    /// The trait-based executor returns the same data as the free function.
    #[test]
    fn test_plan_executor_trait_matches_execute_plan() {
        use robin_sparkless_core::engine::PlanExecutor as _;

        let session = session_named("plan_executor_trait");
        let data = vec![vec![json!(1)], vec![json!(2)]];
        let schema = schema_of(&[("x", "bigint")]);
        let plan = vec![json!({
            "op": "filter",
            "payload": {"op": "gt", "left": {"col": "x"}, "right": {"lit": 1}}
        })];

        let df_direct = execute_plan(&session, data.clone(), schema.clone(), &plan).unwrap();

        let boxed = PolarsPlanExecutor::execute_plan(&session, data, schema, &plan).unwrap();
        let backend_df = boxed
            .as_any()
            .downcast_ref::<crate::dataframe::DataFrame>()
            .expect("expected Polars DataFrame backend")
            .clone();

        assert_eq!(
            df_direct.collect_inner().unwrap(),
            backend_df.collect_inner().unwrap()
        );
    }
}