robin_sparkless_polars/plan/
mod.rs1mod expr;
6
7use crate::dataframe::{DataFrame, JoinType};
8use crate::functions::{
9 SortOrder, asc_nulls_first, asc_nulls_last, col, desc_nulls_first, desc_nulls_last,
10};
11use crate::plan::expr::{expr_from_value, try_column_from_udf_value};
12use crate::session::{SparkSession, set_thread_udf_session};
13pub use expr::PlanExprError;
14use polars::prelude::PolarsError;
15use serde_json::Value;
16
17pub fn execute_plan(
25 session: &SparkSession,
26 data: Vec<Vec<Value>>,
27 schema: Vec<(String, String)>,
28 plan: &[Value],
29) -> Result<DataFrame, PlanError> {
30 set_thread_udf_session(session.clone());
31 let mut df = session
32 .create_dataframe_from_rows(data, schema)
33 .map_err(PlanError::Session)?
34 .with_case_insensitive_column_resolution();
35
36 for op_value in plan {
37 let op_obj = op_value
38 .as_object()
39 .ok_or_else(|| PlanError::InvalidPlan("each plan step must be a JSON object".into()))?;
40 let op_name = op_obj
41 .get("op")
42 .and_then(Value::as_str)
43 .ok_or_else(|| PlanError::InvalidPlan("each plan step must have 'op' string".into()))?;
44 let mut payload = op_obj.get("payload").cloned().unwrap_or(Value::Null);
45 if matches!(op_name, "join" | "union" | "unionByName") {
48 payload = merge_other_into_payload(payload, op_obj);
49 }
50
51 df = apply_op(session, df, op_name, payload)?;
52 }
53
54 Ok(df)
55}
56
57#[derive(Debug)]
59pub enum PlanError {
60 Session(PolarsError),
61 Expr(PlanExprError),
62 InvalidPlan(String),
63 UnsupportedOp(String),
64}
65
66impl std::fmt::Display for PlanError {
67 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
68 match self {
69 PlanError::Session(e) => write!(f, "session/df: {e}"),
70 PlanError::Expr(e) => write!(f, "expression: {e}"),
71 PlanError::InvalidPlan(s) => write!(f, "invalid plan: {s}"),
72 PlanError::UnsupportedOp(s) => write!(f, "unsupported op: {s}"),
73 }
74 }
75}
76
77impl std::error::Error for PlanError {}
78
79fn merge_other_into_payload(payload: Value, op: &serde_json::Map<String, Value>) -> Value {
81 fn get(obj: &serde_json::Map<String, Value>, snake: &str, camel: &str) -> Option<Value> {
82 obj.get(snake).or_else(|| obj.get(camel)).cloned()
83 }
84 let mut p = match payload {
85 Value::Object(m) => m,
86 _ => return payload,
87 };
88 if p.get("other_data").or_else(|| p.get("otherData")).is_none() {
89 if let Some(v) = get(op, "other_data", "otherData") {
90 p.insert("other_data".into(), v);
91 }
92 }
93 if p.get("other_schema")
94 .or_else(|| p.get("otherSchema"))
95 .is_none()
96 {
97 if let Some(v) = get(op, "other_schema", "otherSchema") {
98 p.insert("other_schema".into(), v);
99 }
100 }
101 if p.get("on").is_none() {
102 if let Some(v) = get(op, "on", "on") {
103 p.insert("on".into(), v);
104 }
105 }
106 Value::Object(p)
107}
108
109fn get_other_data(payload: &Value) -> Option<&Vec<Value>> {
111 payload
112 .get("other_data")
113 .or_else(|| payload.get("otherData"))
114 .and_then(Value::as_array)
115}
116
117fn get_other_schema(payload: &Value) -> Option<&Vec<Value>> {
119 payload
120 .get("other_schema")
121 .or_else(|| payload.get("otherSchema"))
122 .and_then(Value::as_array)
123}
124
125fn expr_to_col_name(v: &Value) -> Option<String> {
127 let obj = v.as_object()?;
128 obj.get("col")
129 .or_else(|| obj.get("column"))
130 .and_then(Value::as_str)
131 .map(|s| s.to_string())
132}
133
134fn parse_join_on(on: &Value, df: &DataFrame) -> Result<Vec<String>, PlanError> {
138 if let Some(s) = on.as_str() {
139 let resolved = df.resolve_column_name(s).map_err(PlanError::Session)?;
140 return Ok(vec![resolved]);
141 }
142 let arr = on.as_array().ok_or_else(|| {
143 PlanError::InvalidPlan(
144 "join 'on' must be string, array of strings, or array of column refs / eq expressions"
145 .into(),
146 )
147 })?;
148 let mut keys = Vec::with_capacity(arr.len());
149 for v in arr {
150 if let Some(s) = v.as_str() {
151 let resolved = df.resolve_column_name(s).map_err(PlanError::Session)?;
152 keys.push(resolved);
153 continue;
154 }
155 if let Some(obj) = v.as_object() {
156 if let Some(name) = expr_to_col_name(v) {
158 let resolved = df.resolve_column_name(&name).map_err(PlanError::Session)?;
159 keys.push(resolved);
160 continue;
161 }
162 let op = obj
164 .get("op")
165 .or_else(|| obj.get("operator"))
166 .and_then(Value::as_str);
167 if op.map(|o| o == "eq" || o == "==").unwrap_or(false) {
168 let left = obj.get("left").and_then(expr_to_col_name);
169 let right = obj.get("right").and_then(expr_to_col_name);
170 if let (Some(l), Some(r)) = (left, right) {
171 if l == r {
172 let resolved = df.resolve_column_name(&l).map_err(PlanError::Session)?;
173 keys.push(resolved);
174 continue;
175 }
176 }
177 }
178 }
179 return Err(PlanError::InvalidPlan(
180 "join 'on' element must be string, {\"col\": \"name\"}, or {\"op\": \"eq\", \"left\": {\"col\": \"x\"}, \"right\": {\"col\": \"x\"}}".into(),
181 ));
182 }
183 Ok(keys)
184}
185
186fn other_data_to_rows(other_data: &[Value], schema_names: &[String]) -> Vec<Vec<Value>> {
188 other_data
189 .iter()
190 .filter_map(|v| {
191 if let Some(arr) = v.as_array() {
192 return Some(arr.clone());
193 }
194 if let Some(obj) = v.as_object() {
195 let row: Vec<Value> = schema_names
196 .iter()
197 .map(|n| obj.get(n).cloned().unwrap_or(Value::Null))
198 .collect();
199 return Some(row);
200 }
201 None
202 })
203 .collect()
204}
205
206fn schema_field_to_pair(v: &Value) -> Option<(String, String)> {
208 let obj = v.as_object()?;
209 let name = obj
210 .get("name")
211 .or_else(|| obj.get("fieldName"))
212 .and_then(Value::as_str)?
213 .to_string();
214 let ty = obj
215 .get("type")
216 .or_else(|| obj.get("dataType"))
217 .and_then(Value::as_str)
218 .or_else(|| {
219 obj.get("dataType")?.get("typeName").and_then(Value::as_str)
221 })?
222 .to_string();
223 Some((name, ty))
224}
225
226fn apply_op(
227 session: &SparkSession,
228 df: DataFrame,
229 op_name: &str,
230 payload: Value,
231) -> Result<DataFrame, PlanError> {
232 match op_name {
233 "stop" => {
234 let _ = payload;
235 session.stop();
236 Ok(df)
237 }
238 "filter" => {
239 let expr = expr_from_value(&payload).map_err(PlanError::Expr)?;
240 df.filter(expr).map_err(PlanError::Session)
241 }
242 "select" => {
243 let arr = payload
245 .as_array()
246 .or_else(|| payload.get("columns").and_then(Value::as_array));
247 if let Some(arr) = arr {
248 if arr.is_empty() {
249 return Err(PlanError::InvalidPlan(
250 "select payload must be non-empty array".into(),
251 ));
252 }
253 let first = &arr[0];
254 let is_expr_list = first.is_object() && first.get("expr").is_some();
255 if is_expr_list {
256 let mut exprs = Vec::with_capacity(arr.len());
258 for v in arr {
259 let obj = v.as_object().ok_or_else(|| {
260 PlanError::InvalidPlan(
261 "select payload with expressions must be array of {name, expr} objects".into(),
262 )
263 })?;
264 let name = obj.get("name").and_then(Value::as_str).ok_or_else(|| {
265 PlanError::InvalidPlan("select item must have 'name' string".into())
266 })?;
267 let expr_val = obj.get("expr").ok_or_else(|| {
268 PlanError::InvalidPlan("select item must have 'expr'".into())
269 })?;
270 let expr = expr_from_value(expr_val).map_err(PlanError::Expr)?;
271 let resolved = df
272 .resolve_expr_column_names(expr)
273 .map_err(PlanError::Session)?;
274 exprs.push(resolved.alias(name));
275 }
276 df.select_exprs(exprs).map_err(PlanError::Session)
277 } else {
278 let strings: Vec<String> = arr
280 .iter()
281 .map(|v| {
282 if let Some(s) = v.as_str() {
283 Ok(s.to_string())
284 } else if let Some(obj) = v.as_object() {
285 obj.get("name")
286 .and_then(Value::as_str)
287 .map(|s| s.to_string())
288 .ok_or_else(|| {
289 PlanError::InvalidPlan(
290 "select column item must have 'name' string".into(),
291 )
292 })
293 } else {
294 Err(PlanError::InvalidPlan(
295 "select payload must be list of column name strings or {name, expr} or {type, name} objects".into(),
296 ))
297 }
298 })
299 .collect::<Result<Vec<_>, _>>()?;
300 let has_concat = strings.iter().any(|s| {
301 crate::plan::expr::try_parse_concat_expr_from_string(s.as_str()).is_some()
302 });
303 if !has_concat {
304 let names: Vec<String> = strings
305 .iter()
306 .map(|s| df.resolve_column_name(s.as_str()))
307 .collect::<Result<Vec<_>, _>>()
308 .map_err(PlanError::Session)?;
309 let refs: Vec<&str> = names.iter().map(|s| s.as_str()).collect();
310 return df.select(refs).map_err(PlanError::Session);
311 }
312 let mut exprs = Vec::with_capacity(strings.len());
313 for s in &strings {
314 if let Some(expr) =
315 crate::plan::expr::try_parse_concat_expr_from_string(s.as_str())
316 {
317 let resolved = df
318 .resolve_expr_column_names(expr)
319 .map_err(PlanError::Session)?;
320 exprs.push(resolved.alias(s));
321 } else {
322 let resolved = df
323 .resolve_column_name(s.as_str())
324 .map_err(PlanError::Session)?;
325 exprs.push(polars::prelude::col(resolved));
326 }
327 }
328 df.select_exprs(exprs).map_err(PlanError::Session)
329 }
330 } else {
331 Err(PlanError::InvalidPlan(
332 "select payload must be array of column names or {name, expr} objects, or object with 'columns' array".into(),
333 ))
334 }
335 }
336 "limit" => {
337 let n = payload.get("n").and_then(Value::as_u64).ok_or_else(|| {
338 PlanError::InvalidPlan("limit payload must have 'n' number".into())
339 })?;
340 df.limit(n as usize).map_err(PlanError::Session)
341 }
342 "offset" => {
343 let n = payload.get("n").and_then(Value::as_u64).unwrap_or(0);
344 df.offset(n as usize).map_err(PlanError::Session)
345 }
346 "orderBy" => {
347 let columns = payload
348 .get("columns")
349 .and_then(Value::as_array)
350 .ok_or_else(|| {
351 PlanError::InvalidPlan("orderBy payload must have 'columns' array".into())
352 })?;
353 let col_names: Vec<String> = columns
354 .iter()
355 .filter_map(|v| v.as_str())
356 .map(|s| df.resolve_column_name(s))
357 .collect::<Result<Vec<_>, _>>()
358 .map_err(PlanError::Session)?;
359 let ascending = payload
360 .get("ascending")
361 .and_then(Value::as_array)
362 .map(|a| a.iter().filter_map(|v| v.as_bool()).collect::<Vec<_>>())
363 .unwrap_or_else(|| vec![true; col_names.len()]);
364 let nulls_last = payload
365 .get("nulls_last")
366 .and_then(Value::as_array)
367 .map(|a| a.iter().filter_map(|v| v.as_bool()).collect::<Vec<_>>());
368 if let Some(nl) = nulls_last {
369 let mut sort_orders: Vec<SortOrder> = Vec::with_capacity(col_names.len());
370 for (i, name) in col_names.iter().enumerate() {
371 let asc = ascending.get(i).copied().unwrap_or(true);
372 let nlast = nl.get(i).copied().unwrap_or(asc);
373 let column = col(name.as_str());
374 let so = if asc {
375 if nlast {
376 asc_nulls_last(&column)
377 } else {
378 asc_nulls_first(&column)
379 }
380 } else if nlast {
381 desc_nulls_last(&column)
382 } else {
383 desc_nulls_first(&column)
384 };
385 sort_orders.push(so);
386 }
387 df.order_by_exprs(sort_orders).map_err(PlanError::Session)
388 } else {
389 let refs: Vec<&str> = col_names.iter().map(|s| s.as_str()).collect();
390 df.order_by(refs, ascending).map_err(PlanError::Session)
391 }
392 }
393 "distinct" => df.distinct(None).map_err(PlanError::Session),
394 "drop" => {
395 let columns = payload
396 .get("columns")
397 .and_then(Value::as_array)
398 .ok_or_else(|| {
399 PlanError::InvalidPlan("drop payload must have 'columns' array".into())
400 })?;
401 let names: Vec<String> = columns
402 .iter()
403 .filter_map(|v| v.as_str())
404 .map(|s| df.resolve_column_name(s))
405 .collect::<Result<Vec<_>, _>>()
406 .map_err(PlanError::Session)?;
407 let refs: Vec<&str> = names.iter().map(|s| s.as_str()).collect();
408 df.drop(refs).map_err(PlanError::Session)
409 }
410 "withColumnRenamed" => {
411 let old_name = payload.get("old").and_then(Value::as_str).ok_or_else(|| {
412 PlanError::InvalidPlan("withColumnRenamed must have 'old'".into())
413 })?;
414 let new_name = payload.get("new").and_then(Value::as_str).ok_or_else(|| {
415 PlanError::InvalidPlan("withColumnRenamed must have 'new'".into())
416 })?;
417 let resolved_old = df
418 .resolve_column_name(old_name)
419 .map_err(PlanError::Session)?;
420 df.with_column_renamed(&resolved_old, new_name)
421 .map_err(PlanError::Session)
422 }
423 "withColumn" => {
424 let name = payload
425 .get("name")
426 .and_then(Value::as_str)
427 .ok_or_else(|| PlanError::InvalidPlan("withColumn must have 'name'".into()))?;
428 let expr_val = payload
429 .get("expr")
430 .ok_or_else(|| PlanError::InvalidPlan("withColumn must have 'expr'".into()))?;
431 if let Some(res) = try_column_from_udf_value(expr_val) {
432 let col = res.map_err(PlanError::Expr)?;
433 df.with_column(name, &col).map_err(PlanError::Session)
434 } else {
435 let expr = expr_from_value(expr_val).map_err(PlanError::Expr)?;
436 df.with_column_expr(name, expr).map_err(PlanError::Session)
437 }
438 }
439 "groupBy" => {
440 let group_by = payload
441 .get("group_by")
442 .and_then(Value::as_array)
443 .ok_or_else(|| {
444 PlanError::InvalidPlan("groupBy must have 'group_by' array".into())
445 })?;
446 let cols: Vec<String> = group_by
447 .iter()
448 .filter_map(|v| v.as_str())
449 .map(|s| df.resolve_column_name(s))
450 .collect::<Result<Vec<_>, _>>()
451 .map_err(PlanError::Session)?;
452 let refs: Vec<&str> = cols.iter().map(|s| s.as_str()).collect();
453 let grouped = df.group_by(refs).map_err(PlanError::Session)?;
454 let aggs = payload.get("aggs").and_then(Value::as_array);
455 match aggs {
456 Some(aggs_arr) => {
457 let agg_exprs = parse_aggs(aggs_arr, &df)?;
458 grouped.agg(agg_exprs).map_err(PlanError::Session)
459 }
460 None => Err(PlanError::InvalidPlan(
461 "groupBy payload must include 'aggs' array (e.g. [{\"agg\": \"sum\", \"column\": \"b\"}])".into(),
462 )),
463 }
464 }
465 "join" => {
466 let other_data = get_other_data(&payload)
467 .ok_or_else(|| PlanError::InvalidPlan("join must have 'other_data'".into()))?;
468 let other_schema = get_other_schema(&payload)
469 .ok_or_else(|| PlanError::InvalidPlan("join must have 'other_schema'".into()))?;
470 let on = payload.get("on").ok_or_else(|| {
471 PlanError::InvalidPlan("join must have 'on' array or string".into())
472 })?;
473 let how = payload
474 .get("how")
475 .and_then(Value::as_str)
476 .unwrap_or("inner");
477
478 let schema_vec: Vec<(String, String)> = other_schema
479 .iter()
480 .filter_map(schema_field_to_pair)
481 .collect();
482 let schema_names: Vec<String> = schema_vec.iter().map(|(n, _)| n.clone()).collect();
483 let rows = other_data_to_rows(other_data, &schema_names);
484 let other_df = session
485 .create_dataframe_from_rows(rows, schema_vec)
486 .map_err(PlanError::Session)?;
487
488 let on_keys_left = parse_join_on(on, &df)?;
489 let mut other_df = other_df;
491 let on_keys_right = parse_join_on(on, &other_df)?;
492 for (i, left_name) in on_keys_left.iter().enumerate() {
493 if let Some(right_name) = on_keys_right.get(i) {
494 if left_name != right_name {
495 other_df = other_df
496 .with_column_renamed(right_name, left_name)
497 .map_err(PlanError::Session)?;
498 }
499 }
500 }
501 let on_refs: Vec<&str> = on_keys_left.iter().map(|s| s.as_str()).collect();
502 let join_type = match how {
503 "left" => JoinType::Left,
504 "right" => JoinType::Right,
505 "outer" => JoinType::Outer,
506 "left_semi" | "semi" => JoinType::LeftSemi,
507 "left_anti" | "anti" => JoinType::LeftAnti,
508 _ => JoinType::Inner,
509 };
510 df.join(&other_df, on_refs, join_type)
511 .map_err(PlanError::Session)
512 }
513 "union" => {
514 let other_data = get_other_data(&payload)
515 .ok_or_else(|| PlanError::InvalidPlan("union must have 'other_data'".into()))?;
516 let other_schema = get_other_schema(&payload)
517 .ok_or_else(|| PlanError::InvalidPlan("union must have 'other_schema'".into()))?;
518 let schema_vec: Vec<(String, String)> = other_schema
519 .iter()
520 .filter_map(schema_field_to_pair)
521 .collect();
522 let schema_names: Vec<String> = schema_vec.iter().map(|(n, _)| n.clone()).collect();
523 let rows = other_data_to_rows(other_data, &schema_names);
524 let other_df = session
525 .create_dataframe_from_rows(rows, schema_vec)
526 .map_err(PlanError::Session)?;
527 df.union(&other_df).map_err(PlanError::Session)
528 }
529 "unionByName" => {
530 let other_data = get_other_data(&payload).ok_or_else(|| {
531 PlanError::InvalidPlan("unionByName must have 'other_data'".into())
532 })?;
533 let other_schema = get_other_schema(&payload).ok_or_else(|| {
534 PlanError::InvalidPlan("unionByName must have 'other_schema'".into())
535 })?;
536 let schema_vec: Vec<(String, String)> = other_schema
537 .iter()
538 .filter_map(schema_field_to_pair)
539 .collect();
540 let schema_names: Vec<String> = schema_vec.iter().map(|(n, _)| n.clone()).collect();
541 let rows = other_data_to_rows(other_data, &schema_names);
542 let other_df = session
543 .create_dataframe_from_rows(rows, schema_vec)
544 .map_err(PlanError::Session)?;
545 df.union_by_name(&other_df, true)
546 .map_err(PlanError::Session)
547 }
548 _ => Err(PlanError::UnsupportedOp(op_name.to_string())),
549 }
550}
551
552fn parse_aggs(aggs: &[Value], df: &DataFrame) -> Result<Vec<polars::prelude::Expr>, PlanError> {
553 use crate::Column;
554 use crate::functions::{avg, count, max, min, sum as rs_sum};
555
556 let mut out = Vec::with_capacity(aggs.len());
557 for a in aggs {
558 let obj = a
559 .as_object()
560 .ok_or_else(|| PlanError::InvalidPlan("each agg must be an object".into()))?;
561 let agg = obj
562 .get("agg")
563 .and_then(Value::as_str)
564 .ok_or_else(|| PlanError::InvalidPlan("agg must have 'agg' string".into()))?;
565
566 if agg == "python_grouped_udf" {
567 return Err(PlanError::InvalidPlan(
570 "python_grouped_udf aggregations are not yet supported in execute_plan; use built-in aggregations in plans for now".into(),
571 ));
572 }
573
574 let col_name = obj.get("column").and_then(Value::as_str);
575 let c = match col_name {
576 Some(name) => {
577 let resolved = df.resolve_column_name(name).map_err(PlanError::Session)?;
578 Column::new(resolved)
579 }
580 None => {
581 if agg == "count" {
582 Column::new("".to_string()) } else {
584 return Err(PlanError::InvalidPlan(format!(
585 "agg '{agg}' requires 'column'"
586 )));
587 }
588 }
589 };
590 let col_expr = match agg {
591 "count" => count(&c),
592 "sum" => rs_sum(&c),
593 "avg" => avg(&c),
594 "min" => min(&c),
595 "max" => max(&c),
596 "first" => Column::from_expr(c.into_expr().first(), None),
597 "last" => Column::from_expr(c.into_expr().last(), None),
598 _ => return Err(PlanError::InvalidPlan(format!("unsupported agg: {agg}"))),
599 };
600 let mut expr = col_expr.into_expr();
601 if let Some(alias) = obj.get("alias").and_then(Value::as_str) {
602 expr = expr.alias(alias);
603 }
604 out.push(expr);
605 }
606 Ok(out)
607}