polars_plan/plans/
options.rs

1use bitflags::bitflags;
2use polars_core::prelude::*;
3use polars_core::utils::SuperTypeOptions;
4#[cfg(feature = "serde")]
5use serde::{Deserialize, Serialize};
6
7use crate::plans::PlSmallStr;
8
9#[derive(Clone, Debug, Eq, PartialEq, Hash)]
10#[cfg_attr(feature = "ir_serde", derive(Serialize, Deserialize))]
11pub struct DistinctOptionsIR {
12    /// Subset of columns that will be taken into account.
13    pub subset: Option<Arc<[PlSmallStr]>>,
14    /// This will maintain the order of the input.
15    /// Note that this is more expensive.
16    /// `maintain_order` is not supported in the streaming
17    /// engine.
18    pub maintain_order: bool,
19    /// Which rows to keep.
20    pub keep_strategy: UniqueKeepStrategy,
21    /// Take only a slice of the result
22    pub slice: Option<(i64, usize)>,
23}
24
// A boolean that starts out as `true` (see the `Default` impl). The inner
// field is private; in this file the only code that flips it to `false` is
// the `unsafe` function `FunctionOptions::no_check_lengths`.
//
// NOTE: kept as a plain `//` comment on purpose. This type derives
// `schemars::JsonSchema`, and a `///` doc comment would presumably be copied
// into the generated schema as a description.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
pub struct UnsafeBool(bool);

impl Default for UnsafeBool {
    /// The default state is `true`.
    fn default() -> Self {
        Self(true)
    }
}
35
/// Hand-written JSON schema for [`FunctionFlags`]; only compiled with the
/// `dsl-schema` feature.
#[cfg(feature = "dsl-schema")]
impl schemars::JsonSchema for FunctionFlags {
    /// Short schema name.
    fn schema_name() -> String {
        String::from("FunctionFlags")
    }

