datafusion_spark/lib.rs
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#![doc(
    html_logo_url = "https://raw.githubusercontent.com/apache/datafusion/19fe44cf2f30cbdd63d4a4f52c74055163c6cc38/docs/logos/standalone_logo/logo_original.svg",
    html_favicon_url = "https://raw.githubusercontent.com/apache/datafusion/19fe44cf2f30cbdd63d4a4f52c74055163c6cc38/docs/logos/standalone_logo/logo_original.svg"
)]
#![cfg_attr(docsrs, feature(doc_cfg))]
// Make cheap clones clear: https://github.com/apache/datafusion/issues/11143
#![deny(clippy::clone_on_ref_ptr)]
#![cfg_attr(test, allow(clippy::needless_pass_by_value))]

//! Spark Expression packages for [DataFusion].
//!
//! This crate contains a collection of Spark-compatible function packages for
//! DataFusion, implemented using its extension APIs.
//!
//! [DataFusion]: https://crates.io/crates/datafusion
//!
//! # Available Function Packages
//! See the list of [modules](#modules) in this crate for available packages.
//!
//! # Example: using all function packages
//!
//! You can register all the functions in all packages using the [`register_all`]
//! function as shown below. Any existing functions will be overwritten, with these
//! Spark functions taking priority.
//!
//! ```
//! # use datafusion_execution::FunctionRegistry;
//! # use datafusion_expr::{ScalarUDF, AggregateUDF, WindowUDF};
//! # use datafusion_expr::planner::ExprPlanner;
//! # use datafusion_common::Result;
//! # use std::collections::HashSet;
//! # use std::sync::Arc;
//! # // Note: we can't use a real SessionContext here because the
//! # // `datafusion_spark` crate does not depend on the `datafusion` crate,
//! # // so we use a dummy SessionContext that implements just enough of the API.
//! # struct SessionContext {}
//! # impl FunctionRegistry for SessionContext {
//! #     fn register_udf(&mut self, _udf: Arc<ScalarUDF>) -> Result<Option<Arc<ScalarUDF>>> { Ok(None) }
//! #     fn udfs(&self) -> HashSet<String> { unimplemented!() }
//! #     fn udafs(&self) -> HashSet<String> { unimplemented!() }
//! #     fn udwfs(&self) -> HashSet<String> { unimplemented!() }
//! #     fn udf(&self, _name: &str) -> Result<Arc<ScalarUDF>> { unimplemented!() }
//! #     fn udaf(&self, _name: &str) -> Result<Arc<AggregateUDF>> { unimplemented!() }
//! #     fn udwf(&self, _name: &str) -> Result<Arc<WindowUDF>> { unimplemented!() }
//! #     fn expr_planners(&self) -> Vec<Arc<dyn ExprPlanner>> { unimplemented!() }
//! # }
//! # impl SessionContext {
//! #     fn new() -> Self { SessionContext {} }
//! #     async fn sql(&mut self, _query: &str) -> Result<()> { Ok(()) }
//! # }
//! #
//! # async fn stub() -> Result<()> {
//! // Create a new session context
//! let mut ctx = SessionContext::new();
//! // Register all Spark functions with the context
//! datafusion_spark::register_all(&mut ctx)?;
//! // Run a query using the `sha2` function which is now available and has Spark semantics
//! let df = ctx.sql("SELECT sha2('The input String', 256)").await?;
//! # Ok(())
//! # }
//! ```
//!
//! # Example: calling a specific function in Rust
//!
//! Each package also exports an `expr_fn` submodule with functions that create
//! [`Expr`]s for invoking functions from Rust using a fluent style. For example,
//! to invoke the `sha2` function, you can use the following code:
//!
//! ```rust
//! # use datafusion_expr::{col, lit};
//! use datafusion_spark::expr_fn::sha2;
//! // Create the expression `sha2(my_data, 256)`
//! let expr = sha2(col("my_data"), lit(256));
//! ```
//!
//! # Example: using the Spark expression planner
//!
//! The [`planner::SparkFunctionPlanner`] provides Spark-compatible expression
//! planning, such as mapping SQL `EXTRACT` expressions to Spark's `date_part`
//! function. To use it, register it with your session context:
//!
//! ```ignore
//! use std::sync::Arc;
//! use datafusion::prelude::SessionContext;
//! use datafusion_spark::planner::SparkFunctionPlanner;
//!
//! let mut ctx = SessionContext::new();
//! // Register the Spark expression planner
//! ctx.register_expr_planner(Arc::new(SparkFunctionPlanner))?;
//! // Now EXTRACT expressions will use Spark semantics
//! let df = ctx.sql("SELECT EXTRACT(YEAR FROM timestamp_col) FROM my_table").await?;
//! ```
//!
//! [`Expr`]: datafusion_expr::Expr
//!
//! # Example: enabling Apache Spark features with SessionStateBuilder
//!
//! The recommended way to enable Apache Spark compatibility is to use the
//! `SessionStateBuilderSpark` extension trait. This registers all Apache Spark
//! functions (scalar, aggregate, window, and table) as well as the Apache Spark
//! expression planner.
//!
//! Enable the `core` feature in your `Cargo.toml`:
//! ```toml
//! datafusion-spark = { version = "X", features = ["core"] }
//! ```
//!
//! Then use the extension trait; see [`SessionStateBuilderSpark::with_spark_features`]
//! for an example.
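//!
//! A minimal sketch of the typical flow follows (not compiled here because it
//! needs the `datafusion` crate, which `datafusion_spark` does not depend on;
//! the exact builder methods are those documented on `SessionStateBuilderSpark`):
//!
//! ```ignore
//! use datafusion::execution::SessionStateBuilder;
//! use datafusion::prelude::SessionContext;
//! use datafusion_spark::SessionStateBuilderSpark;
//!
//! // Build a session state with DataFusion's defaults plus all Spark functions
//! // and the Spark expression planner, then create a context from it
//! let state = SessionStateBuilder::new()
//!     .with_default_features()
//!     .with_spark_features()
//!     .build();
//! let ctx = SessionContext::new_with_state(state);
//! ```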

pub mod function;
pub mod planner;

#[cfg(feature = "core")]
mod session_state;

#[cfg(feature = "core")]
pub use session_state::SessionStateBuilderSpark;

use datafusion_catalog::TableFunction;
use datafusion_common::Result;
use datafusion_execution::FunctionRegistry;
use datafusion_expr::{AggregateUDF, ScalarUDF, WindowUDF};
use log::debug;
use std::sync::Arc;

/// Fluent-style API for creating `Expr`s
#[expect(unused_imports)]
pub mod expr_fn {
    pub use super::function::aggregate::expr_fn::*;
    pub use super::function::array::expr_fn::*;
    pub use super::function::bitmap::expr_fn::*;
    pub use super::function::bitwise::expr_fn::*;
    pub use super::function::collection::expr_fn::*;
    pub use super::function::conditional::expr_fn::*;
    pub use super::function::conversion::expr_fn::*;
    pub use super::function::csv::expr_fn::*;
    pub use super::function::datetime::expr_fn::*;
    pub use super::function::generator::expr_fn::*;
    pub use super::function::hash::expr_fn::*;
    pub use super::function::json::expr_fn::*;
    pub use super::function::lambda::expr_fn::*;
    pub use super::function::map::expr_fn::*;
    pub use super::function::math::expr_fn::*;
    pub use super::function::misc::expr_fn::*;
    pub use super::function::predicate::expr_fn::*;
    pub use super::function::string::expr_fn::*;
    pub use super::function::r#struct::expr_fn::*;
    pub use super::function::table::expr_fn::*;
    pub use super::function::url::expr_fn::*;
    pub use super::function::window::expr_fn::*;
    pub use super::function::xml::expr_fn::*;
}

/// Returns all default scalar functions
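///
/// For example, to list the names of all Spark scalar functions provided by
/// this crate:
///
/// ```
/// // Print the name of every Spark scalar function exposed by this crate
/// for udf in datafusion_spark::all_default_scalar_functions() {
///     println!("{}", udf.name());
/// }
/// ```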
pub fn all_default_scalar_functions() -> Vec<Arc<ScalarUDF>> {
    function::array::functions()
        .into_iter()
        .chain(function::bitmap::functions())
        .chain(function::bitwise::functions())
        .chain(function::collection::functions())
        .chain(function::conditional::functions())
        .chain(function::conversion::functions())
        .chain(function::csv::functions())
        .chain(function::datetime::functions())
        .chain(function::generator::functions())
        .chain(function::hash::functions())
        .chain(function::json::functions())
        .chain(function::lambda::functions())
        .chain(function::map::functions())
        .chain(function::math::functions())
        .chain(function::misc::functions())
        .chain(function::predicate::functions())
        .chain(function::string::functions())
        .chain(function::r#struct::functions())
        .chain(function::url::functions())
        .chain(function::xml::functions())
        .collect::<Vec<_>>()
}

/// Returns all default aggregate functions
pub fn all_default_aggregate_functions() -> Vec<Arc<AggregateUDF>> {
    function::aggregate::functions()
}

/// Returns all default window functions
pub fn all_default_window_functions() -> Vec<Arc<WindowUDF>> {
    function::window::functions()
}

/// Returns all default table functions
pub fn all_default_table_functions() -> Vec<Arc<TableFunction>> {
    function::table::functions()
}

/// Registers all enabled scalar, aggregate, and window function packages with a
/// [`FunctionRegistry`], overriding any existing functions if there is a name clash.
pub fn register_all(registry: &mut dyn FunctionRegistry) -> Result<()> {
    let scalar_functions: Vec<Arc<ScalarUDF>> = all_default_scalar_functions();
    scalar_functions.into_iter().try_for_each(|udf| {
        let existing_udf = registry.register_udf(udf)?;
        if let Some(existing_udf) = existing_udf {
            debug!("Overwrite existing UDF: {}", existing_udf.name());
        }
        Ok(()) as Result<()>
    })?;

    let aggregate_functions: Vec<Arc<AggregateUDF>> = all_default_aggregate_functions();
    aggregate_functions.into_iter().try_for_each(|udf| {
        let existing_udaf = registry.register_udaf(udf)?;
        if let Some(existing_udaf) = existing_udaf {
            debug!("Overwrite existing UDAF: {}", existing_udaf.name());
        }
        Ok(()) as Result<()>
    })?;

    let window_functions: Vec<Arc<WindowUDF>> = all_default_window_functions();
    window_functions.into_iter().try_for_each(|udf| {
        let existing_udwf = registry.register_udwf(udf)?;
        if let Some(existing_udwf) = existing_udwf {
            debug!("Overwrite existing UDWF: {}", existing_udwf.name());
        }
        Ok(()) as Result<()>
    })?;

    Ok(())
}