Skip to main content

datafusion_expr/
expr_fn.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Functions for creating logical expressions
19
20use crate::expr::{
21    AggregateFunction, BinaryExpr, Cast, Exists, GroupingSet, InList, InSubquery, Lambda,
22    LambdaVariable, NullTreatment, Placeholder, TryCast, Unnest, WildcardOptions,
23    WindowFunction,
24};
25use crate::function::{
26    AccumulatorArgs, AccumulatorFactoryFunction, PartitionEvaluatorFactory,
27    StateFieldsArgs,
28};
29use crate::ptr_eq::PtrEq;
30use crate::select_expr::SelectExpr;
31use crate::{
32    AggregateUDF, Expr, LimitEffect, LogicalPlan, Operator, PartitionEvaluator,
33    ScalarFunctionArgs, ScalarFunctionImplementation, ScalarUDF, Signature, Volatility,
34    conditional_expressions::CaseBuilder, expr::Sort, logical_plan::Subquery,
35};
36use crate::{
37    AggregateUDFImpl, ColumnarValue, ScalarUDFImpl, WindowFrame, WindowUDF, WindowUDFImpl,
38};
39use arrow::compute::kernels::cast_utils::{
40    parse_interval_day_time, parse_interval_month_day_nano, parse_interval_year_month,
41};
42use arrow::datatypes::{DataType, Field, FieldRef};
43use datafusion_common::{Column, Result, ScalarValue, Spans, TableReference, plan_err};
44use datafusion_functions_window_common::field::WindowUDFFieldArgs;
45use datafusion_functions_window_common::partition::PartitionEvaluatorArgs;
46use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
47use std::collections::HashMap;
48use std::fmt::Debug;
49use std::hash::Hash;
50use std::ops::Not;
51use std::sync::Arc;
52
53/// Create a column expression based on a qualified or unqualified column name. Will
54/// normalize unquoted identifiers according to SQL rules (identifiers will become lowercase).
55///
56/// For example:
57///
58/// ```rust
59/// # use datafusion_expr::col;
60/// let c1 = col("a");
61/// let c2 = col("A");
62/// assert_eq!(c1, c2);
63///
64/// // note how quoting with double quotes preserves the case
65/// let c3 = col(r#""A""#);
66/// assert_ne!(c1, c3);
67/// ```
68pub fn col(ident: impl Into<Column>) -> Expr {
69    Expr::Column(ident.into())
70}
71
72/// Create an out reference column which hold a reference that has been resolved to a field
73/// outside of the current plan.
74/// The expression created by this function does not preserve the metadata of the outer column.
75/// Please use `out_ref_col_with_metadata` if you want to preserve the metadata.
76pub fn out_ref_col(dt: DataType, ident: impl Into<Column>) -> Expr {
77    out_ref_col_with_metadata(dt, HashMap::new(), ident)
78}
79
80/// Create an out reference column from an existing field (preserving metadata)
81pub fn out_ref_col_with_metadata(
82    dt: DataType,
83    metadata: HashMap<String, String>,
84    ident: impl Into<Column>,
85) -> Expr {
86    let column = ident.into();
87    let field: FieldRef =
88        Arc::new(Field::new(column.name(), dt, true).with_metadata(metadata));
89    Expr::OuterReferenceColumn(field, column)
90}
91
92/// Create an unqualified column expression from the provided name, without normalizing
93/// the column.
94///
95/// For example:
96///
97/// ```rust
98/// # use datafusion_expr::{col, ident};
99/// let c1 = ident("A"); // not normalized staying as column 'A'
100/// let c2 = col("A"); // normalized via SQL rules becoming column 'a'
101/// assert_ne!(c1, c2);
102///
103/// let c3 = col(r#""A""#);
104/// assert_eq!(c1, c3);
105///
106/// let c4 = col("t1.a"); // parses as relation 't1' column 'a'
107/// let c5 = ident("t1.a"); // parses as column 't1.a'
108/// assert_ne!(c4, c5);
109/// ```
110pub fn ident(name: impl Into<String>) -> Expr {
111    Expr::Column(Column::from_name(name))
112}
113
114/// Create placeholder value that will be filled in (such as `$1`)
115///
116/// Note the parameter type can be inferred using [`Expr::infer_placeholder_types`]
117///
118/// # Example
119///
120/// ```rust
121/// # use datafusion_expr::{placeholder};
122/// let p = placeholder("$1"); // $1, refers to parameter 1
123/// assert_eq!(p.to_string(), "$1")
124/// ```
125pub fn placeholder(id: impl Into<String>) -> Expr {
126    Expr::Placeholder(Placeholder {
127        id: id.into(),
128        field: None,
129    })
130}
131
132/// Create an '*' [`Expr::Wildcard`] expression that matches all columns
133///
134/// # Example
135///
136/// ```rust
137/// # use datafusion_expr::{wildcard};
138/// let p = wildcard();
139/// assert_eq!(p.to_string(), "*")
140/// ```
141pub fn wildcard() -> SelectExpr {
142    SelectExpr::Wildcard(WildcardOptions::default())
143}
144
145/// Create an '*' [`Expr::Wildcard`] expression with the wildcard options
146pub fn wildcard_with_options(options: WildcardOptions) -> SelectExpr {
147    SelectExpr::Wildcard(options)
148}
149
150/// Create an 't.*' [`Expr::Wildcard`] expression that matches all columns from a specific table
151///
152/// # Example
153///
154/// ```rust
155/// # use datafusion_common::TableReference;
156/// # use datafusion_expr::{qualified_wildcard};
157/// let p = qualified_wildcard(TableReference::bare("t"));
158/// assert_eq!(p.to_string(), "t.*")
159/// ```
160pub fn qualified_wildcard(qualifier: impl Into<TableReference>) -> SelectExpr {
161    SelectExpr::QualifiedWildcard(qualifier.into(), WildcardOptions::default())
162}
163
164/// Create an 't.*' [`Expr::Wildcard`] expression with the wildcard options
165pub fn qualified_wildcard_with_options(
166    qualifier: impl Into<TableReference>,
167    options: WildcardOptions,
168) -> SelectExpr {
169    SelectExpr::QualifiedWildcard(qualifier.into(), options)
170}
171
172/// Return a new expression `left <op> right`
173pub fn binary_expr(left: Expr, op: Operator, right: Expr) -> Expr {
174    Expr::BinaryExpr(BinaryExpr::new(Box::new(left), op, Box::new(right)))
175}
176
177/// Return a new expression with a logical AND
178pub fn and(left: Expr, right: Expr) -> Expr {
179    Expr::BinaryExpr(BinaryExpr::new(
180        Box::new(left),
181        Operator::And,
182        Box::new(right),
183    ))
184}
185
186/// Return a new expression with a logical OR
187pub fn or(left: Expr, right: Expr) -> Expr {
188    Expr::BinaryExpr(BinaryExpr::new(
189        Box::new(left),
190        Operator::Or,
191        Box::new(right),
192    ))
193}
194
195/// Return a new expression with a logical NOT
196pub fn not(expr: Expr) -> Expr {
197    expr.not()
198}
199
200/// Return a new expression with bitwise AND
201pub fn bitwise_and(left: Expr, right: Expr) -> Expr {
202    Expr::BinaryExpr(BinaryExpr::new(
203        Box::new(left),
204        Operator::BitwiseAnd,
205        Box::new(right),
206    ))
207}
208
209/// Return a new expression with bitwise OR
210pub fn bitwise_or(left: Expr, right: Expr) -> Expr {
211    Expr::BinaryExpr(BinaryExpr::new(
212        Box::new(left),
213        Operator::BitwiseOr,
214        Box::new(right),
215    ))
216}
217
218/// Return a new expression with bitwise XOR
219pub fn bitwise_xor(left: Expr, right: Expr) -> Expr {
220    Expr::BinaryExpr(BinaryExpr::new(
221        Box::new(left),
222        Operator::BitwiseXor,
223        Box::new(right),
224    ))
225}
226
227/// Return a new expression with bitwise SHIFT RIGHT
228pub fn bitwise_shift_right(left: Expr, right: Expr) -> Expr {
229    Expr::BinaryExpr(BinaryExpr::new(
230        Box::new(left),
231        Operator::BitwiseShiftRight,
232        Box::new(right),
233    ))
234}
235
236/// Return a new expression with bitwise SHIFT LEFT
237pub fn bitwise_shift_left(left: Expr, right: Expr) -> Expr {
238    Expr::BinaryExpr(BinaryExpr::new(
239        Box::new(left),
240        Operator::BitwiseShiftLeft,
241        Box::new(right),
242    ))
243}
244
245/// Create an in_list expression
246pub fn in_list(expr: Expr, list: Vec<Expr>, negated: bool) -> Expr {
247    Expr::InList(InList::new(Box::new(expr), list, negated))
248}
249
250/// Create an EXISTS subquery expression
251pub fn exists(subquery: Arc<LogicalPlan>) -> Expr {
252    let outer_ref_columns = subquery.all_out_ref_exprs();
253    Expr::Exists(Exists {
254        subquery: Subquery {
255            subquery,
256            outer_ref_columns,
257            spans: Spans::new(),
258        },
259        negated: false,
260    })
261}
262
263/// Create a NOT EXISTS subquery expression
264pub fn not_exists(subquery: Arc<LogicalPlan>) -> Expr {
265    let outer_ref_columns = subquery.all_out_ref_exprs();
266    Expr::Exists(Exists {
267        subquery: Subquery {
268            subquery,
269            outer_ref_columns,
270            spans: Spans::new(),
271        },
272        negated: true,
273    })
274}
275
276/// Create an IN subquery expression
277pub fn in_subquery(expr: Expr, subquery: Arc<LogicalPlan>) -> Expr {
278    let outer_ref_columns = subquery.all_out_ref_exprs();
279    Expr::InSubquery(InSubquery::new(
280        Box::new(expr),
281        Subquery {
282            subquery,
283            outer_ref_columns,
284            spans: Spans::new(),
285        },
286        false,
287    ))
288}
289
290/// Create a NOT IN subquery expression
291pub fn not_in_subquery(expr: Expr, subquery: Arc<LogicalPlan>) -> Expr {
292    let outer_ref_columns = subquery.all_out_ref_exprs();
293    Expr::InSubquery(InSubquery::new(
294        Box::new(expr),
295        Subquery {
296            subquery,
297            outer_ref_columns,
298            spans: Spans::new(),
299        },
300        true,
301    ))
302}
303
304/// Create a scalar subquery expression
305pub fn scalar_subquery(subquery: Arc<LogicalPlan>) -> Expr {
306    let outer_ref_columns = subquery.all_out_ref_exprs();
307    Expr::ScalarSubquery(Subquery {
308        subquery,
309        outer_ref_columns,
310        spans: Spans::new(),
311    })
312}
313
314/// Create a grouping set
315pub fn grouping_set(exprs: Vec<Vec<Expr>>) -> Expr {
316    Expr::GroupingSet(GroupingSet::GroupingSets(exprs))
317}
318
319/// Create a grouping set for all combination of `exprs`
320pub fn cube(exprs: Vec<Expr>) -> Expr {
321    Expr::GroupingSet(GroupingSet::Cube(exprs))
322}
323
324/// Create a grouping set for rollup
325pub fn rollup(exprs: Vec<Expr>) -> Expr {
326    Expr::GroupingSet(GroupingSet::Rollup(exprs))
327}
328
329/// Create a cast expression
330pub fn cast(expr: Expr, data_type: DataType) -> Expr {
331    Expr::Cast(Cast::new(Box::new(expr), data_type))
332}
333
334/// Create a try cast expression
335pub fn try_cast(expr: Expr, data_type: DataType) -> Expr {
336    Expr::TryCast(TryCast::new(Box::new(expr), data_type))
337}
338
339/// Create is null expression
340pub fn is_null(expr: Expr) -> Expr {
341    Expr::IsNull(Box::new(expr))
342}
343
344/// Create is not null expression
345pub fn is_not_null(expr: Expr) -> Expr {
346    Expr::IsNotNull(Box::new(expr))
347}
348
349/// Create is true expression
350pub fn is_true(expr: Expr) -> Expr {
351    Expr::IsTrue(Box::new(expr))
352}
353
354/// Create is not true expression
355pub fn is_not_true(expr: Expr) -> Expr {
356    Expr::IsNotTrue(Box::new(expr))
357}
358
359/// Create is false expression
360pub fn is_false(expr: Expr) -> Expr {
361    Expr::IsFalse(Box::new(expr))
362}
363
364/// Create is not false expression
365pub fn is_not_false(expr: Expr) -> Expr {
366    Expr::IsNotFalse(Box::new(expr))
367}
368
369/// Create is unknown expression
370pub fn is_unknown(expr: Expr) -> Expr {
371    Expr::IsUnknown(Box::new(expr))
372}
373
374/// Create is not unknown expression
375pub fn is_not_unknown(expr: Expr) -> Expr {
376    Expr::IsNotUnknown(Box::new(expr))
377}
378
379/// Create a CASE WHEN statement with literal WHEN expressions for comparison to the base expression.
380pub fn case(expr: Expr) -> CaseBuilder {
381    CaseBuilder::new(Some(Box::new(expr)), vec![], vec![], None)
382}
383
384/// Create a CASE WHEN statement with boolean WHEN expressions and no base expression.
385pub fn when(when: Expr, then: Expr) -> CaseBuilder {
386    CaseBuilder::new(None, vec![when], vec![then], None)
387}
388
389/// Create a Unnest expression
390pub fn unnest(expr: Expr) -> Expr {
391    Expr::Unnest(Unnest {
392        expr: Box::new(expr),
393    })
394}
395
396/// Convenience method to create a new user defined scalar function (UDF) with a
397/// specific signature and specific return type.
398///
399/// Note this function does not expose all available features of [`ScalarUDF`],
400/// such as
401///
402/// * computing return types based on input types
403/// * multiple [`Signature`]s
404/// * aliases
405///
406/// See [`ScalarUDF`] for details and examples on how to use the full
407/// functionality.
408pub fn create_udf(
409    name: &str,
410    input_types: Vec<DataType>,
411    return_type: DataType,
412    volatility: Volatility,
413    fun: ScalarFunctionImplementation,
414) -> ScalarUDF {
415    ScalarUDF::from(SimpleScalarUDF::new(
416        name,
417        input_types,
418        return_type,
419        volatility,
420        fun,
421    ))
422}
423
/// Implements [`ScalarUDFImpl`] for functions that have a single signature and
/// return type.
///
/// Instances are created with [`SimpleScalarUDF::new`] or
/// [`SimpleScalarUDF::new_with_signature`].
#[derive(PartialEq, Eq, Hash)]
pub struct SimpleScalarUDF {
    /// Function name, returned by `ScalarUDFImpl::name`
    name: String,
    /// The single signature accepted by this function
    signature: Signature,
    /// Return type reported regardless of the argument types
    return_type: DataType,
    /// The implementation invoked with the call arguments.
    /// NOTE(review): wrapped in `PtrEq`, presumably so equality/hashing use
    /// pointer identity — confirm against `crate::ptr_eq`.
    fun: PtrEq<ScalarFunctionImplementation>,
}
433
434impl Debug for SimpleScalarUDF {
435    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
436        f.debug_struct("SimpleScalarUDF")
437            .field("name", &self.name)
438            .field("signature", &self.signature)
439            .field("return_type", &self.return_type)
440            .field("fun", &"<FUNC>")
441            .finish()
442    }
443}
444
445impl SimpleScalarUDF {
446    /// Create a new `SimpleScalarUDF` from a name, input types, return type and
447    /// implementation. Implementing [`ScalarUDFImpl`] allows more flexibility
448    pub fn new(
449        name: impl Into<String>,
450        input_types: Vec<DataType>,
451        return_type: DataType,
452        volatility: Volatility,
453        fun: ScalarFunctionImplementation,
454    ) -> Self {
455        Self::new_with_signature(
456            name,
457            Signature::exact(input_types, volatility),
458            return_type,
459            fun,
460        )
461    }
462
463    /// Create a new `SimpleScalarUDF` from a name, signature, return type and
464    /// implementation. Implementing [`ScalarUDFImpl`] allows more flexibility
465    pub fn new_with_signature(
466        name: impl Into<String>,
467        signature: Signature,
468        return_type: DataType,
469        fun: ScalarFunctionImplementation,
470    ) -> Self {
471        Self {
472            name: name.into(),
473            signature,
474            return_type,
475            fun: fun.into(),
476        }
477    }
478}
479
480impl ScalarUDFImpl for SimpleScalarUDF {
481    fn name(&self) -> &str {
482        &self.name
483    }
484
485    fn signature(&self) -> &Signature {
486        &self.signature
487    }
488
489    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
490        Ok(self.return_type.clone())
491    }
492
493    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
494        (self.fun)(&args.args)
495    }
496}
497
498/// Creates a new UDAF with a specific signature, state type and return type.
499/// The signature and state type must match the `Accumulator's implementation`.
500pub fn create_udaf(
501    name: &str,
502    input_type: Vec<DataType>,
503    return_type: Arc<DataType>,
504    volatility: Volatility,
505    accumulator: AccumulatorFactoryFunction,
506    state_type: Arc<Vec<DataType>>,
507) -> AggregateUDF {
508    let return_type = Arc::unwrap_or_clone(return_type);
509    let state_type = Arc::unwrap_or_clone(state_type);
510    let state_fields = state_type
511        .into_iter()
512        .enumerate()
513        .map(|(i, t)| Field::new(format!("{i}"), t, true))
514        .map(Arc::new)
515        .collect::<Vec<_>>();
516    AggregateUDF::from(SimpleAggregateUDF::new(
517        name,
518        input_type,
519        return_type,
520        volatility,
521        accumulator,
522        state_fields,
523    ))
524}
525
/// Implements [`AggregateUDFImpl`] for functions that have a single signature and
/// return type.
///
/// Instances are created with [`SimpleAggregateUDF::new`] or
/// [`SimpleAggregateUDF::new_with_signature`].
#[derive(PartialEq, Eq, Hash)]
pub struct SimpleAggregateUDF {
    /// Function name, returned by `AggregateUDFImpl::name`
    name: String,
    /// The single signature accepted by this function
    signature: Signature,
    /// Return type reported regardless of the argument types
    return_type: DataType,
    /// Factory invoked to create an accumulator.
    /// NOTE(review): wrapped in `PtrEq`, presumably so equality/hashing use
    /// pointer identity — confirm against `crate::ptr_eq`.
    accumulator: PtrEq<AccumulatorFactoryFunction>,
    /// Fields returned from `AggregateUDFImpl::state_fields`
    state_fields: Vec<FieldRef>,
}
536
537impl Debug for SimpleAggregateUDF {
538    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
539        f.debug_struct("SimpleAggregateUDF")
540            .field("name", &self.name)
541            .field("signature", &self.signature)
542            .field("return_type", &self.return_type)
543            .field("fun", &"<FUNC>")
544            .finish()
545    }
546}
547
548impl SimpleAggregateUDF {
549    /// Create a new `SimpleAggregateUDF` from a name, input types, return type, state type and
550    /// implementation. Implementing [`AggregateUDFImpl`] allows more flexibility
551    pub fn new(
552        name: impl Into<String>,
553        input_type: Vec<DataType>,
554        return_type: DataType,
555        volatility: Volatility,
556        accumulator: AccumulatorFactoryFunction,
557        state_fields: Vec<FieldRef>,
558    ) -> Self {
559        let name = name.into();
560        let signature = Signature::exact(input_type, volatility);
561        Self {
562            name,
563            signature,
564            return_type,
565            accumulator: accumulator.into(),
566            state_fields,
567        }
568    }
569
570    /// Create a new `SimpleAggregateUDF` from a name, signature, return type, state type and
571    /// implementation. Implementing [`AggregateUDFImpl`] allows more flexibility
572    pub fn new_with_signature(
573        name: impl Into<String>,
574        signature: Signature,
575        return_type: DataType,
576        accumulator: AccumulatorFactoryFunction,
577        state_fields: Vec<FieldRef>,
578    ) -> Self {
579        let name = name.into();
580        Self {
581            name,
582            signature,
583            return_type,
584            accumulator: accumulator.into(),
585            state_fields,
586        }
587    }
588}
589
590impl AggregateUDFImpl for SimpleAggregateUDF {
591    fn name(&self) -> &str {
592        &self.name
593    }
594
595    fn signature(&self) -> &Signature {
596        &self.signature
597    }
598
599    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
600        Ok(self.return_type.clone())
601    }
602
603    fn accumulator(
604        &self,
605        acc_args: AccumulatorArgs,
606    ) -> Result<Box<dyn crate::Accumulator>> {
607        (self.accumulator)(acc_args)
608    }
609
610    fn state_fields(&self, _args: StateFieldsArgs) -> Result<Vec<FieldRef>> {
611        Ok(self.state_fields.clone())
612    }
613}
614
615/// Creates a new UDWF with a specific signature, state type and return type.
616///
617/// The signature and state type must match the [`PartitionEvaluator`]'s implementation`.
618///
619/// [`PartitionEvaluator`]: crate::PartitionEvaluator
620pub fn create_udwf(
621    name: &str,
622    input_type: DataType,
623    return_type: Arc<DataType>,
624    volatility: Volatility,
625    partition_evaluator_factory: PartitionEvaluatorFactory,
626) -> WindowUDF {
627    let return_type = Arc::unwrap_or_clone(return_type);
628    WindowUDF::from(SimpleWindowUDF::new(
629        name,
630        input_type,
631        return_type,
632        volatility,
633        partition_evaluator_factory,
634    ))
635}
636
/// Implements [`WindowUDFImpl`] for functions that have a single signature and
/// return type.
///
/// Instances are created with [`SimpleWindowUDF::new`].
#[derive(PartialEq, Eq, Hash)]
pub struct SimpleWindowUDF {
    /// Function name, returned by `WindowUDFImpl::name`
    name: String,
    /// The single signature accepted by this function
    signature: Signature,
    /// Data type of the field produced by `WindowUDFImpl::field`
    return_type: DataType,
    /// Factory invoked to create a partition evaluator.
    /// NOTE(review): wrapped in `PtrEq`, presumably so equality/hashing use
    /// pointer identity — confirm against `crate::ptr_eq`.
    partition_evaluator_factory: PtrEq<PartitionEvaluatorFactory>,
}
646
647impl Debug for SimpleWindowUDF {
648    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
649        f.debug_struct("WindowUDF")
650            .field("name", &self.name)
651            .field("signature", &self.signature)
652            .field("return_type", &"<func>")
653            .field("partition_evaluator_factory", &"<FUNC>")
654            .finish()
655    }
656}
657
658impl SimpleWindowUDF {
659    /// Create a new `SimpleWindowUDF` from a name, input types, return type and
660    /// implementation. Implementing [`WindowUDFImpl`] allows more flexibility
661    pub fn new(
662        name: impl Into<String>,
663        input_type: DataType,
664        return_type: DataType,
665        volatility: Volatility,
666        partition_evaluator_factory: PartitionEvaluatorFactory,
667    ) -> Self {
668        let name = name.into();
669        let signature = Signature::exact([input_type].to_vec(), volatility);
670        Self {
671            name,
672            signature,
673            return_type,
674            partition_evaluator_factory: partition_evaluator_factory.into(),
675        }
676    }
677}
678
679impl WindowUDFImpl for SimpleWindowUDF {
680    fn name(&self) -> &str {
681        &self.name
682    }
683
684    fn signature(&self) -> &Signature {
685        &self.signature
686    }
687
688    fn partition_evaluator(
689        &self,
690        _partition_evaluator_args: PartitionEvaluatorArgs,
691    ) -> Result<Box<dyn PartitionEvaluator>> {
692        (self.partition_evaluator_factory)()
693    }
694
695    fn field(&self, field_args: WindowUDFFieldArgs) -> Result<FieldRef> {
696        Ok(Arc::new(Field::new(
697            field_args.name(),
698            self.return_type.clone(),
699            true,
700        )))
701    }
702
703    fn limit_effect(&self, _args: &[Arc<dyn PhysicalExpr>]) -> LimitEffect {
704        LimitEffect::Unknown
705    }
706}
707
708pub fn interval_year_month_lit(value: &str) -> Expr {
709    let interval = parse_interval_year_month(value).ok();
710    Expr::Literal(ScalarValue::IntervalYearMonth(interval), None)
711}
712
713pub fn interval_datetime_lit(value: &str) -> Expr {
714    let interval = parse_interval_day_time(value).ok();
715    Expr::Literal(ScalarValue::IntervalDayTime(interval), None)
716}
717
718pub fn interval_month_day_nano_lit(value: &str) -> Expr {
719    let interval = parse_interval_month_day_nano(value).ok();
720    Expr::Literal(ScalarValue::IntervalMonthDayNano(interval), None)
721}
722
723/// Create a lambda expression
724pub fn lambda(params: impl IntoIterator<Item = impl Into<String>>, body: Expr) -> Expr {
725    Expr::Lambda(Lambda::new(
726        params.into_iter().map(Into::into).collect(),
727        body,
728    ))
729}
730
731/// Create an unresolved lambda variable expression
732///
733/// The expression tree or [`LogicalPlan`] which
734/// owns this variable must be resolved before usage with either
735/// [`Expr::resolve_lambda_variables`] or [`LogicalPlan::resolve_lambda_variables`].
736///
737/// [LogicalPlan::resolve_lambda_variables]: crate::LogicalPlan::resolve_lambda_variables
738pub fn lambda_var(name: impl Into<String>) -> Expr {
739    Expr::LambdaVariable(LambdaVariable::new(name.into(), None))
740}
741
/// Extensions for configuring [`Expr::AggregateFunction`] or [`Expr::WindowFunction`]
///
/// Adds methods to [`Expr`] that make it easy to set optional options
/// such as `ORDER BY`, `FILTER` and `DISTINCT`
///
/// Every method returns an [`ExprFuncBuilder`]; call [`ExprFuncBuilder::build`]
/// to obtain the configured [`Expr`]. `build` returns an error when the builder
/// was started from an `Expr` the called method does not support.
///
/// # Example
/// ```no_run
/// # use datafusion_common::Result;
/// # use datafusion_expr::expr::NullTreatment;
/// # use datafusion_expr::test::function_stub::count;
/// # use datafusion_expr::{ExprFunctionExt, lit, Expr, col};
/// # // first_value is an aggregate function in another crate
/// # fn first_value(_arg: Expr) -> Expr {
/// # unimplemented!() }
/// # fn main() -> Result<()> {
/// // Create an aggregate count, filtering on column y > 5
/// let agg = count(col("x")).filter(col("y").gt(lit(5))).build()?;
///
/// // Find the first value in an aggregate sorted by column y
/// // equivalent to:
/// // `FIRST_VALUE(x ORDER BY y ASC IGNORE NULLS)`
/// let sort_expr = col("y").sort(true, true);
/// let agg = first_value(col("x"))
///     .order_by(vec![sort_expr])
///     .null_treatment(NullTreatment::IgnoreNulls)
///     .build()?;
///
/// // Create a window expression for percent rank partitioned on column a
/// // equivalent to:
/// // `PERCENT_RANK() OVER (PARTITION BY a ORDER BY b ASC NULLS LAST IGNORE NULLS)`
/// // percent_rank is an udwf function in another crate
/// # fn percent_rank() -> Expr {
/// # unimplemented!() }
/// let window = percent_rank()
///     .partition_by(vec![col("a")])
///     .order_by(vec![col("b").sort(true, true)])
///     .null_treatment(NullTreatment::IgnoreNulls)
///     .build()?;
/// #     Ok(())
/// # }
/// ```
pub trait ExprFunctionExt {
    /// Add `ORDER BY <order_by>`
    fn order_by(self, order_by: Vec<Sort>) -> ExprFuncBuilder;
    /// Add `FILTER <filter>`
    fn filter(self, filter: Expr) -> ExprFuncBuilder;
    /// Add `DISTINCT`
    fn distinct(self) -> ExprFuncBuilder;
    /// Add `RESPECT NULLS` or `IGNORE NULLS`
    fn null_treatment(
        self,
        null_treatment: impl Into<Option<NullTreatment>>,
    ) -> ExprFuncBuilder;
    /// Add `PARTITION BY`
    fn partition_by(self, partition_by: Vec<Expr>) -> ExprFuncBuilder;
    /// Add appropriate window frame conditions
    fn window_frame(self, window_frame: WindowFrame) -> ExprFuncBuilder;
}
800
/// The function expression an [`ExprFuncBuilder`] is configuring.
#[derive(Debug, Clone)]
pub enum ExprFuncKind {
    /// The payload of an [`Expr::AggregateFunction`]
    Aggregate(AggregateFunction),
    /// The (boxed) payload of an [`Expr::WindowFunction`]
    Window(Box<WindowFunction>),
}
806
/// Implementation of [`ExprFunctionExt`].
///
/// See [`ExprFunctionExt`] for usage and examples
#[derive(Debug, Clone)]
pub struct ExprFuncBuilder {
    /// Function being configured; `None` makes [`ExprFuncBuilder::build`]
    /// return an error
    fun: Option<ExprFuncKind>,
    /// `ORDER BY` expressions; `None` is applied as an empty list
    order_by: Option<Vec<Sort>>,
    /// `FILTER` expression, if any
    filter: Option<Expr>,
    /// Whether `DISTINCT` was requested
    distinct: bool,
    /// `RESPECT NULLS` / `IGNORE NULLS` setting, if any
    null_treatment: Option<NullTreatment>,
    /// `PARTITION BY` expressions (applied to window functions only);
    /// `None` is applied as an empty list
    partition_by: Option<Vec<Expr>>,
    /// Window frame (applied to window functions only); when `None`, `build`
    /// derives one with `WindowFrame::new`
    window_frame: Option<WindowFrame>,
}
820
821impl ExprFuncBuilder {
822    /// Create a new `ExprFuncBuilder`, see [`ExprFunctionExt`]
823    fn new(fun: Option<ExprFuncKind>) -> Self {
824        Self {
825            fun,
826            order_by: None,
827            filter: None,
828            distinct: false,
829            null_treatment: None,
830            partition_by: None,
831            window_frame: None,
832        }
833    }
834
835    /// Updates and returns the in progress [`Expr::AggregateFunction`] or [`Expr::WindowFunction`]
836    ///
837    /// # Errors:
838    ///
839    /// Returns an error if this builder  [`ExprFunctionExt`] was used with an
840    /// `Expr` variant other than [`Expr::AggregateFunction`] or [`Expr::WindowFunction`]
841    pub fn build(self) -> Result<Expr> {
842        let Self {
843            fun,
844            order_by,
845            filter,
846            distinct,
847            null_treatment,
848            partition_by,
849            window_frame,
850        } = self;
851
852        let Some(fun) = fun else {
853            return plan_err!(
854                "ExprFunctionExt can only be used with Expr::AggregateFunction or Expr::WindowFunction"
855            );
856        };
857
858        let fun_expr = match fun {
859            ExprFuncKind::Aggregate(mut udaf) => {
860                udaf.params.order_by = order_by.unwrap_or_default();
861                udaf.params.filter = filter.map(Box::new);
862                udaf.params.distinct = distinct;
863                udaf.params.null_treatment = null_treatment;
864                Expr::AggregateFunction(udaf)
865            }
866            ExprFuncKind::Window(mut udwf) => {
867                let has_order_by = order_by.as_ref().map(|o| !o.is_empty());
868                udwf.params.partition_by = partition_by.unwrap_or_default();
869                udwf.params.order_by = order_by.unwrap_or_default();
870                udwf.params.window_frame =
871                    window_frame.unwrap_or_else(|| WindowFrame::new(has_order_by));
872                udwf.params.filter = filter.map(Box::new);
873                udwf.params.null_treatment = null_treatment;
874                udwf.params.distinct = distinct;
875                Expr::WindowFunction(udwf)
876            }
877        };
878
879        Ok(fun_expr)
880    }
881}
882
883impl ExprFunctionExt for ExprFuncBuilder {
884    /// Add `ORDER BY <order_by>`
885    fn order_by(mut self, order_by: Vec<Sort>) -> ExprFuncBuilder {
886        self.order_by = Some(order_by);
887        self
888    }
889
890    /// Add `FILTER <filter>`
891    fn filter(mut self, filter: Expr) -> ExprFuncBuilder {
892        self.filter = Some(filter);
893        self
894    }
895
896    /// Add `DISTINCT`
897    fn distinct(mut self) -> ExprFuncBuilder {
898        self.distinct = true;
899        self
900    }
901
902    /// Add `RESPECT NULLS` or `IGNORE NULLS`
903    fn null_treatment(
904        mut self,
905        null_treatment: impl Into<Option<NullTreatment>>,
906    ) -> ExprFuncBuilder {
907        self.null_treatment = null_treatment.into();
908        self
909    }
910
911    fn partition_by(mut self, partition_by: Vec<Expr>) -> ExprFuncBuilder {
912        self.partition_by = Some(partition_by);
913        self
914    }
915
916    fn window_frame(mut self, window_frame: WindowFrame) -> ExprFuncBuilder {
917        self.window_frame = Some(window_frame);
918        self
919    }
920}
921
922impl ExprFunctionExt for Expr {
923    fn order_by(self, order_by: Vec<Sort>) -> ExprFuncBuilder {
924        let mut builder = match self {
925            Expr::AggregateFunction(udaf) => {
926                ExprFuncBuilder::new(Some(ExprFuncKind::Aggregate(udaf)))
927            }
928            Expr::WindowFunction(udwf) => {
929                ExprFuncBuilder::new(Some(ExprFuncKind::Window(udwf)))
930            }
931            _ => ExprFuncBuilder::new(None),
932        };
933        if builder.fun.is_some() {
934            builder.order_by = Some(order_by);
935        }
936        builder
937    }
938    fn filter(self, filter: Expr) -> ExprFuncBuilder {
939        match self {
940            Expr::AggregateFunction(udaf) => {
941                let mut builder =
942                    ExprFuncBuilder::new(Some(ExprFuncKind::Aggregate(udaf)));
943                builder.filter = Some(filter);
944                builder
945            }
946            _ => ExprFuncBuilder::new(None),
947        }
948    }
949    fn distinct(self) -> ExprFuncBuilder {
950        match self {
951            Expr::AggregateFunction(udaf) => {
952                let mut builder =
953                    ExprFuncBuilder::new(Some(ExprFuncKind::Aggregate(udaf)));
954                builder.distinct = true;
955                builder
956            }
957            _ => ExprFuncBuilder::new(None),
958        }
959    }
960    fn null_treatment(
961        self,
962        null_treatment: impl Into<Option<NullTreatment>>,
963    ) -> ExprFuncBuilder {
964        let mut builder = match self {
965            Expr::AggregateFunction(udaf) => {
966                ExprFuncBuilder::new(Some(ExprFuncKind::Aggregate(udaf)))
967            }
968            Expr::WindowFunction(udwf) => {
969                ExprFuncBuilder::new(Some(ExprFuncKind::Window(udwf)))
970            }
971            _ => ExprFuncBuilder::new(None),
972        };
973        if builder.fun.is_some() {
974            builder.null_treatment = null_treatment.into();
975        }
976        builder
977    }
978
979    fn partition_by(self, partition_by: Vec<Expr>) -> ExprFuncBuilder {
980        match self {
981            Expr::WindowFunction(udwf) => {
982                let mut builder = ExprFuncBuilder::new(Some(ExprFuncKind::Window(udwf)));
983                builder.partition_by = Some(partition_by);
984                builder
985            }
986            _ => ExprFuncBuilder::new(None),
987        }
988    }
989
990    fn window_frame(self, window_frame: WindowFrame) -> ExprFuncBuilder {
991        match self {
992            Expr::WindowFunction(udwf) => {
993                let mut builder = ExprFuncBuilder::new(Some(ExprFuncKind::Window(udwf)));
994                builder.window_frame = Some(window_frame);
995                builder
996            }
997            _ => ExprFuncBuilder::new(None),
998        }
999    }
1000}
1001
#[cfg(test)]
mod test {
    use super::*;

    /// `IS NULL` / `IS NOT NULL` expressions render as the expected SQL text.
    #[test]
    fn filter_is_null_and_is_not_null() {
        let rendered_null = format!("{}", col("col1").is_null());
        assert_eq!(rendered_null, "col1 IS NULL");

        let rendered_not_null = format!("{}", ident("col2").is_not_null());
        assert_eq!(rendered_not_null, "col2 IS NOT NULL");
    }
}