1use std::sync::Arc;
19
20use arrow::array::{
21 Array, ArrayRef, GenericStringBuilder, LargeStringArray, StringArray,
22 StringArrayType, StringViewArray,
23};
24use arrow::datatypes::DataType;
25use datafusion_common::cast::{
26 as_large_string_array, as_string_array, as_string_view_array,
27};
28use datafusion_common::{Result, exec_datafusion_err, exec_err};
29use datafusion_expr::{
30 ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, TypeSignature,
31 Volatility,
32};
33use datafusion_functions::utils::make_scalar_function;
34use url::{ParseError, Url};
35
36#[derive(Debug, PartialEq, Eq, Hash)]
37pub struct ParseUrl {
38 signature: Signature,
39}
40
41impl Default for ParseUrl {
42 fn default() -> Self {
43 Self::new()
44 }
45}
46
47impl ParseUrl {
48 pub fn new() -> Self {
49 Self {
50 signature: Signature::one_of(
51 vec![TypeSignature::String(2), TypeSignature::String(3)],
52 Volatility::Immutable,
53 ),
54 }
55 }
56 fn parse(value: &str, part: &str, key: Option<&str>) -> Result<Option<String>> {
83 let url: std::result::Result<Url, ParseError> = Url::parse(value);
84 if let Err(ParseError::RelativeUrlWithoutBase) = url {
85 return if !value.contains("://") {
86 let (without_fragment, fragment) = match value.split_once('#') {
89 Some((before, frag)) => (before, Some(frag)),
90 None => (value, None),
91 };
92 let (path, query) = match without_fragment.split_once('?') {
93 Some((p, q)) => (p, Some(q)),
94 None => (without_fragment, None),
95 };
96 Ok(match part {
97 "PATH" => Some(path.to_string()),
98 "QUERY" => match key {
99 None => query.map(String::from),
100 Some(key) => Self::query_value(query, key).map(String::from),
101 },
102 "REF" => fragment.map(String::from),
103 "FILE" => {
104 Some(without_fragment.to_string())
106 }
107 _ => None,
109 })
110 } else {
111 Err(exec_datafusion_err!(
112 "The url is invalid: {value}. Use `try_parse_url` to tolerate invalid URL and return NULL instead. SQLSTATE: 22P02"
113 ))
114 };
115 };
116 url.map_err(|e| exec_datafusion_err!("{e:?}"))
117 .map(|url| match part {
118 "HOST" => url.host_str().map(String::from),
119 "PATH" => {
120 let path = Self::path(value, &url);
121 Some(path.to_string())
122 }
123 "QUERY" => match key {
124 None => url.query().map(String::from),
125 Some(key) => Self::query_value(url.query(), key).map(String::from),
126 },
127 "REF" => url.fragment().map(String::from),
128 "PROTOCOL" => Some(url.scheme().to_string()),
129 "FILE" => {
130 let path = Self::path(value, &url);
131 match url.query() {
132 Some(query) => Some(format!("{path}?{query}")),
133 None => Some(path.to_string()),
134 }
135 }
136 "AUTHORITY" => Some(url.authority().to_string()),
137 "USERINFO" => {
138 let username = url.username();
139 if username.is_empty() {
140 return None;
141 }
142 match url.password() {
143 Some(password) => Some(format!("{username}:{password}")),
144 None => Some(username.to_string()),
145 }
146 }
147 _ => None,
148 })
149 }
150
151 fn path<'a>(value: &str, url: &'a Url) -> &'a str {
152 let path = url.path();
153 if path == "/" && Self::absolute_url_has_empty_path(value) {
154 ""
155 } else {
156 path
157 }
158 }
159
160 fn absolute_url_has_empty_path(value: &str) -> bool {
161 let Some(authority_start) = value.find("://").map(|index| index + 3) else {
162 return false;
163 };
164 let after_authority = &value[authority_start..];
165 match after_authority.find(['/', '?', '#']) {
166 None => true,
167 Some(index) => matches!(after_authority.as_bytes()[index], b'?' | b'#'),
168 }
169 }
170
171 fn query_value<'a>(query: Option<&'a str>, key: &str) -> Option<&'a str> {
172 query.and_then(|query| {
173 query
174 .split('&')
175 .filter_map(|pair| pair.split_once('='))
176 .find(|(query_key, _)| *query_key == key)
177 .map(|(_, value)| value)
178 })
179 }
180}
181
182impl ScalarUDFImpl for ParseUrl {
183 fn name(&self) -> &str {
184 "parse_url"
185 }
186
187 fn signature(&self) -> &Signature {
188 &self.signature
189 }
190
191 fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
192 Ok(arg_types[0].clone())
193 }
194
195 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
196 let ScalarFunctionArgs { args, .. } = args;
197 make_scalar_function(spark_parse_url, vec![])(&args)
198 }
199}
200
201fn spark_parse_url(args: &[ArrayRef]) -> Result<ArrayRef> {
217 spark_handled_parse_url(args, |x| x)
218}
219
220pub fn spark_handled_parse_url(
221 args: &[ArrayRef],
222 handler_err: impl Fn(Result<Option<String>>) -> Result<Option<String>>,
223) -> Result<ArrayRef> {
224 if args.len() < 2 || args.len() > 3 {
225 return exec_err!(
226 "{} expects 2 or 3 arguments, but got {}",
227 "`parse_url`",
228 args.len()
229 );
230 }
231 let url = &args[0];
233 let part = &args[1];
234
235 if args.len() == 3 {
236 let key = &args[2];
238
239 match (url.data_type(), part.data_type(), key.data_type()) {
240 (DataType::Utf8, DataType::Utf8, DataType::Utf8) => {
241 process_parse_url::<_, _, _, StringArray>(
242 as_string_array(url)?,
243 as_string_array(part)?,
244 as_string_array(key)?,
245 handler_err,
246 true,
247 )
248 }
249 (DataType::Utf8View, DataType::Utf8View, DataType::Utf8View) => {
250 process_parse_url::<_, _, _, StringViewArray>(
251 as_string_view_array(url)?,
252 as_string_view_array(part)?,
253 as_string_view_array(key)?,
254 handler_err,
255 true,
256 )
257 }
258 (DataType::LargeUtf8, DataType::LargeUtf8, DataType::LargeUtf8) => {
259 process_parse_url::<_, _, _, LargeStringArray>(
260 as_large_string_array(url)?,
261 as_large_string_array(part)?,
262 as_large_string_array(key)?,
263 handler_err,
264 true,
265 )
266 }
267 _ => exec_err!(
268 "`parse_url` expects STRING arguments, got ({}, {}, {})",
269 url.data_type(),
270 part.data_type(),
271 key.data_type()
272 ),
273 }
274 } else {
275 let mut builder: GenericStringBuilder<i32> = GenericStringBuilder::new();
278 for _ in 0..args[0].len() {
279 builder.append_null();
280 }
281 let key = builder.finish();
282
283 match (url.data_type(), part.data_type()) {
284 (DataType::Utf8, DataType::Utf8) => {
285 process_parse_url::<_, _, _, StringArray>(
286 as_string_array(url)?,
287 as_string_array(part)?,
288 &key,
289 handler_err,
290 false,
291 )
292 }
293 (DataType::Utf8View, DataType::Utf8View) => {
294 process_parse_url::<_, _, _, StringViewArray>(
295 as_string_view_array(url)?,
296 as_string_view_array(part)?,
297 &key,
298 handler_err,
299 false,
300 )
301 }
302 (DataType::LargeUtf8, DataType::LargeUtf8) => {
303 process_parse_url::<_, _, _, LargeStringArray>(
304 as_large_string_array(url)?,
305 as_large_string_array(part)?,
306 &key,
307 handler_err,
308 false,
309 )
310 }
311 _ => exec_err!(
312 "`parse_url` expects STRING arguments, got ({}, {})",
313 url.data_type(),
314 part.data_type()
315 ),
316 }
317 }
318}
319
320fn process_parse_url<'a, A, B, C, T>(
321 url_array: &'a A,
322 part_array: &'a B,
323 key_array: &'a C,
324 handle: impl Fn(Result<Option<String>>) -> Result<Option<String>>,
325 has_key_arg: bool,
326) -> Result<ArrayRef>
327where
328 &'a A: StringArrayType<'a>,
329 &'a B: StringArrayType<'a>,
330 &'a C: StringArrayType<'a>,
331 T: Array + FromIterator<Option<String>> + 'static,
332{
333 url_array
334 .iter()
335 .zip(part_array.iter())
336 .zip(key_array.iter())
337 .map(|((url, part), key)| {
338 if has_key_arg && key.is_none() {
340 return Ok(None);
341 }
342 if let (Some(url), Some(part)) = (url, part) {
343 handle(ParseUrl::parse(url, part, key))
344 } else {
345 Ok(None)
346 }
347 })
348 .collect::<Result<T>>()
349 .map(|array| Arc::new(array) as ArrayRef)
350}
351
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::array::Int32Array;
    use std::array::from_ref;

    /// One `ParseUrl::parse` expectation: `(url, part, key, expected)`.
    type Case<'a> = (&'a str, &'a str, Option<&'a str>, Option<&'a str>);

    /// Runs every case through `ParseUrl::parse`, in order, and compares the
    /// outcome with the expected value.
    fn check(cases: &[Case<'_>]) -> Result<()> {
        for &(url, part, key, expected) in cases {
            assert_eq!(
                ParseUrl::parse(url, part, key)?,
                expected.map(String::from),
                "url={url:?} part={part:?} key={key:?}"
            );
        }
        Ok(())
    }

    /// Builds a `Utf8` array from optional string slices.
    fn sa(vals: &[Option<&str>]) -> ArrayRef {
        let array: StringArray = vals.iter().copied().collect();
        Arc::new(array) as ArrayRef
    }

    /// Views a kernel result as a `StringArray`, panicking on any other type.
    fn utf8(out: &ArrayRef) -> &StringArray {
        out.as_any().downcast_ref::<StringArray>().unwrap()
    }

    #[test]
    fn test_parse_host() -> Result<()> {
        check(&[(
            "https://example.com/a?x=1",
            "HOST",
            None,
            Some("example.com"),
        )])
    }

    #[test]
    fn test_parse_query_no_key_vs_with_key() -> Result<()> {
        let url = "https://ex.com/p?a=1&b=2";
        check(&[
            (url, "QUERY", None, Some("a=1&b=2")),
            (url, "QUERY", Some("a"), Some("1")),
            (url, "QUERY", Some("c"), None),
        ])
    }

    #[test]
    fn test_parse_ref_protocol_userinfo_file_authority() -> Result<()> {
        let url = "ftp://user:pwd@ftp.example.com:21/files?x=1#frag";
        check(&[
            (url, "REF", None, Some("frag")),
            (url, "PROTOCOL", None, Some("ftp")),
            (url, "USERINFO", None, Some("user:pwd")),
            (url, "FILE", None, Some("/files?x=1")),
            (url, "AUTHORITY", None, Some("user:pwd@ftp.example.com")),
        ])
    }

    #[test]
    fn test_parse_path_empty_vs_root() -> Result<()> {
        check(&[
            ("https://example.com", "PATH", None, Some("")),
            ("https://example.com/", "PATH", None, Some("/")),
            (
                "https://ex.com/dir%20/pa%20th.HTML",
                "PATH",
                None,
                Some("/dir%20/pa%20th.HTML"),
            ),
        ])
    }

    #[test]
    fn test_parse_query_key_is_raw() -> Result<()> {
        let url = "https://use%20r:pas%20s@example.com/dir%20/pa%20th.HTML?query=x%20y&q2=2#Ref%20two";
        check(&[
            (url, "QUERY", None, Some("query=x%20y&q2=2")),
            (url, "QUERY", Some("query"), Some("x%20y")),
            ("http://ex.com?key=", "QUERY", Some("key"), Some("")),
            ("http://ex.com?keyonly", "QUERY", Some("keyonly"), None),
            ("http://ex.com?a=1&a=2", "QUERY", Some("a"), Some("1")),
            ("http://ex.com?a%20b=1", "QUERY", Some("a b"), None),
        ])
    }

    #[test]
    fn test_parse_empty_path_file() -> Result<()> {
        check(&[
            ("", "PATH", None, Some("")),
            ("http://example.com", "FILE", None, Some("")),
            ("http://example.com?foo=bar", "FILE", None, Some("?foo=bar")),
            ("http://example.com#fragment", "FILE", None, Some("")),
            (
                "http://example.com/?foo=bar",
                "FILE",
                None,
                Some("/?foo=bar"),
            ),
            ("http://ex.com/?", "FILE", None, Some("/?")),
            ("http://ex.com?", "FILE", None, Some("?")),
        ])
    }

    #[test]
    fn test_parse_schemeless_url() -> Result<()> {
        // Bare word: only PATH and FILE carry a value.
        check(&[
            ("notaurl", "PATH", None, Some("notaurl")),
            ("notaurl", "FILE", None, Some("notaurl")),
            ("notaurl", "HOST", None, None),
            ("notaurl", "PROTOCOL", None, None),
            ("notaurl", "QUERY", None, None),
            ("notaurl", "REF", None, None),
            ("notaurl", "AUTHORITY", None, None),
            ("notaurl", "USERINFO", None, None),
        ])?;

        // Bare word followed by a query string.
        let with_query = "notaurl?key=value";
        check(&[
            (with_query, "PATH", None, Some("notaurl")),
            (with_query, "FILE", None, Some("notaurl?key=value")),
            (with_query, "QUERY", None, Some("key=value")),
            (with_query, "QUERY", Some("key"), Some("value")),
            (with_query, "QUERY", Some("missing"), None),
            (with_query, "HOST", None, None),
            (with_query, "PROTOCOL", None, None),
        ])?;

        // Bare word followed by a fragment.
        let with_fragment = "notaurl#reference";
        check(&[
            (with_fragment, "REF", None, Some("reference")),
            (with_fragment, "PATH", None, Some("notaurl")),
            (with_fragment, "FILE", None, Some("notaurl")),
        ])?;

        // Bare word followed by both a query string and a fragment.
        let with_both = "notaurl?a=1&b=2#frag";
        check(&[
            (with_both, "PATH", None, Some("notaurl")),
            (with_both, "QUERY", None, Some("a=1&b=2")),
            (with_both, "QUERY", Some("b"), Some("2")),
            (with_both, "REF", None, Some("frag")),
            (with_both, "FILE", None, Some("notaurl?a=1&b=2")),
        ])
    }

    #[test]
    fn test_spark_utf8_two_args() -> Result<()> {
        let urls = sa(&[Some("https://example.com/a?x=1"), Some("https://ex.com/")]);
        let parts = sa(&[Some("HOST"), Some("PATH")]);

        let out = spark_handled_parse_url(&[urls, parts], |x| x)?;
        let strings = utf8(&out);

        assert_eq!(strings.len(), 2);
        assert_eq!(strings.value(0), "example.com");
        assert_eq!(strings.value(1), "/");
        Ok(())
    }

    #[test]
    fn test_spark_utf8_three_args_query_key() -> Result<()> {
        let urls = sa(&[
            Some("https://example.com/a?x=1&y=2"),
            Some("https://ex.com/?a=1"),
        ]);
        let parts = sa(&[Some("QUERY"), Some("QUERY")]);
        let keys = sa(&[Some("y"), Some("b")]);

        let out = spark_handled_parse_url(&[urls, parts, keys], |x| x)?;
        let strings = utf8(&out);

        assert_eq!(strings.len(), 2);
        assert_eq!(strings.value(0), "2");
        assert!(strings.is_null(1));
        Ok(())
    }

    #[test]
    fn test_spark_userinfo_and_nulls() -> Result<()> {
        let urls = sa(&[
            Some("ftp://user:pwd@ftp.example.com:21/files"),
            Some("https://example.com"),
            None,
        ]);
        let parts = sa(&[Some("USERINFO"), Some("USERINFO"), Some("USERINFO")]);

        let out = spark_handled_parse_url(&[urls, parts], |x| x)?;
        let strings = utf8(&out);

        assert_eq!(strings.len(), 3);
        assert_eq!(strings.value(0), "user:pwd");
        assert!(strings.is_null(1));
        assert!(strings.is_null(2));
        Ok(())
    }

    #[test]
    fn test_invalid_arg_count() {
        let urls = sa(&[Some("https://example.com")]);
        let too_few = spark_handled_parse_url(from_ref(&urls), |x| x).unwrap_err();
        assert!(too_few.to_string().contains("expects 2 or 3 arguments"));

        let parts = sa(&[Some("HOST")]);
        let keys = sa(&[Some("x")]);
        let extra = sa(&[Some("extra")]);
        let too_many =
            spark_handled_parse_url(&[urls, parts, keys, extra], |x| x).unwrap_err();
        assert!(too_many.to_string().contains("expects 2 or 3 arguments"));
    }

    #[test]
    fn test_non_string_types_error() {
        let urls = sa(&[Some("https://example.com")]);
        let bad_part = Arc::new(Int32Array::from(vec![1])) as ArrayRef;

        let err = spark_handled_parse_url(&[urls, bad_part], |x| x).unwrap_err();
        assert!(err.to_string().contains("expects STRING arguments"));
    }
}