datafusion_spark/function/url/
parse_url.rs1use std::any::Any;
19use std::sync::Arc;
20
use arrow::array::{
    Array, ArrayRef, GenericStringBuilder, LargeStringArray, StringArray, StringArrayType,
    StringViewArray,
};
24use arrow::datatypes::DataType;
25use datafusion_common::cast::{
26 as_large_string_array, as_string_array, as_string_view_array,
27};
28use datafusion_common::{exec_datafusion_err, exec_err, plan_err, Result};
29use datafusion_expr::{
30 ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, TypeSignature,
31 Volatility,
32};
33use datafusion_functions::utils::make_scalar_function;
34use url::Url;
35
36#[derive(Debug, PartialEq, Eq, Hash)]
37pub struct ParseUrl {
38 signature: Signature,
39}
40
41impl Default for ParseUrl {
42 fn default() -> Self {
43 Self::new()
44 }
45}
46
47impl ParseUrl {
48 pub fn new() -> Self {
49 Self {
50 signature: Signature::one_of(
51 vec![
52 TypeSignature::Uniform(
53 1,
54 vec![DataType::Utf8View, DataType::Utf8, DataType::LargeUtf8],
55 ),
56 TypeSignature::Uniform(
57 2,
58 vec![DataType::Utf8View, DataType::Utf8, DataType::LargeUtf8],
59 ),
60 TypeSignature::Uniform(
61 3,
62 vec![DataType::Utf8View, DataType::Utf8, DataType::LargeUtf8],
63 ),
64 ],
65 Volatility::Immutable,
66 ),
67 }
68 }
69 fn parse(value: &str, part: &str, key: Option<&str>) -> Result<Option<String>> {
97 Url::parse(value)
98 .map_err(|e| exec_datafusion_err!("{e:?}"))
99 .map(|url| match part {
100 "HOST" => url.host_str().map(String::from),
101 "PATH" => Some(url.path().to_string()),
102 "QUERY" => match key {
103 None => url.query().map(String::from),
104 Some(key) => url
105 .query_pairs()
106 .find(|(k, _)| k == key)
107 .map(|(_, v)| v.into_owned()),
108 },
109 "REF" => url.fragment().map(String::from),
110 "PROTOCOL" => Some(url.scheme().to_string()),
111 "FILE" => {
112 let path = url.path();
113 match url.query() {
114 Some(query) => Some(format!("{path}?{query}")),
115 None => Some(path.to_string()),
116 }
117 }
118 "AUTHORITY" => Some(url.authority().to_string()),
119 "USERINFO" => {
120 let username = url.username();
121 if username.is_empty() {
122 return None;
123 }
124 match url.password() {
125 Some(password) => Some(format!("{username}:{password}")),
126 None => Some(username.to_string()),
127 }
128 }
129 _ => None,
130 })
131 }
132}
133
134impl ScalarUDFImpl for ParseUrl {
135 fn as_any(&self) -> &dyn Any {
136 self
137 }
138
139 fn name(&self) -> &str {
140 "parse_url"
141 }
142
143 fn signature(&self) -> &Signature {
144 &self.signature
145 }
146
147 fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
148 if arg_types.len() < 2 || arg_types.len() > 3 {
149 return plan_err!(
150 "{} expects 2 or 3 arguments, but got {}",
151 self.name(),
152 arg_types.len()
153 );
154 }
155 match arg_types.len() {
156 2 | 3 => {
157 if arg_types
158 .iter()
159 .any(|arg| matches!(arg, DataType::LargeUtf8))
160 {
161 Ok(DataType::LargeUtf8)
162 } else if arg_types
163 .iter()
164 .any(|arg| matches!(arg, DataType::Utf8View))
165 {
166 Ok(DataType::Utf8View)
167 } else {
168 Ok(DataType::Utf8)
169 }
170 }
171 _ => plan_err!(
172 "`{}` expects 2 or 3 arguments, got {}",
173 &self.name(),
174 arg_types.len()
175 ),
176 }
177 }
178
179 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
180 let ScalarFunctionArgs { args, .. } = args;
181 make_scalar_function(spark_parse_url, vec![])(&args)
182 }
183}
184
185fn spark_parse_url(args: &[ArrayRef]) -> Result<ArrayRef> {
202 if args.len() < 2 || args.len() > 3 {
203 return exec_err!(
204 "{} expects 2 or 3 arguments, but got {}",
205 "`parse_url`",
206 args.len()
207 );
208 }
209 let url = &args[0];
211 let part = &args[1];
212
213 let result = if args.len() == 3 {
214 let key = &args[2];
215
216 match (url.data_type(), part.data_type(), key.data_type()) {
217 (DataType::Utf8, DataType::Utf8, DataType::Utf8) => {
218 process_parse_url::<_, _, _, StringArray>(
219 as_string_array(url)?,
220 as_string_array(part)?,
221 as_string_array(key)?,
222 )
223 }
224 (DataType::Utf8View, DataType::Utf8View, DataType::Utf8View) => {
225 process_parse_url::<_, _, _, StringArray>(
226 as_string_view_array(url)?,
227 as_string_view_array(part)?,
228 as_string_view_array(key)?,
229 )
230 }
231 (DataType::LargeUtf8, DataType::LargeUtf8, DataType::LargeUtf8) => {
232 process_parse_url::<_, _, _, LargeStringArray>(
233 as_large_string_array(url)?,
234 as_large_string_array(part)?,
235 as_large_string_array(key)?,
236 )
237 }
238 _ => exec_err!("{} expects STRING arguments, got {:?}", "`parse_url`", args),
239 }
240 } else {
241 let mut builder: GenericStringBuilder<i32> = GenericStringBuilder::new();
244 for _ in 0..args[0].len() {
245 builder.append_null();
246 }
247 let key = builder.finish();
248
249 match (url.data_type(), part.data_type()) {
250 (DataType::Utf8, DataType::Utf8) => {
251 process_parse_url::<_, _, _, StringArray>(
252 as_string_array(url)?,
253 as_string_array(part)?,
254 &key,
255 )
256 }
257 (DataType::Utf8View, DataType::Utf8View) => {
258 process_parse_url::<_, _, _, StringArray>(
259 as_string_view_array(url)?,
260 as_string_view_array(part)?,
261 &key,
262 )
263 }
264 (DataType::LargeUtf8, DataType::LargeUtf8) => {
265 process_parse_url::<_, _, _, LargeStringArray>(
266 as_large_string_array(url)?,
267 as_large_string_array(part)?,
268 &key,
269 )
270 }
271 _ => exec_err!("{} expects STRING arguments, got {:?}", "`parse_url`", args),
272 }
273 };
274 result
275}
276
277fn process_parse_url<'a, A, B, C, T>(
278 url_array: &'a A,
279 part_array: &'a B,
280 key_array: &'a C,
281) -> Result<ArrayRef>
282where
283 &'a A: StringArrayType<'a>,
284 &'a B: StringArrayType<'a>,
285 &'a C: StringArrayType<'a>,
286 T: Array + FromIterator<Option<String>> + 'static,
287{
288 url_array
289 .iter()
290 .zip(part_array.iter())
291 .zip(key_array.iter())
292 .map(|((url, part), key)| {
293 if let (Some(url), Some(part), key) = (url, part, key) {
294 ParseUrl::parse(url, part, key)
295 } else {
296 Ok(None)
297 }
298 })
299 .collect::<Result<T>>()
300 .map(|array| Arc::new(array) as ArrayRef)
301}