Skip to main content

datafusion_spark/function/url/
parse_url.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
17
18use std::sync::Arc;
19
20use arrow::array::{
21    Array, ArrayRef, GenericStringBuilder, LargeStringArray, StringArray,
22    StringArrayType, StringViewArray,
23};
24use arrow::datatypes::DataType;
25use datafusion_common::cast::{
26    as_large_string_array, as_string_array, as_string_view_array,
27};
28use datafusion_common::{Result, exec_datafusion_err, exec_err};
29use datafusion_expr::{
30    ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, TypeSignature,
31    Volatility,
32};
33use datafusion_functions::utils::make_scalar_function;
34use url::{ParseError, Url};
35
36#[derive(Debug, PartialEq, Eq, Hash)]
37pub struct ParseUrl {
38    signature: Signature,
39}
40
41impl Default for ParseUrl {
42    fn default() -> Self {
43        Self::new()
44    }
45}
46
47impl ParseUrl {
48    pub fn new() -> Self {
49        Self {
50            signature: Signature::one_of(
51                vec![TypeSignature::String(2), TypeSignature::String(3)],
52                Volatility::Immutable,
53            ),
54        }
55    }
56    /// Parses a URL and extracts the specified component.
57    ///
58    /// This function takes a URL string and extracts different parts of it based on the
59    /// `part` parameter. For query parameters, an optional `key` can be specified to
60    /// extract a specific query parameter value.
61    ///
62    /// # Arguments
63    ///
64    /// * `value` - The URL string to parse
65    /// * `part` - The component of the URL to extract. Valid values are:
66    ///   - `"HOST"` - The hostname (e.g., "example.com")
67    ///   - `"PATH"` - The path portion (e.g., "/path/to/resource")
68    ///   - `"QUERY"` - The query string or a specific query parameter
69    ///   - `"REF"` - The fragment/anchor (the part after #)
70    ///   - `"PROTOCOL"` - The URL scheme (e.g., "https", "http")
71    ///   - `"FILE"` - The path with query string (e.g., "/path?query=value")
72    ///   - `"AUTHORITY"` - The authority component (host:port)
73    ///   - `"USERINFO"` - The user information (username:password)
74    /// * `key` - Optional parameter used only with `"QUERY"`. When provided, extracts
75    ///   the value of the specific query parameter with this key name.
76    ///
77    /// # Returns
78    ///
79    /// * `Ok(Some(String))` - The extracted URL component as a string
80    /// * `Ok(None)` - If the requested component doesn't exist
81    /// * `Err(DataFusionError)` - If the URL is malformed and cannot be parsed
82    fn parse(value: &str, part: &str, key: Option<&str>) -> Result<Option<String>> {
83        let url: std::result::Result<Url, ParseError> = Url::parse(value);
84        if let Err(ParseError::RelativeUrlWithoutBase) = url {
85            return if !value.contains("://") {
86                // Schemeless URLs are treated as relative URIs (like java.net.URI).
87                // Manually parse path, query, and fragment components.
88                let (without_fragment, fragment) = match value.split_once('#') {
89                    Some((before, frag)) => (before, Some(frag)),
90                    None => (value, None),
91                };
92                let (path, query) = match without_fragment.split_once('?') {
93                    Some((p, q)) => (p, Some(q)),
94                    None => (without_fragment, None),
95                };
96                Ok(match part {
97                    "PATH" => Some(path.to_string()),
98                    "QUERY" => match key {
99                        None => query.map(String::from),
100                        Some(key) => Self::query_value(query, key).map(String::from),
101                    },
102                    "REF" => fragment.map(String::from),
103                    "FILE" => {
104                        // FILE = path + query (without fragment)
105                        Some(without_fragment.to_string())
106                    }
107                    // HOST, PROTOCOL, AUTHORITY, USERINFO → NULL
108                    _ => None,
109                })
110            } else {
111                Err(exec_datafusion_err!(
112                    "The url is invalid: {value}. Use `try_parse_url` to tolerate invalid URL and return NULL instead. SQLSTATE: 22P02"
113                ))
114            };
115        };
116        url.map_err(|e| exec_datafusion_err!("{e:?}"))
117            .map(|url| match part {
118                "HOST" => url.host_str().map(String::from),
119                "PATH" => {
120                    let path = Self::path(value, &url);
121                    Some(path.to_string())
122                }
123                "QUERY" => match key {
124                    None => url.query().map(String::from),
125                    Some(key) => Self::query_value(url.query(), key).map(String::from),
126                },
127                "REF" => url.fragment().map(String::from),
128                "PROTOCOL" => Some(url.scheme().to_string()),
129                "FILE" => {
130                    let path = Self::path(value, &url);
131                    match url.query() {
132                        Some(query) => Some(format!("{path}?{query}")),
133                        None => Some(path.to_string()),
134                    }
135                }
136                "AUTHORITY" => Some(url.authority().to_string()),
137                "USERINFO" => {
138                    let username = url.username();
139                    if username.is_empty() {
140                        return None;
141                    }
142                    match url.password() {
143                        Some(password) => Some(format!("{username}:{password}")),
144                        None => Some(username.to_string()),
145                    }
146                }
147                _ => None,
148            })
149    }
150
151    fn path<'a>(value: &str, url: &'a Url) -> &'a str {
152        let path = url.path();
153        if path == "/" && Self::absolute_url_has_empty_path(value) {
154            ""
155        } else {
156            path
157        }
158    }
159
160    fn absolute_url_has_empty_path(value: &str) -> bool {
161        let Some(authority_start) = value.find("://").map(|index| index + 3) else {
162            return false;
163        };
164        let after_authority = &value[authority_start..];
165        match after_authority.find(['/', '?', '#']) {
166            None => true,
167            Some(index) => matches!(after_authority.as_bytes()[index], b'?' | b'#'),
168        }
169    }
170
171    fn query_value<'a>(query: Option<&'a str>, key: &str) -> Option<&'a str> {
172        query.and_then(|query| {
173            query
174                .split('&')
175                .filter_map(|pair| pair.split_once('='))
176                .find(|(query_key, _)| *query_key == key)
177                .map(|(_, value)| value)
178        })
179    }
180}
181
182impl ScalarUDFImpl for ParseUrl {
183    fn name(&self) -> &str {
184        "parse_url"
185    }
186
187    fn signature(&self) -> &Signature {
188        &self.signature
189    }
190
191    fn return_type(&self, arg_types: &[DataType]) -> Result<DataType> {
192        Ok(arg_types[0].clone())
193    }
194
195    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
196        let ScalarFunctionArgs { args, .. } = args;
197        make_scalar_function(spark_parse_url, vec![])(&args)
198    }
199}
200
201/// Core implementation of URL parsing function.
202///
203/// # Arguments
204///
205/// * `args` - A slice of ArrayRef containing the input arrays:
206///   - `args[0]` - URL array: The URLs to parse
207///   - `args[1]` - Part array: The URL components to extract (HOST, PATH, QUERY, etc.)
208///   - `args[2]` - Key array (optional): For QUERY part, the specific parameter names to extract
209///
210/// # Return Value
211///
212/// Returns `Result<ArrayRef>` containing:
213/// - A string array with extracted URL components
214/// - `None` values where extraction failed or component doesn't exist
215/// - The output array type (StringArray or LargeStringArray) is determined by input types
216fn spark_parse_url(args: &[ArrayRef]) -> Result<ArrayRef> {
217    spark_handled_parse_url(args, |x| x)
218}
219
220pub fn spark_handled_parse_url(
221    args: &[ArrayRef],
222    handler_err: impl Fn(Result<Option<String>>) -> Result<Option<String>>,
223) -> Result<ArrayRef> {
224    if args.len() < 2 || args.len() > 3 {
225        return exec_err!(
226            "{} expects 2 or 3 arguments, but got {}",
227            "`parse_url`",
228            args.len()
229        );
230    }
231    // Required arguments
232    let url = &args[0];
233    let part = &args[1];
234
235    if args.len() == 3 {
236        // In this case, the 'key' argument is passed
237        let key = &args[2];
238
239        match (url.data_type(), part.data_type(), key.data_type()) {
240            (DataType::Utf8, DataType::Utf8, DataType::Utf8) => {
241                process_parse_url::<_, _, _, StringArray>(
242                    as_string_array(url)?,
243                    as_string_array(part)?,
244                    as_string_array(key)?,
245                    handler_err,
246                    true,
247                )
248            }
249            (DataType::Utf8View, DataType::Utf8View, DataType::Utf8View) => {
250                process_parse_url::<_, _, _, StringViewArray>(
251                    as_string_view_array(url)?,
252                    as_string_view_array(part)?,
253                    as_string_view_array(key)?,
254                    handler_err,
255                    true,
256                )
257            }
258            (DataType::LargeUtf8, DataType::LargeUtf8, DataType::LargeUtf8) => {
259                process_parse_url::<_, _, _, LargeStringArray>(
260                    as_large_string_array(url)?,
261                    as_large_string_array(part)?,
262                    as_large_string_array(key)?,
263                    handler_err,
264                    true,
265                )
266            }
267            _ => exec_err!(
268                "`parse_url` expects STRING arguments, got ({}, {}, {})",
269                url.data_type(),
270                part.data_type(),
271                key.data_type()
272            ),
273        }
274    } else {
275        // The 'key' argument is omitted, assume all values are null
276        // Create 'null' string array for 'key' argument
277        let mut builder: GenericStringBuilder<i32> = GenericStringBuilder::new();
278        for _ in 0..args[0].len() {
279            builder.append_null();
280        }
281        let key = builder.finish();
282
283        match (url.data_type(), part.data_type()) {
284            (DataType::Utf8, DataType::Utf8) => {
285                process_parse_url::<_, _, _, StringArray>(
286                    as_string_array(url)?,
287                    as_string_array(part)?,
288                    &key,
289                    handler_err,
290                    false,
291                )
292            }
293            (DataType::Utf8View, DataType::Utf8View) => {
294                process_parse_url::<_, _, _, StringViewArray>(
295                    as_string_view_array(url)?,
296                    as_string_view_array(part)?,
297                    &key,
298                    handler_err,
299                    false,
300                )
301            }
302            (DataType::LargeUtf8, DataType::LargeUtf8) => {
303                process_parse_url::<_, _, _, LargeStringArray>(
304                    as_large_string_array(url)?,
305                    as_large_string_array(part)?,
306                    &key,
307                    handler_err,
308                    false,
309                )
310            }
311            _ => exec_err!(
312                "`parse_url` expects STRING arguments, got ({}, {})",
313                url.data_type(),
314                part.data_type()
315            ),
316        }
317    }
318}
319
320fn process_parse_url<'a, A, B, C, T>(
321    url_array: &'a A,
322    part_array: &'a B,
323    key_array: &'a C,
324    handle: impl Fn(Result<Option<String>>) -> Result<Option<String>>,
325    has_key_arg: bool,
326) -> Result<ArrayRef>
327where
328    &'a A: StringArrayType<'a>,
329    &'a B: StringArrayType<'a>,
330    &'a C: StringArrayType<'a>,
331    T: Array + FromIterator<Option<String>> + 'static,
332{
333    url_array
334        .iter()
335        .zip(part_array.iter())
336        .zip(key_array.iter())
337        .map(|((url, part), key)| {
338            // Spark returns NULL when the third argument is explicitly NULL
339            if has_key_arg && key.is_none() {
340                return Ok(None);
341            }
342            if let (Some(url), Some(part)) = (url, part) {
343                handle(ParseUrl::parse(url, part, key))
344            } else {
345                Ok(None)
346            }
347        })
348        .collect::<Result<T>>()
349        .map(|array| Arc::new(array) as ArrayRef)
350}
351
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::array::Int32Array;
    use std::array::from_ref;

    /// Builds a `Utf8` array from optional string slices.
    fn sa(vals: &[Option<&str>]) -> ArrayRef {
        let strings = StringArray::from(vals.to_vec());
        Arc::new(strings) as ArrayRef
    }

    /// Views a kernel result as a `StringArray`.
    fn utf8(array: &ArrayRef) -> &StringArray {
        array.as_any().downcast_ref::<StringArray>().unwrap()
    }

    /// Checks every `(url, part, key, expected)` row against `ParseUrl::parse`.
    fn check(cases: &[(&str, &str, Option<&str>, Option<&str>)]) -> Result<()> {
        for &(url, part, key, expected) in cases {
            assert_eq!(
                ParseUrl::parse(url, part, key)?,
                expected.map(String::from),
                "parse_url({url:?}, {part:?}, {key:?})"
            );
        }
        Ok(())
    }

    #[test]
    fn test_parse_host() -> Result<()> {
        check(&[(
            "https://example.com/a?x=1",
            "HOST",
            None,
            Some("example.com"),
        )])
    }

    #[test]
    fn test_parse_query_no_key_vs_with_key() -> Result<()> {
        let url = "https://ex.com/p?a=1&b=2";
        check(&[
            (url, "QUERY", None, Some("a=1&b=2")),
            (url, "QUERY", Some("a"), Some("1")),
            (url, "QUERY", Some("c"), None),
        ])
    }

    #[test]
    fn test_parse_ref_protocol_userinfo_file_authority() -> Result<()> {
        let url = "ftp://user:pwd@ftp.example.com:21/files?x=1#frag";
        check(&[
            (url, "REF", None, Some("frag")),
            (url, "PROTOCOL", None, Some("ftp")),
            (url, "USERINFO", None, Some("user:pwd")),
            (url, "FILE", None, Some("/files?x=1")),
            (url, "AUTHORITY", None, Some("user:pwd@ftp.example.com")),
        ])
    }

    #[test]
    fn test_parse_path_empty_vs_root() -> Result<()> {
        check(&[
            ("https://example.com", "PATH", None, Some("")),
            ("https://example.com/", "PATH", None, Some("/")),
            (
                "https://ex.com/dir%20/pa%20th.HTML",
                "PATH",
                None,
                Some("/dir%20/pa%20th.HTML"),
            ),
        ])
    }

    #[test]
    fn test_parse_query_key_is_raw() -> Result<()> {
        let url = "https://use%20r:pas%20s@example.com/dir%20/pa%20th.HTML?query=x%20y&q2=2#Ref%20two";
        check(&[
            (url, "QUERY", None, Some("query=x%20y&q2=2")),
            (url, "QUERY", Some("query"), Some("x%20y")),
            ("http://ex.com?key=", "QUERY", Some("key"), Some("")),
            ("http://ex.com?keyonly", "QUERY", Some("keyonly"), None),
            ("http://ex.com?a=1&a=2", "QUERY", Some("a"), Some("1")),
            ("http://ex.com?a%20b=1", "QUERY", Some("a b"), None),
        ])
    }

    #[test]
    fn test_parse_empty_path_file() -> Result<()> {
        check(&[
            ("", "PATH", None, Some("")),
            ("http://example.com", "FILE", None, Some("")),
            ("http://example.com?foo=bar", "FILE", None, Some("?foo=bar")),
            ("http://example.com#fragment", "FILE", None, Some("")),
            (
                "http://example.com/?foo=bar",
                "FILE",
                None,
                Some("/?foo=bar"),
            ),
            ("http://ex.com/?", "FILE", None, Some("/?")),
            ("http://ex.com?", "FILE", None, Some("?")),
        ])
    }

    #[test]
    fn test_parse_schemeless_url() -> Result<()> {
        // Spark's java.net.URI treats schemeless strings as relative URIs.
        // Simple schemeless string: no query, no fragment.
        check(&[
            ("notaurl", "PATH", None, Some("notaurl")),
            ("notaurl", "FILE", None, Some("notaurl")),
            ("notaurl", "HOST", None, None),
            ("notaurl", "PROTOCOL", None, None),
            ("notaurl", "QUERY", None, None),
            ("notaurl", "REF", None, None),
            ("notaurl", "AUTHORITY", None, None),
            ("notaurl", "USERINFO", None, None),
        ])?;

        // Schemeless URL with query string
        let with_query = "notaurl?key=value";
        check(&[
            (with_query, "PATH", None, Some("notaurl")),
            (with_query, "FILE", None, Some("notaurl?key=value")),
            (with_query, "QUERY", None, Some("key=value")),
            (with_query, "QUERY", Some("key"), Some("value")),
            (with_query, "QUERY", Some("missing"), None),
            (with_query, "HOST", None, None),
            (with_query, "PROTOCOL", None, None),
        ])?;

        // Schemeless URL with fragment
        let with_fragment = "notaurl#reference";
        check(&[
            (with_fragment, "REF", None, Some("reference")),
            (with_fragment, "PATH", None, Some("notaurl")),
            (with_fragment, "FILE", None, Some("notaurl")),
        ])?;

        // Schemeless URL with both query and fragment
        let with_both = "notaurl?a=1&b=2#frag";
        check(&[
            (with_both, "PATH", None, Some("notaurl")),
            (with_both, "QUERY", None, Some("a=1&b=2")),
            (with_both, "QUERY", Some("b"), Some("2")),
            (with_both, "REF", None, Some("frag")),
            (with_both, "FILE", None, Some("notaurl?a=1&b=2")),
        ])
    }

    #[test]
    fn test_spark_utf8_two_args() -> Result<()> {
        let args = [
            sa(&[Some("https://example.com/a?x=1"), Some("https://ex.com/")]),
            sa(&[Some("HOST"), Some("PATH")]),
        ];

        let out = spark_handled_parse_url(&args, |res| res)?;
        let strings = utf8(&out);

        assert_eq!(strings.len(), 2);
        assert_eq!(strings.value(0), "example.com");
        assert_eq!(strings.value(1), "/");
        Ok(())
    }

    #[test]
    fn test_spark_utf8_three_args_query_key() -> Result<()> {
        let args = [
            sa(&[
                Some("https://example.com/a?x=1&y=2"),
                Some("https://ex.com/?a=1"),
            ]),
            sa(&[Some("QUERY"), Some("QUERY")]),
            sa(&[Some("y"), Some("b")]),
        ];

        let out = spark_handled_parse_url(&args, |res| res)?;
        let strings = utf8(&out);

        assert_eq!(strings.len(), 2);
        assert_eq!(strings.value(0), "2");
        assert!(strings.is_null(1));
        Ok(())
    }

    #[test]
    fn test_spark_userinfo_and_nulls() -> Result<()> {
        let args = [
            sa(&[
                Some("ftp://user:pwd@ftp.example.com:21/files"),
                Some("https://example.com"),
                None,
            ]),
            sa(&[Some("USERINFO"), Some("USERINFO"), Some("USERINFO")]),
        ];

        let out = spark_handled_parse_url(&args, |res| res)?;
        let strings = utf8(&out);

        assert_eq!(strings.len(), 3);
        assert_eq!(strings.value(0), "user:pwd");
        assert!(strings.is_null(1));
        assert!(strings.is_null(2));
        Ok(())
    }

    #[test]
    fn test_invalid_arg_count() {
        // One argument is too few ...
        let urls = sa(&[Some("https://example.com")]);
        let too_few = spark_handled_parse_url(from_ref(&urls), |res| res).unwrap_err();
        assert!(too_few.to_string().contains("expects 2 or 3 arguments"));

        // ... and four are too many.
        let four_args = [
            urls,
            sa(&[Some("HOST")]),
            sa(&[Some("x")]),
            sa(&[Some("extra")]),
        ];
        let too_many = spark_handled_parse_url(&four_args, |res| res).unwrap_err();
        assert!(too_many.to_string().contains("expects 2 or 3 arguments"));
    }

    #[test]
    fn test_non_string_types_error() {
        let args = [
            sa(&[Some("https://example.com")]),
            Arc::new(Int32Array::from(vec![1])) as ArrayRef,
        ];

        let err = spark_handled_parse_url(&args, |res| res).unwrap_err();
        assert!(err.to_string().contains("expects STRING arguments"));
    }
}