datafusion_expr/
expr_fn.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Functions for creating logical expressions
19
20use crate::expr::{
21    AggregateFunction, BinaryExpr, Cast, Exists, GroupingSet, InList, InSubquery,
22    Placeholder, TryCast, Unnest, WildcardOptions, WindowFunction, WindowFunctionParams,
23};
24use crate::function::{
25    AccumulatorArgs, AccumulatorFactoryFunction, PartitionEvaluatorFactory,
26    StateFieldsArgs,
27};
28use crate::select_expr::SelectExpr;
29use crate::{
30    conditional_expressions::CaseBuilder, expr::Sort, logical_plan::Subquery,
31    AggregateUDF, Expr, LogicalPlan, Operator, PartitionEvaluator, ScalarFunctionArgs,
32    ScalarFunctionImplementation, ScalarUDF, Signature, Volatility,
33};
34use crate::{
35    AggregateUDFImpl, ColumnarValue, ScalarUDFImpl, WindowFrame, WindowUDF, WindowUDFImpl,
36};
37use arrow::compute::kernels::cast_utils::{
38    parse_interval_day_time, parse_interval_month_day_nano, parse_interval_year_month,
39};
40use arrow::datatypes::{DataType, Field, FieldRef};
41use datafusion_common::{plan_err, Column, Result, ScalarValue, Spans, TableReference};
42use datafusion_functions_window_common::field::WindowUDFFieldArgs;
43use datafusion_functions_window_common::partition::PartitionEvaluatorArgs;
44use sqlparser::ast::NullTreatment;
45use std::any::Any;
46use std::fmt::Debug;
47use std::hash::{DefaultHasher, Hash, Hasher};
48use std::ops::Not;
49use std::sync::Arc;
50
51/// Create a column expression based on a qualified or unqualified column name. Will
52/// normalize unquoted identifiers according to SQL rules (identifiers will become lowercase).
53///
54/// For example:
55///
56/// ```rust
57/// # use datafusion_expr::col;
58/// let c1 = col("a");
59/// let c2 = col("A");
60/// assert_eq!(c1, c2);
61///
62/// // note how quoting with double quotes preserves the case
63/// let c3 = col(r#""A""#);
64/// assert_ne!(c1, c3);
65/// ```
66pub fn col(ident: impl Into<Column>) -> Expr {
67    Expr::Column(ident.into())
68}
69
70/// Create an out reference column which hold a reference that has been resolved to a field
71/// outside of the current plan.
72pub fn out_ref_col(dt: DataType, ident: impl Into<Column>) -> Expr {
73    Expr::OuterReferenceColumn(dt, ident.into())
74}
75
76/// Create an unqualified column expression from the provided name, without normalizing
77/// the column.
78///
79/// For example:
80///
81/// ```rust
82/// # use datafusion_expr::{col, ident};
83/// let c1 = ident("A"); // not normalized staying as column 'A'
84/// let c2 = col("A"); // normalized via SQL rules becoming column 'a'
85/// assert_ne!(c1, c2);
86///
87/// let c3 = col(r#""A""#);
88/// assert_eq!(c1, c3);
89///
90/// let c4 = col("t1.a"); // parses as relation 't1' column 'a'
91/// let c5 = ident("t1.a"); // parses as column 't1.a'
92/// assert_ne!(c4, c5);
93/// ```
94pub fn ident(name: impl Into<String>) -> Expr {
95    Expr::Column(Column::from_name(name))
96}
97
98/// Create placeholder value that will be filled in (such as `$1`)
99///
100/// Note the parameter type can be inferred using [`Expr::infer_placeholder_types`]
101///
102/// # Example
103///
104/// ```rust
105/// # use datafusion_expr::{placeholder};
106/// let p = placeholder("$0"); // $0, refers to parameter 1
107/// assert_eq!(p.to_string(), "$0")
108/// ```
109pub fn placeholder(id: impl Into<String>) -> Expr {
110    Expr::Placeholder(Placeholder {
111        id: id.into(),
112        data_type: None,
113    })
114}
115
116/// Create an '*' [`Expr::Wildcard`] expression that matches all columns
117///
118/// # Example
119///
120/// ```rust
121/// # use datafusion_expr::{wildcard};
122/// let p = wildcard();
123/// assert_eq!(p.to_string(), "*")
124/// ```
125pub fn wildcard() -> SelectExpr {
126    SelectExpr::Wildcard(WildcardOptions::default())
127}
128
129/// Create an '*' [`Expr::Wildcard`] expression with the wildcard options
130pub fn wildcard_with_options(options: WildcardOptions) -> SelectExpr {
131    SelectExpr::Wildcard(options)
132}
133
134/// Create an 't.*' [`Expr::Wildcard`] expression that matches all columns from a specific table
135///
136/// # Example
137///
138/// ```rust
139/// # use datafusion_common::TableReference;
140/// # use datafusion_expr::{qualified_wildcard};
141/// let p = qualified_wildcard(TableReference::bare("t"));
142/// assert_eq!(p.to_string(), "t.*")
143/// ```
144pub fn qualified_wildcard(qualifier: impl Into<TableReference>) -> SelectExpr {
145    SelectExpr::QualifiedWildcard(qualifier.into(), WildcardOptions::default())
146}
147
148/// Create an 't.*' [`Expr::Wildcard`] expression with the wildcard options
149pub fn qualified_wildcard_with_options(
150    qualifier: impl Into<TableReference>,
151    options: WildcardOptions,
152) -> SelectExpr {
153    SelectExpr::QualifiedWildcard(qualifier.into(), options)
154}
155
156/// Return a new expression `left <op> right`
157pub fn binary_expr(left: Expr, op: Operator, right: Expr) -> Expr {
158    Expr::BinaryExpr(BinaryExpr::new(Box::new(left), op, Box::new(right)))
159}
160
161/// Return a new expression with a logical AND
162pub fn and(left: Expr, right: Expr) -> Expr {
163    Expr::BinaryExpr(BinaryExpr::new(
164        Box::new(left),
165        Operator::And,
166        Box::new(right),
167    ))
168}
169
170/// Return a new expression with a logical OR
171pub fn or(left: Expr, right: Expr) -> Expr {
172    Expr::BinaryExpr(BinaryExpr::new(
173        Box::new(left),
174        Operator::Or,
175        Box::new(right),
176    ))
177}
178
179/// Return a new expression with a logical NOT
180pub fn not(expr: Expr) -> Expr {
181    expr.not()
182}
183
184/// Return a new expression with bitwise AND
185pub fn bitwise_and(left: Expr, right: Expr) -> Expr {
186    Expr::BinaryExpr(BinaryExpr::new(
187        Box::new(left),
188        Operator::BitwiseAnd,
189        Box::new(right),
190    ))
191}
192
193/// Return a new expression with bitwise OR
194pub fn bitwise_or(left: Expr, right: Expr) -> Expr {
195    Expr::BinaryExpr(BinaryExpr::new(
196        Box::new(left),
197        Operator::BitwiseOr,
198        Box::new(right),
199    ))
200}
201
202/// Return a new expression with bitwise XOR
203pub fn bitwise_xor(left: Expr, right: Expr) -> Expr {
204    Expr::BinaryExpr(BinaryExpr::new(
205        Box::new(left),
206        Operator::BitwiseXor,
207        Box::new(right),
208    ))
209}
210
211/// Return a new expression with bitwise SHIFT RIGHT
212pub fn bitwise_shift_right(left: Expr, right: Expr) -> Expr {
213    Expr::BinaryExpr(BinaryExpr::new(
214        Box::new(left),
215        Operator::BitwiseShiftRight,
216        Box::new(right),
217    ))
218}
219
220/// Return a new expression with bitwise SHIFT LEFT
221pub fn bitwise_shift_left(left: Expr, right: Expr) -> Expr {
222    Expr::BinaryExpr(BinaryExpr::new(
223        Box::new(left),
224        Operator::BitwiseShiftLeft,
225        Box::new(right),
226    ))
227}
228
229/// Create an in_list expression
230pub fn in_list(expr: Expr, list: Vec<Expr>, negated: bool) -> Expr {
231    Expr::InList(InList::new(Box::new(expr), list, negated))
232}
233
234/// Create an EXISTS subquery expression
235pub fn exists(subquery: Arc<LogicalPlan>) -> Expr {
236    let outer_ref_columns = subquery.all_out_ref_exprs();
237    Expr::Exists(Exists {
238        subquery: Subquery {
239            subquery,
240            outer_ref_columns,
241            spans: Spans::new(),
242        },
243        negated: false,
244    })
245}
246
247/// Create a NOT EXISTS subquery expression
248pub fn not_exists(subquery: Arc<LogicalPlan>) -> Expr {
249    let outer_ref_columns = subquery.all_out_ref_exprs();
250    Expr::Exists(Exists {
251        subquery: Subquery {
252            subquery,
253            outer_ref_columns,
254            spans: Spans::new(),
255        },
256        negated: true,
257    })
258}
259
260/// Create an IN subquery expression
261pub fn in_subquery(expr: Expr, subquery: Arc<LogicalPlan>) -> Expr {
262    let outer_ref_columns = subquery.all_out_ref_exprs();
263    Expr::InSubquery(InSubquery::new(
264        Box::new(expr),
265        Subquery {
266            subquery,
267            outer_ref_columns,
268            spans: Spans::new(),
269        },
270        false,
271    ))
272}
273
274/// Create a NOT IN subquery expression
275pub fn not_in_subquery(expr: Expr, subquery: Arc<LogicalPlan>) -> Expr {
276    let outer_ref_columns = subquery.all_out_ref_exprs();
277    Expr::InSubquery(InSubquery::new(
278        Box::new(expr),
279        Subquery {
280            subquery,
281            outer_ref_columns,
282            spans: Spans::new(),
283        },
284        true,
285    ))
286}
287
288/// Create a scalar subquery expression
289pub fn scalar_subquery(subquery: Arc<LogicalPlan>) -> Expr {
290    let outer_ref_columns = subquery.all_out_ref_exprs();
291    Expr::ScalarSubquery(Subquery {
292        subquery,
293        outer_ref_columns,
294        spans: Spans::new(),
295    })
296}
297
298/// Create a grouping set
299pub fn grouping_set(exprs: Vec<Vec<Expr>>) -> Expr {
300    Expr::GroupingSet(GroupingSet::GroupingSets(exprs))
301}
302
303/// Create a grouping set for all combination of `exprs`
304pub fn cube(exprs: Vec<Expr>) -> Expr {
305    Expr::GroupingSet(GroupingSet::Cube(exprs))
306}
307
308/// Create a grouping set for rollup
309pub fn rollup(exprs: Vec<Expr>) -> Expr {
310    Expr::GroupingSet(GroupingSet::Rollup(exprs))
311}
312
313/// Create a cast expression
314pub fn cast(expr: Expr, data_type: DataType) -> Expr {
315    Expr::Cast(Cast::new(Box::new(expr), data_type))
316}
317
318/// Create a try cast expression
319pub fn try_cast(expr: Expr, data_type: DataType) -> Expr {
320    Expr::TryCast(TryCast::new(Box::new(expr), data_type))
321}
322
323/// Create is null expression
324pub fn is_null(expr: Expr) -> Expr {
325    Expr::IsNull(Box::new(expr))
326}
327
328/// Create is true expression
329pub fn is_true(expr: Expr) -> Expr {
330    Expr::IsTrue(Box::new(expr))
331}
332
333/// Create is not true expression
334pub fn is_not_true(expr: Expr) -> Expr {
335    Expr::IsNotTrue(Box::new(expr))
336}
337
338/// Create is false expression
339pub fn is_false(expr: Expr) -> Expr {
340    Expr::IsFalse(Box::new(expr))
341}
342
343/// Create is not false expression
344pub fn is_not_false(expr: Expr) -> Expr {
345    Expr::IsNotFalse(Box::new(expr))
346}
347
348/// Create is unknown expression
349pub fn is_unknown(expr: Expr) -> Expr {
350    Expr::IsUnknown(Box::new(expr))
351}
352
353/// Create is not unknown expression
354pub fn is_not_unknown(expr: Expr) -> Expr {
355    Expr::IsNotUnknown(Box::new(expr))
356}
357
358/// Create a CASE WHEN statement with literal WHEN expressions for comparison to the base expression.
359pub fn case(expr: Expr) -> CaseBuilder {
360    CaseBuilder::new(Some(Box::new(expr)), vec![], vec![], None)
361}
362
363/// Create a CASE WHEN statement with boolean WHEN expressions and no base expression.
364pub fn when(when: Expr, then: Expr) -> CaseBuilder {
365    CaseBuilder::new(None, vec![when], vec![then], None)
366}
367
368/// Create a Unnest expression
369pub fn unnest(expr: Expr) -> Expr {
370    Expr::Unnest(Unnest {
371        expr: Box::new(expr),
372    })
373}
374
375/// Convenience method to create a new user defined scalar function (UDF) with a
376/// specific signature and specific return type.
377///
378/// Note this function does not expose all available features of [`ScalarUDF`],
379/// such as
380///
381/// * computing return types based on input types
382/// * multiple [`Signature`]s
383/// * aliases
384///
385/// See [`ScalarUDF`] for details and examples on how to use the full
386/// functionality.
387pub fn create_udf(
388    name: &str,
389    input_types: Vec<DataType>,
390    return_type: DataType,
391    volatility: Volatility,
392    fun: ScalarFunctionImplementation,
393) -> ScalarUDF {
394    ScalarUDF::from(SimpleScalarUDF::new(
395        name,
396        input_types,
397        return_type,
398        volatility,
399        fun,
400    ))
401}
402
403/// Implements [`ScalarUDFImpl`] for functions that have a single signature and
404/// return type.
405pub struct SimpleScalarUDF {
406    name: String,
407    signature: Signature,
408    return_type: DataType,
409    fun: ScalarFunctionImplementation,
410}
411
412impl Debug for SimpleScalarUDF {
413    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
414        f.debug_struct("SimpleScalarUDF")
415            .field("name", &self.name)
416            .field("signature", &self.signature)
417            .field("return_type", &self.return_type)
418            .field("fun", &"<FUNC>")
419            .finish()
420    }
421}
422
423impl SimpleScalarUDF {
424    /// Create a new `SimpleScalarUDF` from a name, input types, return type and
425    /// implementation. Implementing [`ScalarUDFImpl`] allows more flexibility
426    pub fn new(
427        name: impl Into<String>,
428        input_types: Vec<DataType>,
429        return_type: DataType,
430        volatility: Volatility,
431        fun: ScalarFunctionImplementation,
432    ) -> Self {
433        Self::new_with_signature(
434            name,
435            Signature::exact(input_types, volatility),
436            return_type,
437            fun,
438        )
439    }
440
441    /// Create a new `SimpleScalarUDF` from a name, signature, return type and
442    /// implementation. Implementing [`ScalarUDFImpl`] allows more flexibility
443    pub fn new_with_signature(
444        name: impl Into<String>,
445        signature: Signature,
446        return_type: DataType,
447        fun: ScalarFunctionImplementation,
448    ) -> Self {
449        Self {
450            name: name.into(),
451            signature,
452            return_type,
453            fun,
454        }
455    }
456}
457
458impl ScalarUDFImpl for SimpleScalarUDF {
459    fn as_any(&self) -> &dyn Any {
460        self
461    }
462
463    fn name(&self) -> &str {
464        &self.name
465    }
466
467    fn signature(&self) -> &Signature {
468        &self.signature
469    }
470
471    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
472        Ok(self.return_type.clone())
473    }
474
475    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
476        (self.fun)(&args.args)
477    }
478
479    fn equals(&self, other: &dyn ScalarUDFImpl) -> bool {
480        let Some(other) = other.as_any().downcast_ref::<Self>() else {
481            return false;
482        };
483        let Self {
484            name,
485            signature,
486            return_type,
487            fun,
488        } = self;
489        name == &other.name
490            && signature == &other.signature
491            && return_type == &other.return_type
492            && Arc::ptr_eq(fun, &other.fun)
493    }
494
495    fn hash_value(&self) -> u64 {
496        let Self {
497            name,
498            signature,
499            return_type,
500            fun,
501        } = self;
502        let mut hasher = DefaultHasher::new();
503        std::any::type_name::<Self>().hash(&mut hasher);
504        name.hash(&mut hasher);
505        signature.hash(&mut hasher);
506        return_type.hash(&mut hasher);
507        Arc::as_ptr(fun).hash(&mut hasher);
508        hasher.finish()
509    }
510}
511
512/// Creates a new UDAF with a specific signature, state type and return type.
513/// The signature and state type must match the `Accumulator's implementation`.
514pub fn create_udaf(
515    name: &str,
516    input_type: Vec<DataType>,
517    return_type: Arc<DataType>,
518    volatility: Volatility,
519    accumulator: AccumulatorFactoryFunction,
520    state_type: Arc<Vec<DataType>>,
521) -> AggregateUDF {
522    let return_type = Arc::unwrap_or_clone(return_type);
523    let state_type = Arc::unwrap_or_clone(state_type);
524    let state_fields = state_type
525        .into_iter()
526        .enumerate()
527        .map(|(i, t)| Field::new(format!("{i}"), t, true))
528        .map(Arc::new)
529        .collect::<Vec<_>>();
530    AggregateUDF::from(SimpleAggregateUDF::new(
531        name,
532        input_type,
533        return_type,
534        volatility,
535        accumulator,
536        state_fields,
537    ))
538}
539
540/// Implements [`AggregateUDFImpl`] for functions that have a single signature and
541/// return type.
542pub struct SimpleAggregateUDF {
543    name: String,
544    signature: Signature,
545    return_type: DataType,
546    accumulator: AccumulatorFactoryFunction,
547    state_fields: Vec<FieldRef>,
548}
549
550impl Debug for SimpleAggregateUDF {
551    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
552        f.debug_struct("SimpleAggregateUDF")
553            .field("name", &self.name)
554            .field("signature", &self.signature)
555            .field("return_type", &self.return_type)
556            .field("fun", &"<FUNC>")
557            .finish()
558    }
559}
560
561impl SimpleAggregateUDF {
562    /// Create a new `SimpleAggregateUDF` from a name, input types, return type, state type and
563    /// implementation. Implementing [`AggregateUDFImpl`] allows more flexibility
564    pub fn new(
565        name: impl Into<String>,
566        input_type: Vec<DataType>,
567        return_type: DataType,
568        volatility: Volatility,
569        accumulator: AccumulatorFactoryFunction,
570        state_fields: Vec<FieldRef>,
571    ) -> Self {
572        let name = name.into();
573        let signature = Signature::exact(input_type, volatility);
574        Self {
575            name,
576            signature,
577            return_type,
578            accumulator,
579            state_fields,
580        }
581    }
582
583    /// Create a new `SimpleAggregateUDF` from a name, signature, return type, state type and
584    /// implementation. Implementing [`AggregateUDFImpl`] allows more flexibility
585    pub fn new_with_signature(
586        name: impl Into<String>,
587        signature: Signature,
588        return_type: DataType,
589        accumulator: AccumulatorFactoryFunction,
590        state_fields: Vec<FieldRef>,
591    ) -> Self {
592        let name = name.into();
593        Self {
594            name,
595            signature,
596            return_type,
597            accumulator,
598            state_fields,
599        }
600    }
601}
602
603impl AggregateUDFImpl for SimpleAggregateUDF {
604    fn as_any(&self) -> &dyn Any {
605        self
606    }
607
608    fn name(&self) -> &str {
609        &self.name
610    }
611
612    fn signature(&self) -> &Signature {
613        &self.signature
614    }
615
616    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
617        Ok(self.return_type.clone())
618    }
619
620    fn accumulator(
621        &self,
622        acc_args: AccumulatorArgs,
623    ) -> Result<Box<dyn crate::Accumulator>> {
624        (self.accumulator)(acc_args)
625    }
626
627    fn state_fields(&self, _args: StateFieldsArgs) -> Result<Vec<FieldRef>> {
628        Ok(self.state_fields.clone())
629    }
630
631    fn equals(&self, other: &dyn AggregateUDFImpl) -> bool {
632        let Some(other) = other.as_any().downcast_ref::<Self>() else {
633            return false;
634        };
635        let Self {
636            name,
637            signature,
638            return_type,
639            accumulator,
640            state_fields,
641        } = self;
642        name == &other.name
643            && signature == &other.signature
644            && return_type == &other.return_type
645            && Arc::ptr_eq(accumulator, &other.accumulator)
646            && state_fields == &other.state_fields
647    }
648
649    fn hash_value(&self) -> u64 {
650        let Self {
651            name,
652            signature,
653            return_type,
654            accumulator,
655            state_fields,
656        } = self;
657        let mut hasher = DefaultHasher::new();
658        std::any::type_name::<Self>().hash(&mut hasher);
659        name.hash(&mut hasher);
660        signature.hash(&mut hasher);
661        return_type.hash(&mut hasher);
662        Arc::as_ptr(accumulator).hash(&mut hasher);
663        state_fields.hash(&mut hasher);
664        hasher.finish()
665    }
666}
667
668/// Creates a new UDWF with a specific signature, state type and return type.
669///
670/// The signature and state type must match the [`PartitionEvaluator`]'s implementation`.
671///
672/// [`PartitionEvaluator`]: crate::PartitionEvaluator
673pub fn create_udwf(
674    name: &str,
675    input_type: DataType,
676    return_type: Arc<DataType>,
677    volatility: Volatility,
678    partition_evaluator_factory: PartitionEvaluatorFactory,
679) -> WindowUDF {
680    let return_type = Arc::unwrap_or_clone(return_type);
681    WindowUDF::from(SimpleWindowUDF::new(
682        name,
683        input_type,
684        return_type,
685        volatility,
686        partition_evaluator_factory,
687    ))
688}
689
690/// Implements [`WindowUDFImpl`] for functions that have a single signature and
691/// return type.
692pub struct SimpleWindowUDF {
693    name: String,
694    signature: Signature,
695    return_type: DataType,
696    partition_evaluator_factory: PartitionEvaluatorFactory,
697}
698
699impl Debug for SimpleWindowUDF {
700    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
701        f.debug_struct("WindowUDF")
702            .field("name", &self.name)
703            .field("signature", &self.signature)
704            .field("return_type", &"<func>")
705            .field("partition_evaluator_factory", &"<FUNC>")
706            .finish()
707    }
708}
709
710impl SimpleWindowUDF {
711    /// Create a new `SimpleWindowUDF` from a name, input types, return type and
712    /// implementation. Implementing [`WindowUDFImpl`] allows more flexibility
713    pub fn new(
714        name: impl Into<String>,
715        input_type: DataType,
716        return_type: DataType,
717        volatility: Volatility,
718        partition_evaluator_factory: PartitionEvaluatorFactory,
719    ) -> Self {
720        let name = name.into();
721        let signature = Signature::exact([input_type].to_vec(), volatility);
722        Self {
723            name,
724            signature,
725            return_type,
726            partition_evaluator_factory,
727        }
728    }
729}
730
731impl WindowUDFImpl for SimpleWindowUDF {
732    fn as_any(&self) -> &dyn Any {
733        self
734    }
735
736    fn name(&self) -> &str {
737        &self.name
738    }
739
740    fn signature(&self) -> &Signature {
741        &self.signature
742    }
743
744    fn partition_evaluator(
745        &self,
746        _partition_evaluator_args: PartitionEvaluatorArgs,
747    ) -> Result<Box<dyn PartitionEvaluator>> {
748        (self.partition_evaluator_factory)()
749    }
750
751    fn field(&self, field_args: WindowUDFFieldArgs) -> Result<FieldRef> {
752        Ok(Arc::new(Field::new(
753            field_args.name(),
754            self.return_type.clone(),
755            true,
756        )))
757    }
758
759    fn equals(&self, other: &dyn WindowUDFImpl) -> bool {
760        let Some(other) = other.as_any().downcast_ref::<Self>() else {
761            return false;
762        };
763        let Self {
764            name,
765            signature,
766            return_type,
767            partition_evaluator_factory,
768        } = self;
769        name == &other.name
770            && signature == &other.signature
771            && return_type == &other.return_type
772            && Arc::ptr_eq(
773                partition_evaluator_factory,
774                &other.partition_evaluator_factory,
775            )
776    }
777
778    fn hash_value(&self) -> u64 {
779        let Self {
780            name,
781            signature,
782            return_type,
783            partition_evaluator_factory,
784        } = self;
785        let mut hasher = DefaultHasher::new();
786        std::any::type_name::<Self>().hash(&mut hasher);
787        name.hash(&mut hasher);
788        signature.hash(&mut hasher);
789        return_type.hash(&mut hasher);
790        Arc::as_ptr(partition_evaluator_factory).hash(&mut hasher);
791        hasher.finish()
792    }
793}
794
795pub fn interval_year_month_lit(value: &str) -> Expr {
796    let interval = parse_interval_year_month(value).ok();
797    Expr::Literal(ScalarValue::IntervalYearMonth(interval), None)
798}
799
800pub fn interval_datetime_lit(value: &str) -> Expr {
801    let interval = parse_interval_day_time(value).ok();
802    Expr::Literal(ScalarValue::IntervalDayTime(interval), None)
803}
804
805pub fn interval_month_day_nano_lit(value: &str) -> Expr {
806    let interval = parse_interval_month_day_nano(value).ok();
807    Expr::Literal(ScalarValue::IntervalMonthDayNano(interval), None)
808}
809
810/// Extensions for configuring [`Expr::AggregateFunction`] or [`Expr::WindowFunction`]
811///
812/// Adds methods to [`Expr`] that make it easy to set optional options
813/// such as `ORDER BY`, `FILTER` and `DISTINCT`
814///
815/// # Example
816/// ```no_run
817/// # use datafusion_common::Result;
818/// # use datafusion_expr::test::function_stub::count;
819/// # use sqlparser::ast::NullTreatment;
820/// # use datafusion_expr::{ExprFunctionExt, lit, Expr, col};
821/// # // first_value is an aggregate function in another crate
822/// # fn first_value(_arg: Expr) -> Expr {
823/// unimplemented!() }
824/// # fn main() -> Result<()> {
825/// // Create an aggregate count, filtering on column y > 5
826/// let agg = count(col("x")).filter(col("y").gt(lit(5))).build()?;
827///
828/// // Find the first value in an aggregate sorted by column y
829/// // equivalent to:
830/// // `FIRST_VALUE(x ORDER BY y ASC IGNORE NULLS)`
831/// let sort_expr = col("y").sort(true, true);
832/// let agg = first_value(col("x"))
833///     .order_by(vec![sort_expr])
834///     .null_treatment(NullTreatment::IgnoreNulls)
835///     .build()?;
836///
837/// // Create a window expression for percent rank partitioned on column a
838/// // equivalent to:
839/// // `PERCENT_RANK() OVER (PARTITION BY a ORDER BY b ASC NULLS LAST IGNORE NULLS)`
840/// // percent_rank is an udwf function in another crate
841/// # fn percent_rank() -> Expr {
842/// unimplemented!() }
843/// let window = percent_rank()
844///     .partition_by(vec![col("a")])
845///     .order_by(vec![col("b").sort(true, true)])
846///     .null_treatment(NullTreatment::IgnoreNulls)
847///     .build()?;
848/// #     Ok(())
849/// # }
850/// ```
851pub trait ExprFunctionExt {
852    /// Add `ORDER BY <order_by>`
853    fn order_by(self, order_by: Vec<Sort>) -> ExprFuncBuilder;
854    /// Add `FILTER <filter>`
855    fn filter(self, filter: Expr) -> ExprFuncBuilder;
856    /// Add `DISTINCT`
857    fn distinct(self) -> ExprFuncBuilder;
858    /// Add `RESPECT NULLS` or `IGNORE NULLS`
859    fn null_treatment(
860        self,
861        null_treatment: impl Into<Option<NullTreatment>>,
862    ) -> ExprFuncBuilder;
863    /// Add `PARTITION BY`
864    fn partition_by(self, partition_by: Vec<Expr>) -> ExprFuncBuilder;
865    /// Add appropriate window frame conditions
866    fn window_frame(self, window_frame: WindowFrame) -> ExprFuncBuilder;
867}
868
869#[derive(Debug, Clone)]
870pub enum ExprFuncKind {
871    Aggregate(AggregateFunction),
872    Window(WindowFunction),
873}
874
875/// Implementation of [`ExprFunctionExt`].
876///
877/// See [`ExprFunctionExt`] for usage and examples
878#[derive(Debug, Clone)]
879pub struct ExprFuncBuilder {
880    fun: Option<ExprFuncKind>,
881    order_by: Option<Vec<Sort>>,
882    filter: Option<Expr>,
883    distinct: bool,
884    null_treatment: Option<NullTreatment>,
885    partition_by: Option<Vec<Expr>>,
886    window_frame: Option<WindowFrame>,
887}
888
889impl ExprFuncBuilder {
890    /// Create a new `ExprFuncBuilder`, see [`ExprFunctionExt`]
891    fn new(fun: Option<ExprFuncKind>) -> Self {
892        Self {
893            fun,
894            order_by: None,
895            filter: None,
896            distinct: false,
897            null_treatment: None,
898            partition_by: None,
899            window_frame: None,
900        }
901    }
902
903    /// Updates and returns the in progress [`Expr::AggregateFunction`] or [`Expr::WindowFunction`]
904    ///
905    /// # Errors:
906    ///
907    /// Returns an error if this builder  [`ExprFunctionExt`] was used with an
908    /// `Expr` variant other than [`Expr::AggregateFunction`] or [`Expr::WindowFunction`]
909    pub fn build(self) -> Result<Expr> {
910        let Self {
911            fun,
912            order_by,
913            filter,
914            distinct,
915            null_treatment,
916            partition_by,
917            window_frame,
918        } = self;
919
920        let Some(fun) = fun else {
921            return plan_err!(
922                "ExprFunctionExt can only be used with Expr::AggregateFunction or Expr::WindowFunction"
923            );
924        };
925
926        let fun_expr = match fun {
927            ExprFuncKind::Aggregate(mut udaf) => {
928                udaf.params.order_by = order_by.unwrap_or_default();
929                udaf.params.filter = filter.map(Box::new);
930                udaf.params.distinct = distinct;
931                udaf.params.null_treatment = null_treatment;
932                Expr::AggregateFunction(udaf)
933            }
934            ExprFuncKind::Window(WindowFunction {
935                fun,
936                params: WindowFunctionParams { args, .. },
937            }) => {
938                let has_order_by = order_by.as_ref().map(|o| !o.is_empty());
939                Expr::from(WindowFunction {
940                    fun,
941                    params: WindowFunctionParams {
942                        args,
943                        partition_by: partition_by.unwrap_or_default(),
944                        order_by: order_by.unwrap_or_default(),
945                        window_frame: window_frame
946                            .unwrap_or_else(|| WindowFrame::new(has_order_by)),
947                        null_treatment,
948                    },
949                })
950            }
951        };
952
953        Ok(fun_expr)
954    }
955}
956
957impl ExprFunctionExt for ExprFuncBuilder {
958    /// Add `ORDER BY <order_by>`
959    fn order_by(mut self, order_by: Vec<Sort>) -> ExprFuncBuilder {
960        self.order_by = Some(order_by);
961        self
962    }
963
964    /// Add `FILTER <filter>`
965    fn filter(mut self, filter: Expr) -> ExprFuncBuilder {
966        self.filter = Some(filter);
967        self
968    }
969
970    /// Add `DISTINCT`
971    fn distinct(mut self) -> ExprFuncBuilder {
972        self.distinct = true;
973        self
974    }
975
976    /// Add `RESPECT NULLS` or `IGNORE NULLS`
977    fn null_treatment(
978        mut self,
979        null_treatment: impl Into<Option<NullTreatment>>,
980    ) -> ExprFuncBuilder {
981        self.null_treatment = null_treatment.into();
982        self
983    }
984
985    fn partition_by(mut self, partition_by: Vec<Expr>) -> ExprFuncBuilder {
986        self.partition_by = Some(partition_by);
987        self
988    }
989
990    fn window_frame(mut self, window_frame: WindowFrame) -> ExprFuncBuilder {
991        self.window_frame = Some(window_frame);
992        self
993    }
994}
995
996impl ExprFunctionExt for Expr {
997    fn order_by(self, order_by: Vec<Sort>) -> ExprFuncBuilder {
998        let mut builder = match self {
999            Expr::AggregateFunction(udaf) => {
1000                ExprFuncBuilder::new(Some(ExprFuncKind::Aggregate(udaf)))
1001            }
1002            Expr::WindowFunction(udwf) => {
1003                ExprFuncBuilder::new(Some(ExprFuncKind::Window(*udwf)))
1004            }
1005            _ => ExprFuncBuilder::new(None),
1006        };
1007        if builder.fun.is_some() {
1008            builder.order_by = Some(order_by);
1009        }
1010        builder
1011    }
1012    fn filter(self, filter: Expr) -> ExprFuncBuilder {
1013        match self {
1014            Expr::AggregateFunction(udaf) => {
1015                let mut builder =
1016                    ExprFuncBuilder::new(Some(ExprFuncKind::Aggregate(udaf)));
1017                builder.filter = Some(filter);
1018                builder
1019            }
1020            _ => ExprFuncBuilder::new(None),
1021        }
1022    }
1023    fn distinct(self) -> ExprFuncBuilder {
1024        match self {
1025            Expr::AggregateFunction(udaf) => {
1026                let mut builder =
1027                    ExprFuncBuilder::new(Some(ExprFuncKind::Aggregate(udaf)));
1028                builder.distinct = true;
1029                builder
1030            }
1031            _ => ExprFuncBuilder::new(None),
1032        }
1033    }
1034    fn null_treatment(
1035        self,
1036        null_treatment: impl Into<Option<NullTreatment>>,
1037    ) -> ExprFuncBuilder {
1038        let mut builder = match self {
1039            Expr::AggregateFunction(udaf) => {
1040                ExprFuncBuilder::new(Some(ExprFuncKind::Aggregate(udaf)))
1041            }
1042            Expr::WindowFunction(udwf) => {
1043                ExprFuncBuilder::new(Some(ExprFuncKind::Window(*udwf)))
1044            }
1045            _ => ExprFuncBuilder::new(None),
1046        };
1047        if builder.fun.is_some() {
1048            builder.null_treatment = null_treatment.into();
1049        }
1050        builder
1051    }
1052
1053    fn partition_by(self, partition_by: Vec<Expr>) -> ExprFuncBuilder {
1054        match self {
1055            Expr::WindowFunction(udwf) => {
1056                let mut builder = ExprFuncBuilder::new(Some(ExprFuncKind::Window(*udwf)));
1057                builder.partition_by = Some(partition_by);
1058                builder
1059            }
1060            _ => ExprFuncBuilder::new(None),
1061        }
1062    }
1063
1064    fn window_frame(self, window_frame: WindowFrame) -> ExprFuncBuilder {
1065        match self {
1066            Expr::WindowFunction(udwf) => {
1067                let mut builder = ExprFuncBuilder::new(Some(ExprFuncKind::Window(*udwf)));
1068                builder.window_frame = Some(window_frame);
1069                builder
1070            }
1071            _ => ExprFuncBuilder::new(None),
1072        }
1073    }
1074}
1075
#[cfg(test)]
mod test {
    use super::*;

    /// `IS NULL` and `IS NOT NULL` are displayed after the column name, for
    /// columns created through both `col` and `ident`.
    #[test]
    fn filter_is_null_and_is_not_null() {
        let null_check = col("col1").is_null();
        let not_null_check = ident("col2").is_not_null();

        assert_eq!(null_check.to_string(), "col1 IS NULL");
        assert_eq!(not_null_check.to_string(), "col2 IS NOT NULL");
    }
}