// robin_sparkless_polars/plan/mod.rs
//! Plan interpreter: execute a serialized logical plan (list of ops) using the existing DataFrame API.
//!
//! See [LOGICAL_PLAN_FORMAT.md](../../../docs/LOGICAL_PLAN_FORMAT.md) for the plan and expression schema.

mod expr;

use crate::dataframe::{DataFrame, JoinType};
use crate::functions::{
    SortOrder, asc_nulls_first, asc_nulls_last, col, desc_nulls_first, desc_nulls_last,
};
use crate::plan::expr::{expr_from_value, try_column_from_udf_value};
use crate::session::{SparkSession, set_thread_udf_session};
pub use expr::PlanExprError;
use polars::prelude::PolarsError;
use serde_json::Value;
17/// Execute a logical plan: build initial DataFrame from (data, schema), then apply each op in sequence.
18///
19/// - `data`: rows as `Vec<Vec<Value>>` (each inner vec is one row; order matches schema).
20/// - `schema`: list of (column_name, dtype_string) e.g. `[("id", "bigint"), ("name", "string")]`.
21/// - `plan`: list of `{"op": "...", "payload": ...}` objects.
22///
23/// Returns the final DataFrame after applying all operations.
24pub fn execute_plan(
25    session: &SparkSession,
26    data: Vec<Vec<Value>>,
27    schema: Vec<(String, String)>,
28    plan: &[Value],
29) -> Result<DataFrame, PlanError> {
30    set_thread_udf_session(session.clone());
31    let mut df = session
32        .create_dataframe_from_rows(data, schema)
33        .map_err(PlanError::Session)?
34        .with_case_insensitive_column_resolution();
35
36    for op_value in plan {
37        let op_obj = op_value
38            .as_object()
39            .ok_or_else(|| PlanError::InvalidPlan("each plan step must be a JSON object".into()))?;
40        let op_name = op_obj
41            .get("op")
42            .and_then(Value::as_str)
43            .ok_or_else(|| PlanError::InvalidPlan("each plan step must have 'op' string".into()))?;
44        let mut payload = op_obj.get("payload").cloned().unwrap_or(Value::Null);
45        // Sparkless may put other_data/other_schema at op level (sibling to payload) or use camelCase.
46        // Merge into payload so apply_op finds them (issue #510).
47        if matches!(op_name, "join" | "union" | "unionByName") {
48            payload = merge_other_into_payload(payload, op_obj);
49        }
50
51        df = apply_op(session, df, op_name, payload)?;
52    }
53
54    Ok(df)
55}
56
/// Errors from plan execution.
#[derive(Debug)]
pub enum PlanError {
    /// An underlying session/DataFrame operation failed (wraps the Polars error).
    Session(PolarsError),
    /// An expression embedded in the plan could not be parsed or built.
    Expr(PlanExprError),
    /// The plan JSON itself is malformed (missing or ill-typed fields).
    InvalidPlan(String),
    /// The plan names an op this interpreter does not implement.
    UnsupportedOp(String),
}
65
66impl std::fmt::Display for PlanError {
67    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
68        match self {
69            PlanError::Session(e) => write!(f, "session/df: {e}"),
70            PlanError::Expr(e) => write!(f, "expression: {e}"),
71            PlanError::InvalidPlan(s) => write!(f, "invalid plan: {s}"),
72            PlanError::UnsupportedOp(s) => write!(f, "unsupported op: {s}"),
73        }
74    }
75}
76
77impl std::error::Error for PlanError {}
78
79/// Merge other_data/other_schema from op into payload if missing. Supports snake_case and camelCase.
80fn merge_other_into_payload(payload: Value, op: &serde_json::Map<String, Value>) -> Value {
81    fn get(obj: &serde_json::Map<String, Value>, snake: &str, camel: &str) -> Option<Value> {
82        obj.get(snake).or_else(|| obj.get(camel)).cloned()
83    }
84    let mut p = match payload {
85        Value::Object(m) => m,
86        _ => return payload,
87    };
88    if p.get("other_data").or_else(|| p.get("otherData")).is_none() {
89        if let Some(v) = get(op, "other_data", "otherData") {
90            p.insert("other_data".into(), v);
91        }
92    }
93    if p.get("other_schema")
94        .or_else(|| p.get("otherSchema"))
95        .is_none()
96    {
97        if let Some(v) = get(op, "other_schema", "otherSchema") {
98            p.insert("other_schema".into(), v);
99        }
100    }
101    if p.get("on").is_none() {
102        if let Some(v) = get(op, "on", "on") {
103            p.insert("on".into(), v);
104        }
105    }
106    Value::Object(p)
107}
108
109/// Get other_data from payload (snake_case or camelCase).
110fn get_other_data(payload: &Value) -> Option<&Vec<Value>> {
111    payload
112        .get("other_data")
113        .or_else(|| payload.get("otherData"))
114        .and_then(Value::as_array)
115}
116
117/// Get other_schema from payload (snake_case or camelCase).
118fn get_other_schema(payload: &Value) -> Option<&Vec<Value>> {
119    payload
120        .get("other_schema")
121        .or_else(|| payload.get("otherSchema"))
122        .and_then(Value::as_array)
123}
124
125/// Extract column name from expression if it is a simple column reference {"col": "name"}.
126fn expr_to_col_name(v: &Value) -> Option<String> {
127    let obj = v.as_object()?;
128    obj.get("col")
129        .or_else(|| obj.get("column"))
130        .and_then(Value::as_str)
131        .map(|s| s.to_string())
132}
133
134/// Parse join "on" into list of column names. Accepts:
135/// - string -> [s]; array of strings -> those; array of {"col": "x"} -> ["x"];
136/// - array of {"op": "eq", "left": {"col": "a"}, "right": {"col": "a"}} -> ["a"] (Sparkless v4 format, #552).
137fn parse_join_on(on: &Value, df: &DataFrame) -> Result<Vec<String>, PlanError> {
138    if let Some(s) = on.as_str() {
139        let resolved = df.resolve_column_name(s).map_err(PlanError::Session)?;
140        return Ok(vec![resolved]);
141    }
142    let arr = on.as_array().ok_or_else(|| {
143        PlanError::InvalidPlan(
144            "join 'on' must be string, array of strings, or array of column refs / eq expressions"
145                .into(),
146        )
147    })?;
148    let mut keys = Vec::with_capacity(arr.len());
149    for v in arr {
150        if let Some(s) = v.as_str() {
151            let resolved = df.resolve_column_name(s).map_err(PlanError::Session)?;
152            keys.push(resolved);
153            continue;
154        }
155        if let Some(obj) = v.as_object() {
156            // {"col": "x"} -> single key for both sides
157            if let Some(name) = expr_to_col_name(v) {
158                let resolved = df.resolve_column_name(&name).map_err(PlanError::Session)?;
159                keys.push(resolved);
160                continue;
161            }
162            // {"op": "eq"|"==", "left": {"col": "a"}, "right": {"col": "a"}} (Sparkless v4)
163            let op = obj
164                .get("op")
165                .or_else(|| obj.get("operator"))
166                .and_then(Value::as_str);
167            if op.map(|o| o == "eq" || o == "==").unwrap_or(false) {
168                let left = obj.get("left").and_then(expr_to_col_name);
169                let right = obj.get("right").and_then(expr_to_col_name);
170                if let (Some(l), Some(r)) = (left, right) {
171                    if l == r {
172                        let resolved = df.resolve_column_name(&l).map_err(PlanError::Session)?;
173                        keys.push(resolved);
174                        continue;
175                    }
176                }
177            }
178        }
179        return Err(PlanError::InvalidPlan(
180            "join 'on' element must be string, {\"col\": \"name\"}, or {\"op\": \"eq\", \"left\": {\"col\": \"x\"}, \"right\": {\"col\": \"x\"}}".into(),
181        ));
182    }
183    Ok(keys)
184}
185
186/// Convert other_data to rows. Accepts arrays [[v,v],[v,v]] or dicts [{"col":v},...] (Sparkless may send dict rows).
187fn other_data_to_rows(other_data: &[Value], schema_names: &[String]) -> Vec<Vec<Value>> {
188    other_data
189        .iter()
190        .filter_map(|v| {
191            if let Some(arr) = v.as_array() {
192                return Some(arr.clone());
193            }
194            if let Some(obj) = v.as_object() {
195                let row: Vec<Value> = schema_names
196                    .iter()
197                    .map(|n| obj.get(n).cloned().unwrap_or(Value::Null))
198                    .collect();
199                return Some(row);
200            }
201            None
202        })
203        .collect()
204}
205
206/// Parse (name, type) from schema field object. Supports {"name","type"} and {"fieldName","dataType"} (Sparkless).
207fn schema_field_to_pair(v: &Value) -> Option<(String, String)> {
208    let obj = v.as_object()?;
209    let name = obj
210        .get("name")
211        .or_else(|| obj.get("fieldName"))
212        .and_then(Value::as_str)?
213        .to_string();
214    let ty = obj
215        .get("type")
216        .or_else(|| obj.get("dataType"))
217        .and_then(Value::as_str)
218        .or_else(|| {
219            // dataType may be nested: {"type":"string"} or {"typeName":"string"}
220            obj.get("dataType")?.get("typeName").and_then(Value::as_str)
221        })?
222        .to_string();
223    Some((name, ty))
224}
225
226fn apply_op(
227    session: &SparkSession,
228    df: DataFrame,
229    op_name: &str,
230    payload: Value,
231) -> Result<DataFrame, PlanError> {
232    match op_name {
233        "stop" => {
234            let _ = payload;
235            session.stop();
236            Ok(df)
237        }
238        "filter" => {
239            let expr = expr_from_value(&payload).map_err(PlanError::Expr)?;
240            df.filter(expr).map_err(PlanError::Session)
241        }
242        "select" => {
243            // Accept payload as array or as object with "columns" (Sparkless: {"columns": [...]})
244            let arr = payload
245                .as_array()
246                .or_else(|| payload.get("columns").and_then(Value::as_array));
247            if let Some(arr) = arr {
248                if arr.is_empty() {
249                    return Err(PlanError::InvalidPlan(
250                        "select payload must be non-empty array".into(),
251                    ));
252                }
253                let first = &arr[0];
254                let is_expr_list = first.is_object() && first.get("expr").is_some();
255                if is_expr_list {
256                    // Select with computed columns: [{"name": "<alias>", "expr": <expr>}, ...]
257                    let mut exprs = Vec::with_capacity(arr.len());
258                    for v in arr {
259                        let obj = v.as_object().ok_or_else(|| {
260                            PlanError::InvalidPlan(
261                                "select payload with expressions must be array of {name, expr} objects".into(),
262                            )
263                        })?;
264                        let name = obj.get("name").and_then(Value::as_str).ok_or_else(|| {
265                            PlanError::InvalidPlan("select item must have 'name' string".into())
266                        })?;
267                        let expr_val = obj.get("expr").ok_or_else(|| {
268                            PlanError::InvalidPlan("select item must have 'expr'".into())
269                        })?;
270                        let expr = expr_from_value(expr_val).map_err(PlanError::Expr)?;
271                        let resolved = df
272                            .resolve_expr_column_names(expr)
273                            .map_err(PlanError::Session)?;
274                        exprs.push(resolved.alias(name));
275                    }
276                    df.select_exprs(exprs).map_err(PlanError::Session)
277                } else {
278                    // Column names: strings or {type, name} / {name} (Sparkless column refs)
279                    let strings: Vec<String> = arr
280                        .iter()
281                        .map(|v| {
282                            if let Some(s) = v.as_str() {
283                                Ok(s.to_string())
284                            } else if let Some(obj) = v.as_object() {
285                                obj.get("name")
286                                    .and_then(Value::as_str)
287                                    .map(|s| s.to_string())
288                                    .ok_or_else(|| {
289                                        PlanError::InvalidPlan(
290                                            "select column item must have 'name' string".into(),
291                                        )
292                                    })
293                            } else {
294                                Err(PlanError::InvalidPlan(
295                                    "select payload must be list of column name strings or {name, expr} or {type, name} objects".into(),
296                                ))
297                            }
298                        })
299                        .collect::<Result<Vec<_>, _>>()?;
300                    let has_concat = strings.iter().any(|s| {
301                        crate::plan::expr::try_parse_concat_expr_from_string(s.as_str()).is_some()
302                    });
303                    if !has_concat {
304                        let names: Vec<String> = strings
305                            .iter()
306                            .map(|s| df.resolve_column_name(s.as_str()))
307                            .collect::<Result<Vec<_>, _>>()
308                            .map_err(PlanError::Session)?;
309                        let refs: Vec<&str> = names.iter().map(|s| s.as_str()).collect();
310                        return df.select(refs).map_err(PlanError::Session);
311                    }
312                    let mut exprs = Vec::with_capacity(strings.len());
313                    for s in &strings {
314                        if let Some(expr) =
315                            crate::plan::expr::try_parse_concat_expr_from_string(s.as_str())
316                        {
317                            let resolved = df
318                                .resolve_expr_column_names(expr)
319                                .map_err(PlanError::Session)?;
320                            exprs.push(resolved.alias(s));
321                        } else {
322                            let resolved = df
323                                .resolve_column_name(s.as_str())
324                                .map_err(PlanError::Session)?;
325                            exprs.push(polars::prelude::col(resolved));
326                        }
327                    }
328                    df.select_exprs(exprs).map_err(PlanError::Session)
329                }
330            } else {
331                Err(PlanError::InvalidPlan(
332                    "select payload must be array of column names or {name, expr} objects, or object with 'columns' array".into(),
333                ))
334            }
335        }
336        "limit" => {
337            let n = payload.get("n").and_then(Value::as_u64).ok_or_else(|| {
338                PlanError::InvalidPlan("limit payload must have 'n' number".into())
339            })?;
340            df.limit(n as usize).map_err(PlanError::Session)
341        }
342        "offset" => {
343            let n = payload.get("n").and_then(Value::as_u64).unwrap_or(0);
344            df.offset(n as usize).map_err(PlanError::Session)
345        }
346        "orderBy" => {
347            let columns = payload
348                .get("columns")
349                .and_then(Value::as_array)
350                .ok_or_else(|| {
351                    PlanError::InvalidPlan("orderBy payload must have 'columns' array".into())
352                })?;
353            let col_names: Vec<String> = columns
354                .iter()
355                .filter_map(|v| v.as_str())
356                .map(|s| df.resolve_column_name(s))
357                .collect::<Result<Vec<_>, _>>()
358                .map_err(PlanError::Session)?;
359            let ascending = payload
360                .get("ascending")
361                .and_then(Value::as_array)
362                .map(|a| a.iter().filter_map(|v| v.as_bool()).collect::<Vec<_>>())
363                .unwrap_or_else(|| vec![true; col_names.len()]);
364            let nulls_last = payload
365                .get("nulls_last")
366                .and_then(Value::as_array)
367                .map(|a| a.iter().filter_map(|v| v.as_bool()).collect::<Vec<_>>());
368            if let Some(nl) = nulls_last {
369                let mut sort_orders: Vec<SortOrder> = Vec::with_capacity(col_names.len());
370                for (i, name) in col_names.iter().enumerate() {
371                    let asc = ascending.get(i).copied().unwrap_or(true);
372                    let nlast = nl.get(i).copied().unwrap_or(asc);
373                    let column = col(name.as_str());
374                    let so = if asc {
375                        if nlast {
376                            asc_nulls_last(&column)
377                        } else {
378                            asc_nulls_first(&column)
379                        }
380                    } else if nlast {
381                        desc_nulls_last(&column)
382                    } else {
383                        desc_nulls_first(&column)
384                    };
385                    sort_orders.push(so);
386                }
387                df.order_by_exprs(sort_orders).map_err(PlanError::Session)
388            } else {
389                let refs: Vec<&str> = col_names.iter().map(|s| s.as_str()).collect();
390                df.order_by(refs, ascending).map_err(PlanError::Session)
391            }
392        }
393        "distinct" => df.distinct(None).map_err(PlanError::Session),
394        "drop" => {
395            let columns = payload
396                .get("columns")
397                .and_then(Value::as_array)
398                .ok_or_else(|| {
399                    PlanError::InvalidPlan("drop payload must have 'columns' array".into())
400                })?;
401            let names: Vec<String> = columns
402                .iter()
403                .filter_map(|v| v.as_str())
404                .map(|s| df.resolve_column_name(s))
405                .collect::<Result<Vec<_>, _>>()
406                .map_err(PlanError::Session)?;
407            let refs: Vec<&str> = names.iter().map(|s| s.as_str()).collect();
408            df.drop(refs).map_err(PlanError::Session)
409        }
410        "withColumnRenamed" => {
411            let old_name = payload.get("old").and_then(Value::as_str).ok_or_else(|| {
412                PlanError::InvalidPlan("withColumnRenamed must have 'old'".into())
413            })?;
414            let new_name = payload.get("new").and_then(Value::as_str).ok_or_else(|| {
415                PlanError::InvalidPlan("withColumnRenamed must have 'new'".into())
416            })?;
417            let resolved_old = df
418                .resolve_column_name(old_name)
419                .map_err(PlanError::Session)?;
420            df.with_column_renamed(&resolved_old, new_name)
421                .map_err(PlanError::Session)
422        }
423        "withColumn" => {
424            let name = payload
425                .get("name")
426                .and_then(Value::as_str)
427                .ok_or_else(|| PlanError::InvalidPlan("withColumn must have 'name'".into()))?;
428            let expr_val = payload
429                .get("expr")
430                .ok_or_else(|| PlanError::InvalidPlan("withColumn must have 'expr'".into()))?;
431            if let Some(res) = try_column_from_udf_value(expr_val) {
432                let col = res.map_err(PlanError::Expr)?;
433                df.with_column(name, &col).map_err(PlanError::Session)
434            } else {
435                let expr = expr_from_value(expr_val).map_err(PlanError::Expr)?;
436                df.with_column_expr(name, expr).map_err(PlanError::Session)
437            }
438        }
439        "groupBy" => {
440            let group_by = payload
441                .get("group_by")
442                .and_then(Value::as_array)
443                .ok_or_else(|| {
444                    PlanError::InvalidPlan("groupBy must have 'group_by' array".into())
445                })?;
446            let cols: Vec<String> = group_by
447                .iter()
448                .filter_map(|v| v.as_str())
449                .map(|s| df.resolve_column_name(s))
450                .collect::<Result<Vec<_>, _>>()
451                .map_err(PlanError::Session)?;
452            let refs: Vec<&str> = cols.iter().map(|s| s.as_str()).collect();
453            let grouped = df.group_by(refs).map_err(PlanError::Session)?;
454            let aggs = payload.get("aggs").and_then(Value::as_array);
455            match aggs {
456                Some(aggs_arr) => {
457                    let agg_exprs = parse_aggs(aggs_arr, &df)?;
458                    grouped.agg(agg_exprs).map_err(PlanError::Session)
459                }
460                None => Err(PlanError::InvalidPlan(
461                    "groupBy payload must include 'aggs' array (e.g. [{\"agg\": \"sum\", \"column\": \"b\"}])".into(),
462                )),
463            }
464        }
465        "join" => {
466            let other_data = get_other_data(&payload)
467                .ok_or_else(|| PlanError::InvalidPlan("join must have 'other_data'".into()))?;
468            let other_schema = get_other_schema(&payload)
469                .ok_or_else(|| PlanError::InvalidPlan("join must have 'other_schema'".into()))?;
470            let on = payload.get("on").ok_or_else(|| {
471                PlanError::InvalidPlan("join must have 'on' array or string".into())
472            })?;
473            let how = payload
474                .get("how")
475                .and_then(Value::as_str)
476                .unwrap_or("inner");
477
478            let schema_vec: Vec<(String, String)> = other_schema
479                .iter()
480                .filter_map(schema_field_to_pair)
481                .collect();
482            let schema_names: Vec<String> = schema_vec.iter().map(|(n, _)| n.clone()).collect();
483            let rows = other_data_to_rows(other_data, &schema_names);
484            let other_df = session
485                .create_dataframe_from_rows(rows, schema_vec)
486                .map_err(PlanError::Session)?;
487
488            let on_keys_left = parse_join_on(on, &df)?;
489            // Align right join key column names to left's (e.g. left "Dept_Id" vs right "dept_id" -> rename right to "Dept_Id") (#552).
490            let mut other_df = other_df;
491            let on_keys_right = parse_join_on(on, &other_df)?;
492            for (i, left_name) in on_keys_left.iter().enumerate() {
493                if let Some(right_name) = on_keys_right.get(i) {
494                    if left_name != right_name {
495                        other_df = other_df
496                            .with_column_renamed(right_name, left_name)
497                            .map_err(PlanError::Session)?;
498                    }
499                }
500            }
501            let on_refs: Vec<&str> = on_keys_left.iter().map(|s| s.as_str()).collect();
502            let join_type = match how {
503                "left" => JoinType::Left,
504                "right" => JoinType::Right,
505                "outer" => JoinType::Outer,
506                "left_semi" | "semi" => JoinType::LeftSemi,
507                "left_anti" | "anti" => JoinType::LeftAnti,
508                _ => JoinType::Inner,
509            };
510            df.join(&other_df, on_refs, join_type)
511                .map_err(PlanError::Session)
512        }
513        "union" => {
514            let other_data = get_other_data(&payload)
515                .ok_or_else(|| PlanError::InvalidPlan("union must have 'other_data'".into()))?;
516            let other_schema = get_other_schema(&payload)
517                .ok_or_else(|| PlanError::InvalidPlan("union must have 'other_schema'".into()))?;
518            let schema_vec: Vec<(String, String)> = other_schema
519                .iter()
520                .filter_map(schema_field_to_pair)
521                .collect();
522            let schema_names: Vec<String> = schema_vec.iter().map(|(n, _)| n.clone()).collect();
523            let rows = other_data_to_rows(other_data, &schema_names);
524            let other_df = session
525                .create_dataframe_from_rows(rows, schema_vec)
526                .map_err(PlanError::Session)?;
527            df.union(&other_df).map_err(PlanError::Session)
528        }
529        "unionByName" => {
530            let other_data = get_other_data(&payload).ok_or_else(|| {
531                PlanError::InvalidPlan("unionByName must have 'other_data'".into())
532            })?;
533            let other_schema = get_other_schema(&payload).ok_or_else(|| {
534                PlanError::InvalidPlan("unionByName must have 'other_schema'".into())
535            })?;
536            let schema_vec: Vec<(String, String)> = other_schema
537                .iter()
538                .filter_map(schema_field_to_pair)
539                .collect();
540            let schema_names: Vec<String> = schema_vec.iter().map(|(n, _)| n.clone()).collect();
541            let rows = other_data_to_rows(other_data, &schema_names);
542            let other_df = session
543                .create_dataframe_from_rows(rows, schema_vec)
544                .map_err(PlanError::Session)?;
545            df.union_by_name(&other_df, true)
546                .map_err(PlanError::Session)
547        }
548        _ => Err(PlanError::UnsupportedOp(op_name.to_string())),
549    }
550}
551
552fn parse_aggs(aggs: &[Value], df: &DataFrame) -> Result<Vec<polars::prelude::Expr>, PlanError> {
553    use crate::Column;
554    use crate::functions::{avg, count, max, min, sum as rs_sum};
555
556    let mut out = Vec::with_capacity(aggs.len());
557    for a in aggs {
558        let obj = a
559            .as_object()
560            .ok_or_else(|| PlanError::InvalidPlan("each agg must be an object".into()))?;
561        let agg = obj
562            .get("agg")
563            .and_then(Value::as_str)
564            .ok_or_else(|| PlanError::InvalidPlan("agg must have 'agg' string".into()))?;
565
566        if agg == "python_grouped_udf" {
567            // Grouped Python UDF aggregations are not expressible as pure Expr; the plan
568            // interpreter currently supports only Rust/built-in aggregations at this level.
569            return Err(PlanError::InvalidPlan(
570                "python_grouped_udf aggregations are not yet supported in execute_plan; use built-in aggregations in plans for now".into(),
571            ));
572        }
573
574        let col_name = obj.get("column").and_then(Value::as_str);
575        let c = match col_name {
576            Some(name) => {
577                let resolved = df.resolve_column_name(name).map_err(PlanError::Session)?;
578                Column::new(resolved)
579            }
580            None => {
581                if agg == "count" {
582                    Column::new("".to_string()) // count() without column
583                } else {
584                    return Err(PlanError::InvalidPlan(format!(
585                        "agg '{agg}' requires 'column'"
586                    )));
587                }
588            }
589        };
590        let col_expr = match agg {
591            "count" => count(&c),
592            "sum" => rs_sum(&c),
593            "avg" => avg(&c),
594            "min" => min(&c),
595            "max" => max(&c),
596            "first" => Column::from_expr(c.into_expr().first(), None),
597            "last" => Column::from_expr(c.into_expr().last(), None),
598            _ => return Err(PlanError::InvalidPlan(format!("unsupported agg: {agg}"))),
599        };
600        let mut expr = col_expr.into_expr();
601        if let Some(alias) = obj.get("alias").and_then(Value::as_str) {
602            expr = expr.alias(alias);
603        }
604        out.push(expr);
605    }
606    Ok(out)
607}