    /// Schema identifier: the module path followed by the type name.
    fn schema_id() -> std::borrow::Cow<'static, str> {
        std::borrow::Cow::Borrowed(concat!(module_path!(), "::", "FunctionFlags"))
    }

    /// Describes the flags as a string with format `"bitflags"` and attaches
    /// a `"bitflags"` extension that maps every flag name to its bit pattern.
    fn json_schema(_generator: &mut schemars::r#gen::SchemaGenerator) -> schemars::schema::Schema {
        use schemars::schema::{InstanceType, Schema, SchemaObject};
        use serde_json::{Map, Value};

        // Collect `flag name -> bits` for every named flag.
        let mut name_to_bits: Map<String, Value> = Map::new();
        for (name, flag) in Self::all().iter_names() {
            name_to_bits.insert(name.to_owned(), Value::from(flag.bits()));
        }

        let object = SchemaObject {
            instance_type: Some(InstanceType::String.into()),
            format: Some("bitflags".to_owned()),
            extensions: schemars::Map::from_iter([
                // Add a map of flag names and bit patterns to detect schema changes
                ("bitflags".to_owned(), Value::Object(name_to_bits)),
            ]),
            ..Default::default()
        };
        Schema::Object(object)
    }
}
65
66bitflags!(
67        #[repr(transparent)]
68        #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
69        #[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
70        pub struct FunctionFlags: u16 {
71            /// Raise if use in group by
72            const ALLOW_GROUP_AWARE = 1 << 0;
73            /// The physical expression may rename the output of this function.
74            /// If set to `false` the physical engine will ensure the left input
75            /// expression is the output name.
76            const ALLOW_RENAME = 1 << 2;
77            /// if set, then the `Series` passed to the function in the group_by operation
78            /// will ensure the name is set. This is an extra heap allocation per group.
79            const PASS_NAME_TO_APPLY = 1 << 3;
80            /// There can be two ways of expanding wildcards:
81            ///
82            /// Say the schema is 'a', 'b' and there is a function `f`. In this case, `f('*')` can expand
83            /// to:
84            /// 1. `f('a', 'b')`
85            /// 2. `f('a'), f('b')`
86            ///
87            /// Setting this to true, will lead to behavior 1.
88            ///
89            /// This also accounts for regex expansion.
90            const INPUT_WILDCARD_EXPANSION = 1 << 4;
91            /// Automatically explode on unit length if it ran as final aggregation.
92            ///
93            /// this is the case for aggregations like sum, min, covariance etc.
94            /// We need to know this because we cannot see the difference between
95            /// the following functions based on the output type and number of elements:
96            ///
97            /// x: {1, 2, 3}
98            ///
99            /// head_1(x) -> {1}
100            /// sum(x) -> {4}
101            ///
102            /// mutually exclusive with `RETURNS_SCALAR`
103            const RETURNS_SCALAR = 1 << 5;
104            /// This can happen with UDF's that use Polars within the UDF.
105            /// This can lead to recursively entering the engine and sometimes deadlocks.
106            /// This flag must be set to handle that.
107            const OPTIONAL_RE_ENTRANT = 1 << 6;
108            /// Whether this function allows no inputs.
109            const ALLOW_EMPTY_INPUTS = 1 << 7;
110
111            /// Given a function f and a column of values [v1, ..., vn]
112            /// f is row-separable i.f.f.
113            /// f([v1, ..., vn]) = concat(f(v1, ... vm), f(vm+1, ..., vn))
114            const ROW_SEPARABLE = 1 << 8;
115            /// Given a function f and a column of values [v1, ..., vn]
116            /// f is length preserving i.f.f. len(f([v1, ..., vn])) = n
117            ///
118            /// mutually exclusive with `RETURNS_SCALAR`
119            const LENGTH_PRESERVING = 1 << 9;
120            /// Aggregate the values of the expression into a list before applying the function.
121            const APPLY_LIST = 1 << 10;
122        }
123);
124
125impl FunctionFlags {
126    pub fn set_elementwise(&mut self) {
127        *self |= Self::ROW_SEPARABLE | Self::LENGTH_PRESERVING;
128    }
129
130    pub fn is_elementwise(self) -> bool {
131        self.contains(Self::ROW_SEPARABLE | Self::LENGTH_PRESERVING)
132    }
133
134    pub fn returns_scalar(self) -> bool {
135        self.contains(Self::RETURNS_SCALAR)
136    }
137}
138
139impl Default for FunctionFlags {
140    fn default() -> Self {
141        Self::from_bits_truncate(0) | Self::ALLOW_GROUP_AWARE
142    }
143}
144
145#[derive(Clone, Copy, PartialEq, Eq, Debug, Hash)]
146pub enum CastingRules {
147    /// Whether information may be lost during cast. E.g. a float to int is considered lossy,
148    /// whereas int to int is considered lossless.
149    /// Overflowing is not considered in this flag, that's handled in `strict` casting
150    FirstArgLossless,
151    Supertype(SuperTypeOptions),
152}
153
154impl CastingRules {
155    pub fn cast_to_supertypes() -> CastingRules {
156        Self::Supertype(Default::default())
157    }
158}
159
160#[derive(Clone, Copy, PartialEq, Eq, Debug, Hash)]
161#[cfg_attr(any(feature = "serde"), derive(Serialize, Deserialize))]
162#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
163pub struct FunctionOptions {
164    // Validate the output of a `map`.
165    // this should always be true or we could OOB
166    pub check_lengths: UnsafeBool,
167    pub flags: FunctionFlags,
168
169    /// Options used when deciding how to cast the arguments of the function.
170    #[cfg_attr(any(feature = "serde", feature = "dsl-schema"), serde(skip))]
171    pub cast_options: Option<CastingRules>,
172}
173
174impl FunctionOptions {
175    #[cfg(feature = "fused")]
176    pub(crate) unsafe fn no_check_lengths(&mut self) {
177        self.check_lengths = UnsafeBool(false);
178    }
179    pub fn check_lengths(&self) -> bool {
180        self.check_lengths.0
181    }
182
183    pub fn set_elementwise(&mut self) {
184        self.flags.set_elementwise();
185    }
186
187    pub fn is_elementwise(&self) -> bool {
188        self.flags.is_elementwise()
189    }
190
191    pub fn is_length_preserving(&self) -> bool {
192        self.flags.contains(FunctionFlags::LENGTH_PRESERVING)
193    }
194
195    pub fn returns_scalar(&self) -> bool {
196        self.flags.returns_scalar()
197    }
198
199    pub fn elementwise() -> FunctionOptions {
200        FunctionOptions {
201            ..Default::default()
202        }
203        .with_flags(|f| f | FunctionFlags::ROW_SEPARABLE | FunctionFlags::LENGTH_PRESERVING)
204    }
205
206    pub fn elementwise_with_infer() -> FunctionOptions {
207        Self::length_preserving()
208    }
209
210    pub fn row_separable() -> FunctionOptions {
211        FunctionOptions {
212            ..Default::default()
213        }
214        .with_flags(|f| f | FunctionFlags::ROW_SEPARABLE)
215    }
216
217    pub fn length_preserving() -> FunctionOptions {
218        FunctionOptions {
219            ..Default::default()
220        }
221        .with_flags(|f| f | FunctionFlags::LENGTH_PRESERVING)
222    }
223
224    pub fn groupwise() -> FunctionOptions {
225        FunctionOptions {
226            ..Default::default()
227        }
228    }
229
230    pub fn aggregation() -> FunctionOptions {
231        let mut options = Self::groupwise();
232        options.flags |= FunctionFlags::RETURNS_SCALAR;
233        options
234    }
235
236    pub fn with_supertyping(self, supertype_options: SuperTypeOptions) -> FunctionOptions {
237        self.with_casting_rules(CastingRules::Supertype(supertype_options))
238    }
239
240    pub fn with_casting_rules(mut self, casting_rules: CastingRules) -> FunctionOptions {
241        self.cast_options = Some(casting_rules);
242        self
243    }
244
245    pub fn with_flags(mut self, f: impl Fn(FunctionFlags) -> FunctionFlags) -> FunctionOptions {
246        self.flags = f(self.flags);
247        self
248    }
249}
250
251impl Default for FunctionOptions {
252    fn default() -> Self {
253        FunctionOptions {
254            check_lengths: UnsafeBool(true),
255            cast_options: Default::default(),
256            flags: Default::default(),
257        }
258    }
259}
260
// Options of a projection.
//
// NOTE: comments on this type and its fields are plain `//` on purpose. The
// type derives `schemars::JsonSchema`, and `///` docs would presumably be
// copied into the generated schema.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
#[cfg_attr(feature = "dsl-schema", derive(schemars::JsonSchema))]
pub struct ProjectionOptions {
    pub run_parallel: bool,
    pub duplicate_check: bool,
    // Whether length-1 Series should be broadcast to the length of the
    // dataframe. Only used by the CSE optimizer.
    pub should_broadcast: bool,
}

impl Default for ProjectionOptions {
    /// All three options start out enabled.
    fn default() -> Self {
        ProjectionOptions {
            run_parallel: true,
            duplicate_check: true,
            should_broadcast: true,
        }
    }
}

impl ProjectionOptions {
    /// Conservatively merge the options of two [`ProjectionOptions`].
    ///
    /// `run_parallel` and `duplicate_check` stay enabled only when both sides
    /// enable them; `should_broadcast` is enabled when either side enables it.
    pub fn merge_options(&self, other: &Self) -> Self {
        let run_parallel = self.run_parallel && other.run_parallel;
        let duplicate_check = self.duplicate_check && other.duplicate_check;
        let should_broadcast = self.should_broadcast || other.should_broadcast;
        Self {
            run_parallel,
            duplicate_check,
            should_broadcast,
        }
    }
}
291}